当前位置:网站首页 / C#开发 / 正文

ay的RabbitMQ研究报告-第7章-C#代码实战[3]-发布与订阅

时间:2016年06月01日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 1089次 | 评论 0

(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)

为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。


虽然有多个消费者,但是一个消息只会有一个消费者来处理。而订阅和发布则是每个订阅该消息的消费者都会收到这个消息。RabbitMQ的路由机制让我们实现这个功能轻而易举

要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。前文说到,RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉


新建解决方案文件夹3.Publisher and Subscriber,新建2个控制台程序,然后引用common和RabbitMQ 客户端dll

publisher代码如下:

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Pulisher
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        private const string ExchangeName = "PublishSubscribe_Exchange";
        private static void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.ExchangeDeclare(ExchangeName, "fanout");
        }

        private static void SendMessage(Payment message)
        {
            {
                _model.BasicPublish(ExchangeName, "", null, message.Serialize());
                Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber,
                message.AmountToPay);
            }
        }

        static void Main(string[] args)
        {
            var payment1 = new Payment
            {
                AmountToPay = 25.0m,
                CardNumber = "1234123412341234"
            };
            var payment2 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber = "1234123412341234"
            };
            var payment3 = new Payment
            {
                AmountToPay = 2.0m,
                CardNumber = "1234123412341234"
            };
            var payment4 = new Payment
            {
                AmountToPay = 17.0m,
                CardNumber = "1234123412341234"
            };
            var payment5 = new Payment
            {
                AmountToPay = 300.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment6 = new Payment
            {
                AmountToPay = 350.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment7 = new Payment
            {
                AmountToPay = 295.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment8 = new Payment
            {
                AmountToPay = 5625.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment9 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber = "1234123412341234"
            };
            var payment10 = new Payment
            {
                AmountToPay = 12.0m,
                CardNumber =
                    "1234123412341234"
            };
            CreateConnection();
            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);
        }
    }
}

====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========


这块代码跟我们在Worker Queue的Demo是几乎一样的。查看

唯一不一样的,我们创建了一个fanout类型的exchange路由器。

_model.ExchangeDeclare(ExchangeName, "fanout");

 _model.BasicPublish(ExchangeName, "", null, message.Serialize());


接下来添加订阅的消费者

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
using RabbitMQ.Client;

namespace Subscriber
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static QueueingBasicConsumer _consumer;
        private const string ExchangeName = "PublishSubscribe_Exchange";
        private static string DeclareAndBindQueueToExchange(IModel channel)
        {
            channel.ExchangeDeclare(ExchangeName, "fanout");
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queueName, ExchangeName, "");
            _consumer = new QueueingBasicConsumer(channel); 
            return queueName;
        }
        static void Main()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    var queueName = DeclareAndBindQueueToExchange(channel);
                    channel.BasicConsume(queueName, true, _consumer);
                    while (true)
                    {
                        var ea = _consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize();
                        Console.WriteLine("----- Payment Processed {0} : {1}",
                        message.CardNumber, message.AmountToPay);
                    }
                }
            }

      
        }
    }
}

消费者代码ay的讲解:

ExchangeDeclare 路由器声明:RabbitMQ存在就返回存在的,不存在就创建新的返回,跟QueueDeclare是一样的。

这里我们指定queue的名字,跟以前的例子不太一样,对于订阅程序,queuename有点特殊,需要系统的方式生成queue的name。

例如

amq.gen-TsdoX9qziswm9QCbdkp9Zw

每次启动订阅端,都会创建一个channel,每一个channel都会按照PublishSubscribe_Exchange这个名字找到exchange,然后binding一个queue,然后RabbitMQ指定名字,然后消费。

运行2个订阅者

GIF1.gif

打开管理器

blob.png

2个订阅者在这个邮箱里面,因为我们启动一个消费者,就随机一个名字绑定了这个路由器。所以发布者往这个路由器发布消息,路由器都会把消息发给订阅在这个路由器上的 queue。因为是fanout类型,也就是广播,绑定了,就给你推送消息。


如果发布者没有启动,消费者启动,打开管理器

blob.png

此时消费者连接着的,一旦发布者连接上且发布消息了。这些连接的消费者就会收到消息。当然后来的消费者,就不会收到连接之前的消息。



====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========





推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音号:wpfui,可以看到我的很多作品效果

AYUI8社区版Github地址:前往获取

作者:杨洋(AaronYang简称AY,安徽六安人)目前是个人,还没公司AY唯一QQ:875556003和AY交流

高中学历,2015年1月17日开始,兴趣学习研究WPF,目前工作繁忙,不接任何活

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费.AY主要靠卖技术服务挣钱

不是从我处购买的ayui7源码,我不提供任何技术服务,如果你举报从哪里买的,我可以帮你转正为我的客户,并送demo

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

兼容XP到win10,vs2015/2017/2019,最新AYUI:7.6.5.5

vs2015 企业版密钥HM6NR-QXX7C-DFW2Y-8B82K-WTYJV

vs2017 企业版密钥NJVYC-BMHX2-G77MM-4XJMR-6Q8QF

标签列表