当前位置:网站首页 / C#开发 / 正文

ay的RabbitMQ研究报告-第7章-C#代码实战[3]-发布与订阅

时间:2016年06月01日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2245次 | 评论 0

(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)

为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。


虽然有多个消费者,但是一个消息只会有一个消费者来处理。而订阅和发布则是每个订阅该消息的消费者都会收到这个消息。RabbitMQ的路由机制让我们实现这个功能轻而易举

要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。前文说到,RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉


新建解决方案文件夹3.Publisher and Subscriber,新建2个控制台程序,然后引用common和RabbitMQ 客户端dll

publisher代码如下:

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Pulisher
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        private const string ExchangeName = "PublishSubscribe_Exchange";
        private static void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.ExchangeDeclare(ExchangeName, "fanout");
        }

        private static void SendMessage(Payment message)
        {
            {
                _model.BasicPublish(ExchangeName, "", null, message.Serialize());
                Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber,
                message.AmountToPay);
            }
        }

        static void Main(string[] args)
        {
            var payment1 = new Payment
            {
                AmountToPay = 25.0m,
                CardNumber = "1234123412341234"
            };
            var payment2 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber = "1234123412341234"
            };
            var payment3 = new Payment
            {
                AmountToPay = 2.0m,
                CardNumber = "1234123412341234"
            };
            var payment4 = new Payment
            {
                AmountToPay = 17.0m,
                CardNumber = "1234123412341234"
            };
            var payment5 = new Payment
            {
                AmountToPay = 300.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment6 = new Payment
            {
                AmountToPay = 350.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment7 = new Payment
            {
                AmountToPay = 295.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment8 = new Payment
            {
                AmountToPay = 5625.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment9 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber = "1234123412341234"
            };
            var payment10 = new Payment
            {
                AmountToPay = 12.0m,
                CardNumber =
                    "1234123412341234"
            };
            CreateConnection();
            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);
        }
    }
}

====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========


这块代码跟我们在Worker Queue的Demo是几乎一样的。查看

唯一不一样的,我们创建了一个fanout类型的exchange路由器。

_model.ExchangeDeclare(ExchangeName, "fanout");

 _model.BasicPublish(ExchangeName, "", null, message.Serialize());


接下来添加订阅的消费者

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
using RabbitMQ.Client;

namespace Subscriber
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static QueueingBasicConsumer _consumer;
        private const string ExchangeName = "PublishSubscribe_Exchange";
        private static string DeclareAndBindQueueToExchange(IModel channel)
        {
            channel.ExchangeDeclare(ExchangeName, "fanout");
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queueName, ExchangeName, "");
            _consumer = new QueueingBasicConsumer(channel); 
            return queueName;
        }
        static void Main()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    var queueName = DeclareAndBindQueueToExchange(channel);
                    channel.BasicConsume(queueName, true, _consumer);
                    while (true)
                    {
                        var ea = _consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize();
                        Console.WriteLine("----- Payment Processed {0} : {1}",
                        message.CardNumber, message.AmountToPay);
                    }
                }
            }

      
        }
    }
}

消费者代码ay的讲解:

ExchangeDeclare 路由器声明:RabbitMQ存在就返回存在的,不存在就创建新的返回,跟QueueDeclare是一样的。

这里我们指定queue的名字,跟以前的例子不太一样,对于订阅程序,queuename有点特殊,需要系统的方式生成queue的name。

例如

amq.gen-TsdoX9qziswm9QCbdkp9Zw

每次启动订阅端,都会创建一个channel,每一个channel都会按照PublishSubscribe_Exchange这个名字找到exchange,然后binding一个queue,然后RabbitMQ指定名字,然后消费。

运行2个订阅者

GIF1.gif

打开管理器

blob.png

2个订阅者在这个邮箱里面,因为我们启动一个消费者,就随机一个名字绑定了这个路由器。所以发布者往这个路由器发布消息,路由器都会把消息发给订阅在这个路由器上的 queue。因为是fanout类型,也就是广播,绑定了,就给你推送消息。


如果发布者没有启动,消费者启动,打开管理器

blob.png

此时消费者连接着的,一旦发布者连接上且发布消息了。这些连接的消费者就会收到消息。当然后来的消费者,就不会收到连接之前的消息。



====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========





推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音:wpfui 工作wpf

目前在合肥企迈科技公司上班,加我QQ私聊

2023年11月网站停运,将搬到CSDN上

AYUI8全源码 Github地址:前往获取

杨洋(AaronYang简称AY,安徽六安人)AY唯一QQ:875556003和AY交流

高中学历,2010年开始web开发,2015年1月17日开始学习WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

标签列表