当前位置:网站首页 / C#开发 / 正文

AY C# RabbitMQ 2019 微笔记6

时间:2018年12月05日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2900次 | 评论 0

消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。

这种场景: 延迟任务 

知识:消息的TTL和死信Exchange


 RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。

rabbitmq-delayed-message-exchange这个插件也可以实现,看C#实现。


消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。

超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。

所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。

这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。


Dead Letter Exchanges

定义消息死亡

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

就是个普通的Exchange,存放死掉的消息



两个控制台:

P端(以后生产者只说P了,消费者是 C端):

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQ.Product2
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
            using (var connection = factory.CreateConnection())
            {
                while (Console.ReadLine() != null)
                {
                    using (var channel = connection.CreateModel())
                    {
                        Dictionary<string, object> dic = new Dictionary<string, object>();
                        dic.Add("x-expires", 10000);
                        dic.Add("x-message-ttl", 8000);//队列上消息过期时间,应小于队列过期时间  
                        dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由  
                        dic.Add("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey  
                        //创建一个名叫"deadQueue"的消息队列
                        channel.QueueDeclare(queue: "deadQueue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: dic);

                        var message = "Hello World!";
                        var body = Encoding.UTF8.GetBytes(message);

                        //向该消息队列发送消息message
                        channel.BasicPublish(exchange: "",
                                                    routingKey: "deadQueue",
                                                    basicProperties: null,
                                                    body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
                }
            }

            Console.ReadKey();
        }
    }
}

运行项目,按任意键,创建1个队列,刷新web管理

image.png

然后过了8秒,队列就删除了。

这里队列有个TTL Exp DLX DLK属性,正好上面3个属性的设置image.png

我们也可以在管理端看到这些消息参数:

image.png

单击的时候,就可以设置了,

在网上找了一些资料,下面代码可能java的设置,.net简单如上键值就行了

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”); 

channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));


Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp


Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim


Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B


Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX


Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK


Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,


Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

Master locator(x-queue-master-locator) 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。

(设置“x-queue-master-locator”参数。)


整理下:  1个 Exchange 可以多个 Queue   1个Queue可以多个Message  Message可以设置优先级,Queue可以设置最多多少个Message,多大字节的存储,也可以设置优先级。


上次持久化的问题理解:

设置消息持久化必须先设置队列持久化,要不然队列不持久化消息持久化队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。




上面代码,我们看到了,消息过期了,就会被扔到一个叫exchange-direct的 邮箱,真好,那么等他过期了,那么消费者 监听这个邮箱,是不是8秒过后就收到了消息,达到了 延迟的效果。跟RPC的设计异曲同工之妙。如果没有指定邮箱,8秒后消息就会被删除了。 10秒后 队列就会被删除。

image.png


接下来,

消费者,就是创建个 被转发的 exchange的监听,

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AyTestMQ2
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Title = "AY 2019 2018-12-5";

            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct");
                    string name = channel.QueueDeclare().QueueName;
                    //string name = "deadQueue";
                    channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay");

                    //回调,当consumer收到消息后会执行该函数
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                                        {
                                            var body = ea.Body;
                                            var message = Encoding.UTF8.GetString(body);
                                            Console.WriteLine(ea.RoutingKey);
                                            Console.WriteLine(" [x] Received {0}", message);
                                        };

                    //Console.WriteLine("name:" + name);
                    //消费队列"hello"中的消息
                    channel.BasicConsume(queue: name,
                                         autoAck: true,
                                         consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }

            Console.ReadKey();

        }
    }
}

打开生产端,按下回车,进入循环,然后打开消费端,8秒后消费者就收到了消息。

如果8秒后,消息都被转发了,消费者打开,是收不到消息的。


====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========








推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音:wpfui 工作wpf

目前在合肥企迈科技公司上班,加我QQ私聊

2023年11月网站停运,将搬到CSDN上

AYUI8全源码 Github地址:前往获取

杨洋(AaronYang简称AY,安徽六安人)AY唯一QQ:875556003和AY交流

高中学历,2010年开始web开发,2015年1月17日开始学习WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

标签列表