当前位置:网站首页 / C#开发 / 正文

AY C# RabbitMQ 2019 微笔记3

时间:2018年12月04日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 404次 | 评论 0

发送消息,生产者  接收消息 消费者   RabbitMQ是Erlang语言开发

上篇博客

实际场景Exchange用的多

1对多发布订阅(下篇讲,这篇让你更了解队列)


==============开始DEMO

2个控制台

发布者2

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQ.Product2
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: true,
                                     arguments: null);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

                //channel.QueueDeclare(queue: "hello",
                //                     durable: true,
                //                     exclusive: false,
                //                     autoDelete: true,
                //                     arguments: null);

                //string message = "Hello World!"; 
                //var body = Encoding.UTF8.GetBytes(message);

                //channel.BasicPublish(exchange: "",
                //                     routingKey: "hello",
                //                     basicProperties: null,
                //                     body: body);
                //Console.WriteLine(" [x] Sent {0}", message);
            }

            Console.WriteLine(" Press [enter] to exit.");




            Console.ReadLine();
        }


        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }
    }
}

这里基于上一个DEMO改的,这里我们设置了一个properties了。

运行项目。


然后消费者修改代码(基于DEMO1的消费者 代码)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AyTestMQ2
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //channel.QueueDeclare(queue: "task_queue",
                //                     durable: true,
                //                     exclusive: false,
                //                     autoDelete: true,
                //                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine(" [x] Done");
                };
                channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

                //var consumer = new EventingBasicConsumer(channel);
                //consumer.Received += (model, ea) =>
                //{
                //    var body = ea.Body;
                //    var message = Encoding.UTF8.GetString(body);
                //    Console.WriteLine(" [x] Received {0}", message);
                //};
                //channel.BasicConsume(queue: "hello",
                //                     autoAck: true,
                //                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }

            Console.ReadKey();
        }
    }
}

主要接收消息,处理,模拟耗时工作。

发的消息一个 点号 停顿1秒


生产端消息改下

  private static string GetMessage(string[] args)

        {

            return ((args.Length > 0) ? string.Join(" ", args) : "Hello.World.AY.2019");

        }

