当前位置:网站首页 / C#开发 / 正文

ay的RabbitMQ研究报告-第7章-C#代码实战[1]-BasicQueue

时间:2016年05月31日 | 作者 : aaronyang | 分类 : C#开发 | 浏览: 1455次 | 评论 0

(国内第一个rabbitMQ中文 系列讲解------引进外国技术,我是AY,安徽合肥的杨洋,1991年。我学习技术,我开心)

为了给自己站点增加流量,望各位抓包的,手下留情,自己去外国下载书籍,回来翻译着看,也不容易。谢谢你了。


====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。

总共6个例子

Basic queues

Worker queues

Publisher and subscribers
Direct routing of queues

Topic-based publisher and subscribers

Remote procedure calls



RabbitMQ Client Library

ay的这些报告,并不会太深入地讲客户端的库的使用,如果你想深入,可以看官网:地址

C#的RabbitMQ官网地址:地址

这里我使用.NET4.5 的库来开发,因为功能更多点:下载地址

blob.png

客户端类库包含什么?

IModel: 表示了1个AMQP 0-9-1 的channel,提供了很多操作(协议方法)

IConnection:表示1个AMQP 0-9-1连接Connection

ConnectionFactory:构造IConnection实例

IBasicConsumer:表示一个 消息的消费者


其他有用的接口和类包括:

DefaultBasicConsumer:消费者的公共使用的父类

ConnectionParamters: 对ConnectionFactory的配置

QueueingBasicConsumer: 接收来自服务器的消息


除了RabbitMQ.Client的,其他Public的命名空间

RabbitMQ.Client.Events:各种事件定义和事件处理Handler,包括EventingBasicConsumer,使用C#事件handlers实现的消费者。

RabbitMQ.Client.Exceptions:对用户可见的Exception


====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========


基本工作

新建一个AYRabbitMQDEMO的解决方案

blob.png

新建1个Common的类库,加上几个我们必须用到的实体,就是消息的结构,这里我们新建一个Payment类

注意顶部标记Serializable,表示可以序列化的

blob.png

第一个AmountToPay代表支付金额

第二个CardNumber,储蓄卡或者信用卡金额

第三个Name,是支付人的姓名。


新建第二个类

blob.png

AmountToPay代表   支付金额

PoNumber代表,订单号

CompanyName,发起订单的公司名称

PaymentDayTerms, 多少天后,订单会被支付,是个整型



新建第三个类ObjectSerialize.cs

把对象序列化为二进制的帮助类

using System;
using System.IO;
using System.IO.Compression;
using System.Runtime.Serialization.Formatters.Binary;

namespace Common
{
    public static class ObjectSerialize
    {
        public static byte[] Serialize(this Object obj)
        {
            if (obj == null)
            {
                return null;
            }
            using (var memoryStream = new MemoryStream())
            {
                var binaryFormatter = new BinaryFormatter();
                binaryFormatter.Serialize(memoryStream, obj);
                var compressed = Compress(memoryStream.ToArray());
                return compressed;
            }
        }
        public static Object DeSerialize(this byte[] arrBytes)
        {
            using (var memoryStream = new MemoryStream())
            {
                var binaryFormatter = new BinaryFormatter();
                var decompressed = Decompress(arrBytes);
                memoryStream.Write(decompressed, 0, decompressed.Length);
                memoryStream.Seek(0, SeekOrigin.Begin);
                return binaryFormatter.Deserialize(memoryStream);
            }
        }
        private static byte[] Compress(byte[] input)
        {
            byte[] compressesData;
            using (var outputStream = new MemoryStream())
            {
                using (var zip = new GZipStream(outputStream,
                CompressionMode.Compress))
                {
                    zip.Write(input, 0, input.Length);
                }
                compressesData = outputStream.ToArray();
            }
            return compressesData;
        }
        private static byte[] Decompress(byte[] input)
        {
            byte[] decompressedData;
            using (var outputStream = new MemoryStream())
            {
                using (var inputStream = new MemoryStream(input))
                {
                    using (var zip = new GZipStream(inputStream,
                    CompressionMode.Decompress))
                    {
                        zip.CopyTo(outputStream);
                    }
                }
                decompressedData = outputStream.ToArray();
            }
            return decompressedData;
        }
    }
}


