时间:2016年06月01日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 2189次 | 评论 0 人
(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)
为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。
前面的5个例子中,都是单向操作,生产者把数据放到队列,未来,然后消费者再从队列中取出数据处理它们。
在本章也是最后一讲,说的是,post一个消息到队列,消费者获得消息,然后可以给原始的生产者发出的队列进行post back
这个例子是2个项目,客户端发送个payment消息到队列,每个消息post成功后,应用程序会从replay queue中等待一个回复。这是一个同步的操作,server端收到后,然后就会向客户端等待的那个reply queue,进行回复,只有当客户端收到回复后,客户端才会post另外一个消息
当一个message从客户端发送到服务器,CorrelationId就会生成,并添加到message的属性中去了。同样CorrelationId也会在回复的message对象中。这样你就能从很多的原始message中检索到了(如果这些message你存储下来了)。
图65讲解:客户端post个message到rpc_queue中去,生成correlation_id为12345,服务器收到后,post个消息到reply queue ,id也是12345.
先看客户端代码Client:新建解决方案文件夹6. RPC 然后Client控制台项目RPCClient
using Common; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace RPCClient { class Program { private static IConnection _connection; private static IModel _channel; private static string _replyQueueName; private static QueueingBasicConsumer _consumer; static void Main() { SetupClient(); MakePayment(new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234", Name = "Mr F Bloggs" }); MakePayment(new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234", Name = "Mr D Wibble" }); MakePayment(new Payment { AmountToPay = 225.0m, CardNumber = "1234123412341234", Name = "Mr B Smith" }); MakePayment(new Payment { AmountToPay = 255.0m, CardNumber = "1234123412341234", Name = "Mr S Jones" }); MakePayment(new Payment { AmountToPay = 255.0m, CardNumber = "1234123412341234", Name = "Mr A Dibbles" }); MakePayment(new Payment { AmountToPay = 125.0m, CardNumber = "1234123412341234", Name = "Mr H Howser" }); MakePayment(new Payment { AmountToPay = 27.0m, CardNumber = "1234123412341234", Name = "Mr J Jupiter" }); MakePayment(new Payment { AmountToPay = 925.0m, CardNumber = "1234123412341234", Name = "Mr Z Zimzibar" }); MakePayment(new Payment { AmountToPay = 325.0m, CardNumber = "1234123412341234", Name = "Mr G Goggie" }); MakePayment(new Payment { AmountToPay = 925.0m, CardNumber = "1234123412341234", Name = "Mr U Bloggs" }); } private static void SetupClient() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); _replyQueueName = _channel.QueueDeclare(); _consumer = new QueueingBasicConsumer(_channel); _channel.BasicConsume(_replyQueueName, true, _consumer); } public static string MakePayment(Payment payment) { var corrId = Guid.NewGuid().ToString(); var props = _channel.CreateBasicProperties(); props.ReplyTo = _replyQueueName; props.CorrelationId = corrId; _channel.BasicPublish("", "rpc_queue", props, payment.Serialize()); while (true) { Console.WriteLine("----------------------------------------------------------"); Console.WriteLine("Payment Made for Card : {0}, for £{1}", payment.CardNumber, payment.AmountToPay); Console.WriteLine("Correlation ID = {0}", corrId); var ea = _consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId != corrId) continue; var authCode = Encoding.UTF8.GetString(ea.Body); Console.WriteLine("Reply Auth Code : {0}", authCode); Console.WriteLine("----------------------------------------------------------"); Console.WriteLine(""); return authCode; } } } }
其中代码: ====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
private static void SetupClient() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); _replyQueueName = _channel.QueueDeclare(); _consumer = new QueueingBasicConsumer(_channel); _channel.BasicConsume(_replyQueueName, true, _consumer); }
建立一个队列,没有指定名字,用于回复用的,相当于客户端也是写了服务端的代码而已
MakePayment方法中,
设置消息的属性,也就是BasicPublish的第三个参数,把correlateid加入,方便回复时候用来判断
var corrId = Guid.NewGuid().ToString(); var props = _channel.CreateBasicProperties(); props.ReplyTo = _replyQueueName; props.CorrelationId = corrId;
BasicPublish开启自动应答机制,一直循环,接收服务端的消息
如果接收到服务器的反馈的BasicProperties.CorrelationId为自己发送过去的时候的CorrelationId,就输出内容,结束循环。
_channel.BasicPublish("", "rpc_queue", props, payment.Serialize());
客户端把消息加入队列,给服务端消费。
接下来看下服务端代码
using Common; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Globalization; using System.Linq; using System.Text; using System.Threading.Tasks; namespace RPCServer { class Program { private static ConnectionFactory _factory; private static IConnection _connection; private static IModel _channel; private static QueueingBasicConsumer _consumer; private static Random _rnd; private static void Main() { CreateConnection(); Console.WriteLine("Awaiting Remote Procedure Call Requests"); while (true) { GetMessageFromQueue(); } } private static void GetMessageFromQueue() { string response = null; var ea = _consumer.Queue.Dequeue(); var props = ea.BasicProperties; var replyProps = _channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; Console.WriteLine("----------------------------------------------------------"); try { response = MakePayment(ea); Console.WriteLine("Correlation ID = {0}", props.CorrelationId); } catch (Exception ex) { Console.WriteLine(" ERROR : " + ex.Message); response = ""; } finally { if (response != null) { var responseBytes = Encoding.UTF8.GetBytes(response); _channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes); } _channel.BasicAck(ea.DeliveryTag, false); } Console.WriteLine("----------------------------------------------------------"); Console.WriteLine(""); } private static string MakePayment(BasicDeliverEventArgs ea) { var payment = (Payment)ea.Body.DeSerialize(); var response = _rnd.Next(1000, 100000000).ToString(CultureInfo.InvariantCulture); Console.WriteLine("Payment - {0} : £{1} : Auth Code <{2}> ", payment.CardNumber, payment.AmountToPay, response); return response; } private static void CreateConnection() { _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.QueueDeclare("rpc_queue", false, false, false, null); _channel.BasicQos(0, 1, false); _consumer = new QueueingBasicConsumer(_channel); _channel.BasicConsume("rpc_queue", false, _consumer); _rnd = new Random(); } } }
讲解代码:
private static void Main() { CreateConnection(); Console.WriteLine("Awaiting Remote Procedure Call Requests"); while (true) { GetMessageFromQueue(); } }
创建连接
private static void CreateConnection() { _factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" }; _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.QueueDeclare("rpc_queue", false, false, false, null); _channel.BasicQos(0, 1, false); _consumer = new QueueingBasicConsumer(_channel); _channel.BasicConsume("rpc_queue", false, _consumer); _rnd = new Random(); }
获得客户端创建的rpc_queue队列,不持久化,不自动删除。
_channel.BasicQos(0, 1, false);
传递参数为prefetchCount = 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息
服务器也是消费者,消费客户端创建的队列消息
开始消费,false,开启手动应答模式。
开始后,服务器读取客户端rpc_queue队列消息,消息自带BasicProperties属性,里面有CorrelationId属性,
然后服务端显示来自客户端队列中的消息MakePayment方法。
显示完后,服务端向客户端提供的第二个队列BasicProperties.ReplyTo这个名字的队列中,发布消息,客户端此时一直在消费ReplyTo这个名字的队列的,一旦服务器发布,那么客户端就消费了,等于响应了。
if (response != null) { var responseBytes = Encoding.UTF8.GetBytes(response); _channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes); }
最后服务器写一行代码
_channel.BasicAck(ea.DeliveryTag, false);
代表,服务器我处理完成了。
在客户端那边
一直在处理服务器的消息。这个demo就是 服务端和客户端既是消费者也是发布者。
运行看效果,先服务端开启,等待客户端发送消息,然后客户端启动后,服务端接受到消息,开始响应。
关与RabbitMQ
这本书(纯英文)我就讲解完了,其中,有很多是自己百度,觉得很好理解的话,我放到了自己的博客中,方便理解,整体来说,RabbitMQ还是很理想的。关于RabbitMQ集群相关的技术,下次再讲了。
谢谢大家!! See you next time!!
AY 2016-6-1 15:02:48(花费3天阅读完)
====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
推荐您阅读更多有关于“RabbitMQ,”的文章
抖音:wpfui 工作wpf,兴趣学习flutter
目前在合肥市某公司上班,已经厌弃,如果你的公司看的上我,加我QQ私聊
AYUI8全源码 Github地址:前往获取
杨洋(AaronYang简称AY,安徽六安人)和AY交流
高中学历,2010年开始web开发,2015年1月17日开始学习WPF
声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费
不是从我处购买的ayui7源码,我不提供任何技术服务,如果你举报从哪里买的,我可以帮你转正为我的客户,并送demo
查看捐赠AYUI7.X MVC教程 更新如下:
第一课 第二课 程序加密教程
额 本文暂时没人评论 来添加一个吧
发表评论