当前位置:网站首页 / C#开发 / 正文

ay的RabbitMQ研究报告-第7章[完结]-C#代码实战[6]Remote Procedure Call

时间:2016年06月01日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2189次 | 评论 0

(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)

为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。


前面的5个例子中,都是单向操作,生产者把数据放到队列,未来,然后消费者再从队列中取出数据处理它们。

在本章也是最后一讲,说的是,post一个消息到队列,消费者获得消息,然后可以给原始的生产者发出的队列进行post back


这个例子是2个项目,客户端发送个payment消息到队列,每个消息post成功后,应用程序会从replay queue中等待一个回复。这是一个同步的操作,server端收到后,然后就会向客户端等待的那个reply queue,进行回复,只有当客户端收到回复后,客户端才会post另外一个消息

blob.png


当一个message从客户端发送到服务器,CorrelationId就会生成,并添加到message的属性中去了。同样CorrelationId也会在回复的message对象中。这样你就能从很多的原始message中检索到了(如果这些message你存储下来了)。

图65讲解:客户端post个message到rpc_queue中去,生成correlation_id为12345,服务器收到后,post个消息到reply queue ,id也是12345.


先看客户端代码Client:新建解决方案文件夹6. RPC 然后Client控制台项目RPCClient

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RPCClient
{
    class Program
    {
        private static IConnection _connection;
        private static IModel _channel;
        private static string _replyQueueName;
        private static QueueingBasicConsumer _consumer;
        static void Main()
        {
            SetupClient();

            MakePayment(new Payment
            {
                AmountToPay = 25.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr F Bloggs"
            });
            MakePayment(new Payment
            {
                AmountToPay = 5.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr D Wibble"
            });
            MakePayment(new Payment
            {
                AmountToPay = 225.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr B Smith"
            });
            MakePayment(new Payment
            {
                AmountToPay = 255.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr S Jones"
            });
            MakePayment(new Payment
            {
                AmountToPay = 255.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr A Dibbles"
            });
            MakePayment(new Payment
            {
                AmountToPay = 125.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr H Howser"
            });
            MakePayment(new Payment
            {
                AmountToPay = 27.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr J Jupiter"
            });
            MakePayment(new Payment
            {
                AmountToPay = 925.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr Z Zimzibar"
            });
            MakePayment(new Payment
            {
                AmountToPay = 325.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr G Goggie"
            });
            MakePayment(new Payment
            {
                AmountToPay = 925.0m,
                CardNumber =
                    "1234123412341234",
                Name = "Mr U Bloggs"
            });
        }
        private static void SetupClient()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName =
                    "guest",
                Password = "guest"
            };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            _replyQueueName = _channel.QueueDeclare();
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume(_replyQueueName, true, _consumer);
        }
        public static string MakePayment(Payment payment)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = _channel.CreateBasicProperties();
            props.ReplyTo = _replyQueueName;
            props.CorrelationId = corrId;
            _channel.BasicPublish("", "rpc_queue", props, payment.Serialize());
            while (true)
            {
                Console.WriteLine("----------------------------------------------------------");
                Console.WriteLine("Payment Made for Card : {0}, for £{1}", payment.CardNumber, payment.AmountToPay);
                Console.WriteLine("Correlation ID = {0}", corrId);
                var ea = _consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId != corrId) continue;
                var authCode = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine("Reply Auth Code : {0}", authCode);


                Console.WriteLine("----------------------------------------------------------");
                Console.WriteLine("");
                return authCode;
            }
        }
    }
}

其中代码:   ====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========

 private static void SetupClient()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName =
                    "guest",
                Password = "guest"
            };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            _replyQueueName = _channel.QueueDeclare();
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume(_replyQueueName, true, _consumer);
        }

建立一个队列,没有指定名字,用于回复用的,相当于客户端也是写了服务端的代码而已

MakePayment方法中,

设置消息的属性,也就是BasicPublish的第三个参数,把correlateid加入,方便回复时候用来判断

     var corrId = Guid.NewGuid().ToString();
            var props = _channel.CreateBasicProperties();
            props.ReplyTo = _replyQueueName;
            props.CorrelationId = corrId;

BasicPublish开启自动应答机制,一直循环,接收服务端的消息

如果接收到服务器的反馈的BasicProperties.CorrelationId为自己发送过去的时候的CorrelationId,就输出内容,结束循环。

 _channel.BasicPublish("", "rpc_queue", props, payment.Serialize());

