时间:2016年06月01日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2486次 | 评论 0 人
(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)
为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。
基于Topic的发布与订阅
我们在前面已经说过了, *号代表1个单词,#代表0 个或者更多的单词
Topic类型的Exchange是一个类似 正则表达式,匹配合适的routekey的 queue的
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
新建解决方案文件夹,然后TopicPublish和TopicCustomer控制台程序
using Common; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace TopicPublish { class Program { private static ConnectionFactory _factory; private static IConnection _connection; private static IModel _model; private const string ExchangeName = "Topic_Exchange"; static void Main() { var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234", Name = "Mr F Bloggs" }; var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr S Simpson" }; var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234", Name = "Mr G Washington" }; var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234", Name = "Mr B Gates" }; var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234", Name = "Mrs K Kardashian" }; var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234", Name = "Mrs M Whitehouse" }; var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234", Name = "Mrs E Windsor" }; var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234", Name = "Mr B Obama" }; var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr S Haunts" }; var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234", Name = "Mr F Bloggs Jr" }; var purchaseOrder1 = new PurchaseOrder { AmountToPay = 50.0m, CompanyName = "Company A", PaymentDayTerms = 75, PoNumber = "123434A" }; var purchaseOrder2 = new PurchaseOrder { AmountToPay = 150.0m, CompanyName = "Company B", PaymentDayTerms = 75, PoNumber = "193434B" }; var purchaseOrder3 = new PurchaseOrder { AmountToPay = 12.0m, CompanyName = "Company C", PaymentDayTerms = 75, PoNumber = "196544A" }; var purchaseOrder4 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company D", PaymentDayTerms = 75, PoNumber = "234434H" }; var purchaseOrder5 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company E", PaymentDayTerms = 75, PoNumber = "876434W" }; var purchaseOrder6 = new PurchaseOrder { AmountToPay = 7150.0m, CompanyName = "Company F", PaymentDayTerms = 75, PoNumber = "1423474U" }; var purchaseOrder7 = new PurchaseOrder { AmountToPay = 3150.0m, CompanyName = "Company G", PaymentDayTerms = 75, PoNumber = "1932344O" }; var purchaseOrder8 = new PurchaseOrder { AmountToPay = 3190.0m, CompanyName = "Company H", PaymentDayTerms = 75, PoNumber = "1123457Q" }; var purchaseOrder9 = new PurchaseOrder { AmountToPay = 50.0m, CompanyName = "Company I", PaymentDayTerms = 75, PoNumber = "1595344R" }; var purchaseOrder10 = new PurchaseOrder { AmountToPay = 2150.0m, CompanyName = "Company J", PaymentDayTerms = 75, PoNumber = "656734L" }; CreateConnection(); SendPayment(payment1); SendPayment(payment2); SendPayment(payment3); SendPayment(payment4); SendPayment(payment5); SendPayment(payment6); SendPayment(payment7); SendPayment(payment8); SendPayment(payment9); SendPayment(payment10); SendPurchaseOrder(purchaseOrder1); SendPurchaseOrder(purchaseOrder2); SendPurchaseOrder(purchaseOrder3); SendPurchaseOrder(purchaseOrder4); SendPurchaseOrder(purchaseOrder5); SendPurchaseOrder(purchaseOrder6); SendPurchaseOrder(purchaseOrder7); SendPurchaseOrder(purchaseOrder8); SendPurchaseOrder(purchaseOrder9); SendPurchaseOrder(purchaseOrder10); } private static void CreateConnection() { _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; _connection = _factory.CreateConnection(); _model = _connection.CreateModel(); _model.ExchangeDeclare(ExchangeName, "topic"); } private static void SendPayment(Payment payment) { SendMessage(payment.Serialize(), "payment.card"); Console.WriteLine(" Payment Sent {0}, £{1}", payment.CardNumber, payment.AmountToPay); } private static void SendPurchaseOrder(PurchaseOrder purchaseOrder) { SendMessage(purchaseOrder.Serialize(), "payment.purchaseorder"); Console.WriteLine(" Purchase Order Sent {0}, £{1}, {2}, {3}", purchaseOrder.CompanyName, purchaseOrder.AmountToPay, purchaseOrder.PaymentDayTerms, purchaseOrder.PoNumber); } private static void SendMessage(byte[] message, string routingKey) { _model.BasicPublish(ExchangeName, routingKey, null, message); } } }
假设你看了上篇的direct exchange,这个很好理解了,这里不说了。不懂的看上篇。
客户端也没什么
using Common; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace TopicCustom { class Program { private static ConnectionFactory _factory; private static IConnection _connection; private const string ExchangeName = "Topic_Exchange"; static void Main() { _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; using (_connection = _factory.CreateConnection()) { using (var channel = _connection.CreateModel()) { Console.WriteLine("Publisher listening for Topic<payment.card>"); Console.WriteLine("--------------------------------------------"); Console.WriteLine(); channel.ExchangeDeclare(ExchangeName, "topic"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, ExchangeName, "payment.card"); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, true, consumer); while (true) { var ea = consumer.Queue.Dequeue(); var message = (Payment)ea.Body.DeSerialize(); var routingKey = ea.RoutingKey; Console.WriteLine("--- Payment - Routing Key <{0}> : {1} :{2}", routingKey, message.CardNumber, message.AmountToPay); } } } } } }
先启动消费者,然后启动生产者
修改消费者代码为:
channel.QueueBind(queueName, ExchangeName, "payment.*");
这样就接受payment.开头的然后,2个单词的名字的队列消息了。
注意这里,也就是可以接受 生产者发布的payment.card和payment.purchaseorder消息了。
这里
就要加判断了。
while (true) { var ea = consumer.Queue.Dequeue(); if (ea.RoutingKey == "payment.purchaseorder") { var message = (PurchaseOrder)ea.Body.DeSerialize(); var routingKey = ea.RoutingKey; Console.WriteLine("--- Purchase Order - Routing Key <{0}> : {1} :{2}", routingKey, message.CompanyName, message.AmountToPay); } else { var message = (Payment)ea.Body.DeSerialize(); var routingKey = ea.RoutingKey; Console.WriteLine("--- Payment - Routing Key <{0}> : {1} :{2}", routingKey, message.CardNumber, message.AmountToPay); } }
开启2个消费者,1个生产者。消息不是一对一的,只要符合规则,消费者都可以拿走
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
推荐您阅读更多有关于“RabbitMQ,”的文章
抖音:wpfui 工作wpf
目前在合肥企迈科技公司上班,加我QQ私聊
2023年11月网站停运,将搬到CSDN上
AYUI8全源码 Github地址:前往获取
杨洋(AaronYang简称AY,安徽六安人)和AY交流
高中学历,2010年开始web开发,2015年1月17日开始学习WPF
声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费
查看捐赠AYUI7.X MVC教程 更新如下:
第一课 第二课 程序加密教程
额 本文暂时没人评论 来添加一个吧
发表评论