当前位置:网站首页 / C#开发 / 正文

ay的RabbitMQ研究报告-第7章-C#代码实战[2]-Worker Queue

时间:2016年05月31日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2401次 | 评论 0

(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)

为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。


这一篇主要讲多个消费者

blob.png

新建解决方案文件夹2.Worker Queue

新建项目WorkerQueueDEMO   生产者

和WorkerQueueDemoCustomer 消费者

这次队列名字我们使用WorkerQueue_Queue

引用Common和RabbitMQ.Client

 

生产者代码如下:

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace WorkerQueueDEMO
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        private const string QueueName = "WorkerQueue_Queue";
        private static void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.QueueDeclare(QueueName, true, false, false, null);
        }

        private static void SendMessage(Payment message)
        {
            _model.BasicPublish("", QueueName, null, message.Serialize());
            Console.WriteLine(" Payment Sent {0}, £{1}", message.CardNumber,
            message.AmountToPay);
        }



        static void Main(string[] args)
        {
            var payment1 = new Payment
            {
                AmountToPay = 25.0m,
                CardNumber = "1234123412341234"
            };
            var payment2 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber = "1234123412341234"
            };
            var payment3 = new Payment
            {
                AmountToPay = 2.0m,
                CardNumber = "1234123412341234"
            };
            var payment4 = new Payment
            {
                AmountToPay = 17.0m,
                CardNumber = "1234123412341234"
            };
            var payment5 = new Payment
            {
                AmountToPay = 300.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment6 = new Payment
            {
                AmountToPay = 350.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment7 = new Payment
            {
                AmountToPay = 295.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment8 = new Payment
            {
                AmountToPay = 5625.0m,
                CardNumber =
                    "1234123412341234"
            };
            var payment9 = new Payment
            {
                AmountToPay = 5.0m,
                CardNumber = "1234123412341234"
            };
            var payment10 = new Payment
            {
                AmountToPay = 12.0m,
                CardNumber =
                    "1234123412341234"
            };

            CreateConnection();
            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);

            Console.ReadLine();

        }
    }
}

先不要运行,先打开web管理页面

运行项目后,刷新界面blob.png

生产者界面:

blob.png

====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========



添加消费者控制台程序代码:

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace WorkerQueueDemoCustomer
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private const string QueueName = "WorkerQueue_Queue";

        public static void Receive()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            using (_connection = _factory.CreateConnection())
            {
                using (var channel = _connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, true, false, false, null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(QueueName, false, consumer);
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();
                        var message = (Payment)ea.Body.DeSerialize();
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine("----- Payment Processed {0} : {1}",
                        message.CardNumber, message.AmountToPay);
                    }
                }
            }
        }

        static void Main(string[] args)
        {
            Receive();
            Console.ReadLine();
        }
    }
}

blob.png

查看web管理

blob.png

此时消息已被消费掉了。

消费者数量,1个

blob.png

通过这种方式创建多个消费者。


讲解下代码:

消费者指定服务器,账户和密码,创建工厂

创建连接,和channel


这行代码可能有疑问:

channel.BasicQos(0, 1, false);

blob.png


或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。

例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。

造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。

并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。

为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。换句话说,只有在消费者空闲的时候会发送下一条信息。

测试:启动两个消费者后,然后开启生产者,此时两个消费者都是5条消息。这就是prefetchCount =1的效果,一个不忙的时候,才发下一条消息给消费者处理,而不是盲目的给消费者。默认的就是盲目的给。



channel.BasicAck(ea.DeliveryTag, false);

如果一个消息队列中有大量消息等待操作时,我们可以用多个客户端来处理消息,这里的分发机制是采用负载均衡算法中的轮询。第一个消息给A,下一个消息给B,下下一个消息给A,下下下一个消息给B......以此类推。

为了保证消息的安全性,保证此消息被正确处理后才能在服务端的消息队列中删除。那么rabbitmq提供啦ack应答机制,来实现这一功能。

ack应答有两种方式:1、自动应答(下面蓝色的参数为true时候),2、手动应答(下面蓝色的参数为false时候,等于开启了应答机制)

在RabbitMQ中,为了不让消息丢失,它提供了消息应答的概念。当消费者获取到了一个消息以后,需要给RabbitMQ服务一个应答的消息,告知服务我已经收到或正确处理了该消息。那么RabbitMQ可以放心的在队列中删除该消息

需要注意的是,RabbitMQ对于没有发送应答包的消息没有时间的限制,也就是说没有Timeout。RabbitMQ只会在处理消息的接收程序与RabbitMQ服务端断开连接后才会重新分配该消息。如果连接没有断,但处理程序几天没有给回应包,它也不会重新发送。所以,如果在处理程序出现异常的时候,我们可以写代码将与RabbitMQ的连接断开来实现消息的重新发送(也许会发到其他负载均衡的机器上处理)


channel.BasicConsume(qName, false, consumer);

如果是自动应答,

channel.BasicAck(ea.DeliveryTag, false);这句代码不用写啦。

消费者消费完消息,就会通过BasicAck方法发送通知给broker,告诉broker我已经处理完消息了,我已经准备好处理好下一条消息了。

如果你是手动应答,不写这行代码,服务器是不知道你处理完消息的,直到你断开连接,RabbitMQ才重新分配这条给你没有应答的消息给其他消费者。




====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========




推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音:wpfui 工作wpf

目前在合肥企迈科技公司上班,加我QQ私聊

2023年11月网站停运,将搬到CSDN上

AYUI8全源码 Github地址:前往获取

杨洋(AaronYang简称AY,安徽六安人)AY唯一QQ:875556003和AY交流

高中学历,2010年开始web开发,2015年1月17日开始学习WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

标签列表