客户端把消息加入队列,给服务端消费。




接下来看下服务端代码

using Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RPCServer
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _channel;
        private static QueueingBasicConsumer _consumer;
        private static Random _rnd;
        private static void Main()
        {
            CreateConnection();
            Console.WriteLine("Awaiting Remote Procedure Call Requests");
            while (true)
            {
                GetMessageFromQueue();
            }
        }
        private static void GetMessageFromQueue()
        {
            string response = null;
            var ea = _consumer.Queue.Dequeue();
            var props = ea.BasicProperties;
            var replyProps = _channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;
            Console.WriteLine("----------------------------------------------------------");
            try
            {
                response = MakePayment(ea);
                Console.WriteLine("Correlation ID = {0}", props.CorrelationId);
            }
            catch (Exception ex)
            {
                Console.WriteLine(" ERROR : " + ex.Message);
                response = "";
            }
            finally
            {
                if (response != null)
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    _channel.BasicPublish("", props.ReplyTo, replyProps,
                    responseBytes);
                }
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            Console.WriteLine("----------------------------------------------------------");
            Console.WriteLine("");
        }
        private static string MakePayment(BasicDeliverEventArgs ea)
        {
            var payment = (Payment)ea.Body.DeSerialize();
            var response = _rnd.Next(1000, 100000000).ToString(CultureInfo.InvariantCulture);
            Console.WriteLine("Payment - {0} : £{1} : Auth Code <{2}> ",
            payment.CardNumber, payment.AmountToPay, response);
            return response;
        }
        private static void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName =
                    "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
            _channel.QueueDeclare("rpc_queue", false, false, false, null);
            _channel.BasicQos(0, 1, false);
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume("rpc_queue", false, _consumer);
            _rnd = new Random();
        }
    }
}


讲解代码:

 private static void Main()
        {
            CreateConnection();
            Console.WriteLine("Awaiting Remote Procedure Call Requests");
            while (true)
            {
                GetMessageFromQueue();
            }
        }

创建连接

   private static void CreateConnection()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName =
                    "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _channel = _connection.CreateModel();
            _channel.QueueDeclare("rpc_queue", false, false, false, null);
            _channel.BasicQos(0, 1, false);
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume("rpc_queue", false, _consumer);
            _rnd = new Random();
        }

获得客户端创建的rpc_queue队列,不持久化,不自动删除。

_channel.BasicQos(0, 1, false);

传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息

服务器也是消费者,消费客户端创建的队列消息

开始消费,false,开启手动应答模式。


开始后,服务器读取客户端rpc_queue队列消息,消息自带BasicProperties属性,里面有CorrelationId属性,

然后服务端显示来自客户端队列中的消息MakePayment方法。

显示完后,服务端向客户端提供的第二个队列BasicProperties.ReplyTo这个名字的队列中,发布消息,客户端此时一直在消费ReplyTo这个名字的队列的,一旦服务器发布,那么客户端就消费了,等于响应了。

 if (response != null)
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    _channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes);
                }

最后服务器写一行代码

 _channel.BasicAck(ea.DeliveryTag, false);

代表,服务器我处理完成了。

在客户端那边

blob.png

一直在处理服务器的消息。这个demo就是 服务端和客户端既是消费者也是发布者。



运行看效果,先服务端开启,等待客户端发送消息,然后客户端启动后,服务端接受到消息,开始响应。

2.gif


关与RabbitMQ

blob.png

这本书(纯英文)我就讲解完了,其中,有很多是自己百度,觉得很好理解的话,我放到了自己的博客中,方便理解,整体来说,RabbitMQ还是很理想的。关于RabbitMQ集群相关的技术,下次再讲了。

blob.png

谢谢大家!! See you next time!!

AY 2016-6-1 15:02:48(花费3天阅读完)


====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========



推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音:wpfui 工作wpf,兴趣学习flutter

目前在合肥市某公司上班,已经厌弃,如果你的公司看的上我,加我QQ私聊

AYUI8全源码 Github地址:前往获取

杨洋(AaronYang简称AY,安徽六安人)AY唯一QQ:875556003和AY交流

高中学历,2010年开始web开发,2015年1月17日开始学习WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费

不是从我处购买的ayui7源码,我不提供任何技术服务,如果你举报从哪里买的,我可以帮你转正为我的客户,并送demo

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

标签列表