当前位置:网站首页 / C#开发 / 正文

ay的RabbitMQ研究报告-第7章-C#代码实战[5]-基于Topic的发布与订阅

时间:2016年06月01日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2486次 | 评论 0

(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)

为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。

基于Topic的发布与订阅

blob.png

我们在前面已经说过了, *号代表1个单词,#代表0 个或者更多的单词

Topic类型的Exchange是一个类似 正则表达式,匹配合适的routekey的 queue的


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========


新建解决方案文件夹,然后TopicPublish和TopicCustomer控制台程序

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TopicPublish
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        private const string ExchangeName = "Topic_Exchange";
        static void Main()
        {
            var payment1 = new Payment
            {
                AmountToPay = 25.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr F Bloggs"
            };
            var payment2 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr S Simpson"
            };
            var payment3 = new Payment
            {
                AmountToPay = 2.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr G Washington"
            };
            var payment4 = new Payment
            {
                AmountToPay = 17.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr B Gates"
            };
            var payment5 = new Payment
            {
                AmountToPay = 300.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mrs K Kardashian"
            };
            var payment6 = new Payment
            {
                AmountToPay = 350.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mrs M Whitehouse"
            };
            var payment7 = new Payment
            {
                AmountToPay = 295.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mrs E Windsor"
            };
            var payment8 = new Payment
            {
                AmountToPay = 5625.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr B Obama"
            };
            var payment9 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr S Haunts"
            };
            var payment10 = new Payment
            {
                AmountToPay = 12.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr F Bloggs Jr"
            };
            var purchaseOrder1 = new PurchaseOrder
            {
                AmountToPay = 50.0m,
                CompanyName
                    = "Company A",
                PaymentDayTerms = 75,
                PoNumber = "123434A"
            };
            var purchaseOrder2 = new PurchaseOrder
            {
                AmountToPay = 150.0m,
                CompanyName = "Company B",
                PaymentDayTerms = 75,
                PoNumber = "193434B"
            };
            var purchaseOrder3 = new PurchaseOrder
            {
                AmountToPay = 12.0m,
                CompanyName
                    = "Company C",
                PaymentDayTerms = 75,
                PoNumber = "196544A"
            };
            var purchaseOrder4 = new PurchaseOrder
            {
                AmountToPay = 2150.0m,
                CompanyName = "Company D",
                PaymentDayTerms = 75,
                PoNumber = "234434H"
            };
            var purchaseOrder5 = new PurchaseOrder
            {
                AmountToPay = 2150.0m,
                CompanyName = "Company E",
                PaymentDayTerms = 75,
                PoNumber = "876434W"
            };
            var purchaseOrder6 = new PurchaseOrder
            {
                AmountToPay = 7150.0m,
                CompanyName = "Company F",
                PaymentDayTerms = 75,
                PoNumber = "1423474U"
            };
            var purchaseOrder7 = new PurchaseOrder
            {
                AmountToPay = 3150.0m,
                CompanyName = "Company G",
                PaymentDayTerms = 75,
                PoNumber = "1932344O"
            };
            var purchaseOrder8 = new PurchaseOrder
            {
                AmountToPay = 3190.0m,
                CompanyName = "Company H",
                PaymentDayTerms = 75,
                PoNumber = "1123457Q"
            };
            var purchaseOrder9 = new PurchaseOrder
            {
                AmountToPay = 50.0m,
                CompanyName
                    = "Company I",
                PaymentDayTerms = 75,
                PoNumber = "1595344R"
            };
            var purchaseOrder10 = new PurchaseOrder
            {
                AmountToPay = 2150.0m,
                CompanyName = "Company J",
                PaymentDayTerms = 75,
                PoNumber = "656734L"
            };
            CreateConnection();
            SendPayment(payment1);
            SendPayment(payment2);
            SendPayment(payment3);
            SendPayment(payment4);
            SendPayment(payment5);
            SendPayment(payment6);
            SendPayment(payment7);
            SendPayment(payment8);
            SendPayment(payment9);
            SendPayment(payment10);
            SendPurchaseOrder(purchaseOrder1);
            SendPurchaseOrder(purchaseOrder2);
            SendPurchaseOrder(purchaseOrder3);
            SendPurchaseOrder(purchaseOrder4);
            SendPurchaseOrder(purchaseOrder5);
            SendPurchaseOrder(purchaseOrder6);
            SendPurchaseOrder(purchaseOrder7);
            SendPurchaseOrder(purchaseOrder8);
            SendPurchaseOrder(purchaseOrder9);
            SendPurchaseOrder(purchaseOrder10);
        }
        private static void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName =
                    "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.ExchangeDeclare(ExchangeName, "topic");
        }
        private static void SendPayment(Payment payment)
        {
            SendMessage(payment.Serialize(), "payment.card");
            Console.WriteLine(" Payment Sent {0}, £{1}", payment.CardNumber,
            payment.AmountToPay);
        }
        private static void SendPurchaseOrder(PurchaseOrder purchaseOrder)
        {
            SendMessage(purchaseOrder.Serialize(), "payment.purchaseorder");
            Console.WriteLine(" Purchase Order Sent {0}, £{1}, {2}, {3}",
            purchaseOrder.CompanyName, purchaseOrder.AmountToPay, purchaseOrder.PaymentDayTerms,
            purchaseOrder.PoNumber);
        }
        private static void SendMessage(byte[] message, string routingKey)
        {
            _model.BasicPublish(ExchangeName, routingKey, null, message);
        }


    }
}

假设你看了上篇的direct exchange,这个很好理解了,这里不说了。不懂的看上篇。

客户端也没什么

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TopicCustom
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private const string ExchangeName = "Topic_Exchange";
        static void Main()
        {
            _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    Console.WriteLine("Publisher listening for Topic<payment.card>");
                    Console.WriteLine("--------------------------------------------");
                    Console.WriteLine();
                    channel.ExchangeDeclare(ExchangeName, "topic");
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queueName, ExchangeName, "payment.card");
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize();
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("--- Payment - Routing Key <{0}> : {1} :{2}", routingKey, message.CardNumber, message.AmountToPay);
                    }

                }
            }
        }
    }
 
}

先启动消费者,然后启动生产者

blob.png


修改消费者代码为:

         channel.QueueBind(queueName, ExchangeName, "payment.*");

这样就接受payment.开头的然后,2个单词的名字的队列消息了。

注意这里,也就是可以接受 生产者发布的payment.card和payment.purchaseorder消息了。

这里blob.png

就要加判断了。

  while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        if (ea.RoutingKey == "payment.purchaseorder")
                        {
                            var message = (PurchaseOrder)ea.Body.DeSerialize();

                            var routingKey = ea.RoutingKey;
                            Console.WriteLine("--- Purchase Order - Routing Key <{0}> : {1} :{2}", routingKey, message.CompanyName, message.AmountToPay);
                        }
                        else {
                            var message = (Payment)ea.Body.DeSerialize();

                            var routingKey = ea.RoutingKey;
                            Console.WriteLine("--- Payment - Routing Key <{0}> : {1} :{2}", routingKey, message.CardNumber, message.AmountToPay);
                        }
                       
                    }

开启2个消费者,1个生产者。消息不是一对一的,只要符合规则,消费者都可以拿走blob.png

====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========










推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音:wpfui 工作wpf

目前在合肥企迈科技公司上班,加我QQ私聊

2023年11月网站停运,将搬到CSDN上

AYUI8全源码 Github地址:前往获取

杨洋(AaronYang简称AY,安徽六安人)AY唯一QQ:875556003和AY交流

高中学历,2010年开始web开发,2015年1月17日开始学习WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

标签列表