时间:2016年05月31日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2401次 | 评论 0 人
(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)
为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。
这一篇主要讲多个消费者
新建解决方案文件夹2.Worker Queue
新建项目WorkerQueueDEMO 生产者
和WorkerQueueDemoCustomer 消费者
这次队列名字我们使用WorkerQueue_Queue
引用Common和RabbitMQ.Client
生产者代码如下:
using Common; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace WorkerQueueDEMO { class Program { private static ConnectionFactory _factory; private static IConnection _connection; private static IModel _model; private const string QueueName = "WorkerQueue_Queue"; private static void CreateConnection() { _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; _connection = _factory.CreateConnection(); _model = _connection.CreateModel(); _model.QueueDeclare(QueueName, true, false, false, null); } private static void SendMessage(Payment message) { _model.BasicPublish("", QueueName, null, message.Serialize()); Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber, message.AmountToPay); } static void Main(string[] args) { var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" }; var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" }; var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" }; var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" }; var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" }; var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" }; var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" }; var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" }; var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" }; var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" }; CreateConnection(); SendMessage(payment1); SendMessage(payment2); SendMessage(payment3); SendMessage(payment4); SendMessage(payment5); SendMessage(payment6); SendMessage(payment7); SendMessage(payment8); SendMessage(payment9); SendMessage(payment10); Console.ReadLine(); } } }
先不要运行,先打开web管理页面
运行项目后,刷新界面
生产者界面:
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
添加消费者控制台程序代码:
using Common; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace WorkerQueueDemoCustomer { class Program { private static ConnectionFactory _factory; private static IConnection _connection; private const string QueueName = "WorkerQueue_Queue"; public static void Receive() { _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; using (_connection = _factory.CreateConnection()) { using (var channel = _connection.CreateModel()) { channel.QueueDeclare(QueueName, true, false, false, null); channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(QueueName, false, consumer); while (true) { var ea = consumer.Queue.Dequeue(); var message = (Payment)ea.Body.DeSerialize(); channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine("----- Payment Processed {0} : {1}", message.CardNumber, message.AmountToPay); } } } } static void Main(string[] args) { Receive(); Console.ReadLine(); } } }
查看web管理
此时消息已被消费掉了。
消费者数量,1个
通过这种方式创建多个消费者。
讲解下代码:
消费者指定服务器,账户和密码,创建工厂
创建连接,和channel
这行代码可能有疑问:
channel.BasicQos(0, 1, false);
或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。
例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。
造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。
并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。换句话说,只有在消费者空闲的时候会发送下一条信息。
测试:启动两个消费者后,然后开启生产者,此时两个消费者都是5条消息。这就是prefetchCount =1的效果,一个不忙的时候,才发下一条消息给消费者处理,而不是盲目的给消费者。默认的就是盲目的给。
channel.BasicAck(ea.DeliveryTag, false);
如果一个消息队列中有大量消息等待操作时,我们可以用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。
为了保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。
ack应答有两种方式:1、自动应答(下面蓝色的参数为true时候),2、手动应答(下面蓝色的参数为false时候,等于开启了应答机制)
在RabbitMQ中,为了不让消息丢失,它提供了消息应答的概念。当消费者获取到了一个消息以后,需要给RabbitMQ服务一个应答的消息,告知服务我已经收到或正确处理了该消息。那么RabbitMQ可以放心的在队列中删除该消息
需要注意的是,RabbitMQ对于没有发送应答包的消息没有时间的限制,也就是说没有Timeout。RabbitMQ只会在处理消息的接收程序与RabbitMQ服务端断开连接后才会重新分配该消息。如果连接没有断,但处理程序几天没有给回应包,它也不会重新发送。所以,如果在处理程序出现异常的时候,我们可以写代码将与RabbitMQ的连接断开来实现消息的重新发送(也许会发到其他负载均衡的机器上处理)
channel.BasicConsume(qName, false, consumer);
如果是自动应答,
channel.BasicAck(ea.DeliveryTag, false);这句代码不用写啦。
消费者消费完消息,就会通过BasicAck方法发送通知给broker,告诉broker我已经处理完消息了,我已经准备好处理好下一条消息了。
如果你是手动应答,不写这行代码,服务器是不知道你处理完消息的,直到你断开连接,RabbitMQ才重新分配这条给你没有应答的消息给其他消费者。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
推荐您阅读更多有关于“RabbitMQ,”的文章
抖音:wpfui 工作wpf
目前在合肥企迈科技公司上班,加我QQ私聊
2023年11月网站停运,将搬到CSDN上
AYUI8全源码 Github地址:前往获取
杨洋(AaronYang简称AY,安徽六安人)和AY交流
高中学历,2010年开始web开发,2015年1月17日开始学习WPF
声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费
查看捐赠AYUI7.X MVC教程 更新如下:
第一课 第二课 程序加密教程
额 本文暂时没人评论 来添加一个吧
发表评论