消费端改改

      consumer.Received += (model, ea) =>

                {

                    var body = ea.Body;

                    var message = Encoding.UTF8.GetString(body);

                    Console.WriteLine(" [x] Received {0}", message);

                    var _3 = message.Split('.');

                    //int dots = message.Split('.').Length - 1;

                    foreach (var item in _3)

                    {

                        Console.WriteLine(item);

                        Thread.Sleep(1000);

                    }

运行生产端,然后消费端效果如下

6F.gif

测试2,

开启生产者,然后开启消费者,如上所示,不要关闭,关掉生产者在打开,消费者那段又收到消息了。

image.png

同样的,如果有2个消费者, rabbitmq会发给下一个消费者,这种分发消息叫做 round-robin(循环调度)

一个消息只给一个消费者处理。

场景:其实我们可以做  用户的请求,每个请求放入消息队列,然后让消息队列给空闲的 消费者去消费处理。1个消费者不够处理,可以运行多个来吃完任务。



任务会耗时间的。

您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。

上面的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。 

在这种情况下,如果当前的消费者挂了,我们将丢失它刚刚处理的消息。 

我还将丢失分发给这个消费者的 还未处理的所有消息。


但是我不想丢失任何的消息(1个消息一个任务),如果消费者处理挂了,我当然更想把消息给其他的消费者处理。

为了确保消息永不丢失,RabbitMQ 提供了一个 ack机制, 手动应答,处理完了,告诉兔子,我处理完了,等兔子空闲时候就删除该消息了。

一些文章


定义 消费者死了,就是 channel关闭,connection关闭,tcp断开了,没网络了。

当消费者还没发送 ack,兔子那边就会认为 消息没有被处理,又会恢复回去了。如果同一时间,还有其他消费者在线,兔子会把这烫手山芋给其他的消费者。

恩,所以啊,你的程序没死,他的消息一直存在兔子那的,除非你手动应答。如果你挂了没应答,会看有没有其他的消费者处理。


接下来模拟这个场景

生产者生产个消息,然后修改消费者代码

    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //channel.QueueDeclare(queue: "task_queue",
                //                     durable: true,
                //                     exclusive: false,
                //                     autoDelete: true,
                //                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                         throw new NotImplementedException();
                    Console.WriteLine(" [x] Received {0}", message);
                    var _3 = message.Split('.');
                    foreach (var item in _3)
                    {
                        Console.WriteLine(item);
                        Thread.Sleep(2000);
                    }
         
                    Console.WriteLine(" [x] Done");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

设置了autoack:false了

然后received里面设置了 ea.DeliveryTag


测试1

这里处理消息,停留了2秒1个字段,我们再BasicAck应答之前关闭程序,看消息会不会被删除了。

image.png

由于抛出异常Unacked 为1了。

把程序关了

消息还是删除了。。

我怀疑服务端设置了 自动删除导致的。我改为false测试,这样生产了1个不会自动删除的消息。

image.png

测试2

运行修改后的生产者

image.png

消费者代码不改,让抛出异常

然后关闭程序,过一会,消息恢复正常了。这次就对了。也就是生产者自动删除我觉得大部分都是关闭的。

image.png

测试3

正确处理,看消息会不会删除,移除抛弃异常的代码

ready终于是0了

image.png

然后关闭客户端,断开连接(执行完using,释放连接),队列被处理了,没删除哦

image.png

那如果想要删除呢,暂时先这样吧,因为用的最多的还是Exchange的Topic


注意:

 忘记写 BasicAck这行代码, 这是一个简单的错误,但后果是严重的。 当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。

假如忘了unack


测试4

image.png

注释掉代码,然后生产个消息,然后运行消费者

再运行消费者,当然 连接不要释放,不然任务客户端死了,又恢复回去了

image.png

这里我们打开命令行

rabbitmqctl list_queues name messages_ready messages_unacknowledged

image.png

貌似超时了  这里就列出名字了。算了,遇到再看。




=============================================================

持久性,如果兔子挂了,消息还是会丢丢失了。

hannel.QueueDeclare(queue: "hello",

                     durable: true,

                     exclusive: false,

                     autoDelete: false,

                     arguments: null);


设置持久化,就会不丢失了。但是兔子不允许你重新定义一个已经存在的队列,然后更改属性

你可以换个名字重新定义一个。


对了,如果服务器重启,我们在上篇博客说到 消息恢复了,但是不可再被消费了,但是如果生产消息时候,加上下面代码就好了,终于解决了 durable=true也无效的问题了。

     var properties = channel.CreateBasicProperties();

                properties.Persistent = true;

将消息标记为持久性并不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接s收消息并且尚未保存消息时,仍然有一个短时间窗口。 此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用发布者确认(publisher confirms)。



公平调度 Fair Dispatch

2个消费者,一个很忙,一个几乎不做事,兔子不知道谁忙谁不忙的,还是均匀的发消息的。

发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。

 它不会查看消费者未确认消息的数量。

 它只是盲目地向第n个消费者发送每个第n个消息

image.png

为了改变这种行为,我们可以使用BasicQos方法,shezhi PrefetchCount=1

这会告诉兔子,不要同一时间给超过一个消息以上给一个消费者,因为它很忙,可能还没处理完,你又来了。

换句话说,在处理并确认前一个消息之前,不要向该工作程序发送新消息。 相反,它会将它发送给下一个不忙的 消费者。

channel.BasicQos(0, 1, false);


这里注意队列的 size

如果所有的 消费者都很忙,并且你的queue填满了。你就要考虑是否添加更多的消费者,或者换个思路去解决问题。


消费者修改后的代码如下:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AyTestMQ2
{
    class Program
    {
        static void Main(string[] args)
        {
    
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    var _3 = message.Split('.');
                    foreach (var item in _3)
                    {
                        Console.WriteLine(item);
                    }
         
                    Console.WriteLine(" [x] Done");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }

            Console.ReadKey();
        }
    }
}

====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========


关于IModel内的方法和IBasicProperties你想了解的,可以查看 RabbitMQ .NET client API reference online

特别推荐以下指南

particularly recommend the following guides: Publisher Confirms and Consumer AcknowledgementsProduction Checklist and Monitoring.




====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========



推荐您阅读更多有关于“”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音号:wpfui,可以看到我的很多作品效果

AYUI8社区版Github地址:前往获取

作者:杨洋(AaronYang简称AY,安徽六安人)目前是个人,还没公司AY唯一QQ:875556003和AY交流

高中学历,2015年1月17日开始,兴趣学习研究WPF,目前工作繁忙,不接任何活

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费.AY主要靠卖技术服务挣钱

不是从我处购买的ayui7源码,我不提供任何技术服务,如果你举报从哪里买的,我可以帮你转正为我的客户,并送demo

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

兼容XP到win10,vs2015/2017/2019,最新AYUI:7.6.5.5

vs2015 企业版密钥HM6NR-QXX7C-DFW2Y-8B82K-WTYJV

vs2017 企业版密钥NJVYC-BMHX2-G77MM-4XJMR-6Q8QF

标签列表