当前位置:网站首页 / C#开发 / 正文

AY C# RabbitMQ 2019 微笔记5

时间:2018年12月05日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 377次 | 评论 0

Routing 路由  集中处理 数据  然后 按照 约定/规则 正确的 广播到 消费者

在本教程中,我们将为其添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。


Direct Exchange 以前我写的博客

我们上一个教程中的日志记录系统向所有消费者广播所有消息。 我们希望扩展它以允许根据消息的严重性过滤消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。

我们使用的是fanout,它没有给我们太大的灵活性 - 它只能进行无意识的广播。

我们将使用direct。 direct背后的路由算法很简单 - 消息进入队列,binding的key和route key一致就行了。

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class Program
{
    public static void Main(string[] args)
    {
    
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = "info";
            var message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

image.png

direct类型的,然后写个 路由的规则,RouteKey 这里直接 给个名字。



消费者代码:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class Program
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;
            var severity = "info";
            channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

 路由key 一致 就接收消息了。

image.png


====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========



接下来Topic,直接在这个demo改

image.png

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class Program
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs",
                                    type: "topic");

            var routingKey = "anonymous.info";
            var message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "topic_logs",
                                 routingKey: routingKey,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
        }
        Console.ReadKey();
    }
}

消费者

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class Program
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs",
                                    type: "topic");
            var queueName = channel.QueueDeclare().QueueName;
            var severity = "anonymous.*";
            channel.QueueBind(queue: queueName,
                                  exchange: "topic_logs",
                                  routingKey: severity);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

image.png


跟上面区别是 RouteKey 带了 * 号或者 #号

*号 代表1个单词

#号代表 0个以上的单词


服务端 消息路由规则 anonymous.info

换成      var severity = "a.*";

肯定收不到消息

换成# 肯定可以

*.info也可以

a# 收不到消息的

*.* 可以收到,然后把生产换成anonymous 就收不到了,因为路由规则 1个单词.第二个单词

以上内容是 AY做过测试了。


========================================================================


讲一下RPC,稍微有点绕,你理解一个既是消费端也是生产者,双方都是的,也可以配合 http请求响应稍微配合理解。

但是也是有点不一样。 我还是把上面代码注释了,还在那2个 控制台改。


场景:

如果我们需要在远程计算机上运行一个函数并等待结果呢? 嗯,这是一个不同的故事。 此模式通常称为Remote Procedure Call 或者 RPC.


在本教程中,我们将使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci 斐波那契 数字的虚拟RPC服务

为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为Call的方法,该方法发送一个RPC请求并阻塞,直到收到答案为止。


请求服务器(生产者,返回一个斐波那契数字)

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

代码比较好理解的,fib是一个 返回斐波那契数字的,这里不考虑数字是否是正整数了。

自己创建一个接收请求的队列,名字叫rpc_queue,手动应答,处理完成,再应答完成。

然后收到一个消息后,处理,中间有个约定的CorrelationId 写上去,

然后 在往这个RouteKey写上 返回值的一些信息。


消费者:

using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); 
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
        Console.ReadLine();
    }
}

以上的调用没有进行类型判断响应,比如输入的是否合法,如果服务器没有运行等,这里是最简单的调用示例。

客户端是否应该有超时设计等。

比如服务器处理发生异常,是否应该转发回客户端,说没处理好。



此处介绍的设计并不是RPC服务的唯一可能实现,但它具有一些重要优势:

如果RPC服务器太慢,您可以通过运行另一个服务器来扩展。 尝试在新控制台中运行第二个RPCServer。

在客户端,RPC只需要发送和接收一条消息。 不需要像QueueDeclare这样的同步调用。 因此,对于单个RPC请求,RPC客户端只需要一次网络往返。



测试:

先运行 服务端,服务端等待 消费者的请求,然后接收到,处理返回给  消费者。

====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========

到此,基本的类库调用 AY讲解完了。


后面讲解点运维方面的。基本场景。

推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音号:wpfui,可以看到我的很多作品效果

AYUI8社区版Github地址:前往获取

作者:杨洋(AaronYang简称AY,安徽六安人)目前是个人,还没公司AY唯一QQ:875556003和AY交流

高中学历,2015年1月17日开始,兴趣学习研究WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费.AY主要靠卖技术服务挣钱

不是从我处购买的ayui7源码,我不提供任何技术服务,如果你举报从哪里买的,我可以帮你转正为我的客户,并送demo

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

兼容XP到win10,vs2015/2019,最新AYUI:7.6.5.2

vs2015 企业版密钥HM6NR-QXX7C-DFW2Y-8B82K-WTYJV

vs2017 企业版密钥NJVYC-BMHX2-G77MM-4XJMR-6Q8QF

标签列表