新建个控制台程序,测试序列化类

新建一个解决方案文件夹

TestCommon,新建类库,然后引用Common

blob.png

添加测试代码:

  Payment payment1 = new Payment
            {
                AmountToPay = 25.0m,
                CardNumber = "ay1314520"
            };
            byte[] serialized = payment1.Serialize();
            Payment payment_deserialized = serialized.DeSerialize() as Payment;
            Console.WriteLine(payment_deserialized.CardNumber);

blob.png


====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========


Basic Queue

新建一个空的解决方案文件夹,取名 1. Basic Queue

然后新建1个控制台程序AyBasicQueue,引用Common

这个例子,是生产者和消费者在一起,方便测试,正常情况下是分开的。

using Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AyBasicQueue
{
    class Program
    {
        static void Main(string[] args)
        {
            var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234"};
            var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234"};
            var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234"};
            var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234"};
            var payment5 = new Payment { AmountToPay = 300.0m, CardNumber ="1234123412341234"};
            var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234"};
            var payment7 = new Payment { AmountToPay = 295.0m, CardNumber ="1234123412341234"};
            var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber ="1234123412341234"};
            var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234"};
            var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };


            Console.ReadLine();
        }
    }
}


添加RabbitMQ客户端

blob.png

输入 Install-Package RabbitMQ.Client

blob.png

然后添加引用,在项目packages文件夹里面

blob.png


然后

创建1个队列,创建1个工厂,通过工厂创建连接,通过连接,打开通道IModel(channel),通过通道,载入指定名字,创建队列,创建队列有参数

代码

 class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        private const string QueueName = "StandardQueue_ExampleQueue";

        private static void CreateQueue()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.QueueDeclare(QueueName, true, false, false, null);
        }

        static void Main(string[] args)
        {
        
        }
  }  
}

blob.png

第二个参数是持久化的意思,true就是持久化队列了。


发送消息

    private static void SendMessage(Payment message)
        {
            _model.BasicPublish("", QueueName, null, message.Serialize());
            Console.WriteLine(" [x] Payment Message Sent : {0} : {1}", message.CardNumber,message.AmountToPay);
        }

blob.png

我们调用了_model的BasicPublish方法,第一个参数,是指定exchange的名字,这里我们没有创建自己的exchange,所以不要指定,第二个参数,把消息发送到哪个队列上,第三个属性是 传入队列的其他值:比如   相关ID,回复地址,这个例子我们不需要的。最后1个参数就是我们的消息。这个消息要是byte[]的,这里我们已经拓展方法写了个Serialize()方法了。


第三块代码:

   public static void Recieve()
        {
            var consumer = new QueueingBasicConsumer(_model);
            var msgCount = GetMessageCount(_model, QueueName);
            _model.BasicConsume(QueueName, true, consumer);
            var count = 0;
            while (count < msgCount)
            {
                var message = (Payment)consumer.Queue.Dequeue().Body.DeSerialize();
                Console.WriteLine("----- Received {0} : {1}", message.CardNumber,
                message.AmountToPay);
                count++;
            }
        }
        private static uint GetMessageCount(IModel channel, string queueName)
        {
            var results = channel.QueueDeclare(queueName, true, false, false, null);
            return results.MessageCount;
        }

获得指定channel下的 消息数量没什么问题,获取数量,你需要重新声明队列。在RabbitMQ中声明队列是幂等操作,就是存在就重新使用那个存在的,不存在就重新创建1个新的给你使用。类似单例模式。

我们直接看下Recieve方法

创建消费者,通道消费开启。

blob.png

消费后,消费者队列,出列,然后Body反序列化,拿到发送的payment消息,然后输出结果。

blob.png


完整Program代码如下:

using Common;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AyBasicQueue
{
    class Program
    {
        private static ConnectionFactory _factory;
        private static IConnection _connection;
        private static IModel _model;
        private const string QueueName = "StandardQueue_ExampleQueue";

        private static void CreateQueue()
        {
            _factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            _connection = _factory.CreateConnection();
            _model = _connection.CreateModel();
            _model.QueueDeclare(QueueName, true, false, false, null);
        }
        private static void SendMessage(Payment message)
        {
            _model.BasicPublish("", QueueName, null, message.Serialize());
            Console.WriteLine(" [x] Payment Message Sent : {0} : {1}", message.CardNumber, message.AmountToPay);
        }

        public static void Recieve()
        {
            var consumer = new QueueingBasicConsumer(_model);
            var msgCount = GetMessageCount(_model, QueueName);
            _model.BasicConsume(QueueName, true, consumer);
            var count = 0;
            while (count < msgCount)
            {
                var message = (Payment)consumer.Queue.Dequeue().Body.DeSerialize();
                Console.WriteLine("----- Received {0} : {1}", message.CardNumber,message.AmountToPay);
                count++;
            }
        }

        private static uint GetMessageCount(IModel channel, string queueName)
        {
            var results = channel.QueueDeclare(queueName, true, false, false, null);
            return results.MessageCount;
        }

        static void Main(string[] args)
        {
            var payment1 = new Payment { AmountToPay = 25.0m, CardNumber = "1234123412341234" };
            var payment2 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment3 = new Payment { AmountToPay = 2.0m, CardNumber = "1234123412341234" };
            var payment4 = new Payment { AmountToPay = 17.0m, CardNumber = "1234123412341234" };
            var payment5 = new Payment { AmountToPay = 300.0m, CardNumber = "1234123412341234" };
            var payment6 = new Payment { AmountToPay = 350.0m, CardNumber = "1234123412341234" };
            var payment7 = new Payment { AmountToPay = 295.0m, CardNumber = "1234123412341234" };
            var payment8 = new Payment { AmountToPay = 5625.0m, CardNumber = "1234123412341234" };
            var payment9 = new Payment { AmountToPay = 5.0m, CardNumber = "1234123412341234" };
            var payment10 = new Payment { AmountToPay = 12.0m, CardNumber = "1234123412341234" };

            CreateQueue();
            SendMessage(payment1);
            SendMessage(payment2);
            SendMessage(payment3);
            SendMessage(payment4);
            SendMessage(payment5);
            SendMessage(payment6);
            SendMessage(payment7);
            SendMessage(payment8);
            SendMessage(payment9);
            SendMessage(payment10);
            Recieve();


            Console.WriteLine("www.ayjs.net 杨洋");

            Console.ReadLine();
        }
    }
}


关于VS2013中提示

blob.png

这个先不管,后期我们再使用新特性。先了解




====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========




推荐您阅读更多有关于“RabbitMQ,”的文章

猜你喜欢

额 本文暂时没人评论 来添加一个吧

发表评论

必填

选填

选填

必填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

  查看权限

抖音号:wpfui,可以看到我的很多作品效果

AYUI8社区版Github地址:前往获取

作者:杨洋(AaronYang简称AY,安徽六安人)目前是个人,还没公司AY唯一QQ:875556003和AY交流

高中学历,2015年1月17日开始,兴趣学习研究WPF

声明:AYUI7个人与商用免费,源码可购买。部分DEMO不免费.AY主要靠卖技术服务挣钱

不是从我处购买的ayui7源码,我不提供任何技术服务,如果你举报从哪里买的,我可以帮你转正为我的客户,并送demo

查看捐赠

AYUI7.X MVC教程 更新如下:

第一课 第二课 程序加密教程

兼容XP到win10,vs2015/2019,最新AYUI:7.6.5.2

vs2015 企业版密钥HM6NR-QXX7C-DFW2Y-8B82K-WTYJV

vs2017 企业版密钥NJVYC-BMHX2-G77MM-4XJMR-6Q8QF

标签列表