故如虹,知恩;故如月,知明
排名
6
文章
6
粉丝
16
评论
8
{{item.articleTitle}}
{{item.blogName}} : {{item.content}}
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2024TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
欢迎加群交流技术

net core使用消息队列rabbitmq

7983人阅读 2019/3/19 22:16 总访问:3837202 评论:0 收藏:0 手机
分类: 消息队列


使用NuGet下载rabbitmq客户端工具

Install-Package RabbitMQ.Client -Version 3.6.5



生产消息

 static void Main(string[] args)
        {
            //1:创建一个连接的工厂类  Port= 5672(端口号)
            ConnectionFactory connectionFactory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 };
            //创建连接
            using (var conn = connectionFactory.CreateConnection())
            {
                //创建通道
                using (var channel = conn.CreateModel())
                {
                    //创建一个交换机
                    channel.ExchangeDeclare("MYExchange", "direct");

                    //创建一个队列
                    channel.QueueDeclare("AJ", false, false, false, null);

                    //把交换机和队列进行关联
                    channel.QueueBind("AJ", "MYExchange", "routing_key");

                    //生产消息(循环生产3条)
                    for (int i = 1; i <= 3; i++)
                    {
                        byte[] bytes = Encoding.UTF8.GetBytes("iphone" + i);
                        channel.BasicPublish("MYExchange", "routing_key", null, bytes);
                    }

                }
            }
            Console.WriteLine("消息生产完成");
            Console.ReadLine();
        }

注意默认的连接端口是5672,默认的web工具端口是15672不要搞混了,如果使用15672用代码去连接是不行的


几个步骤基本上对应网页的几个



消息生产成功后就能看到,我们创建的交换机

以及队列


以及队列里边存放的消息条数

当然交换机和队列,不需要每次添加消息的时候都创建,创建一次后面只添加消息就行了



消息消费(及时接收)

通过事件关联的方式,当有消息进来的时候可以及时的消费掉

  public class ArticleRedisMQ
    {
        public void GetMsg()
        {
            //连接配置(类似连接字符串)
            ConnectionFactory rabbitFactory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "guest", Password = "guest", Port = 5672 };

            //打开连接
            using (IConnection conn = rabbitFactory.CreateConnection())
            {
                //获取一个通道
                using (IModel channel = conn.CreateModel())
                {
                    //获取客户端的关联
                    EventingBasicConsumer custom = new EventingBasicConsumer(channel);

                    //绑定一个接收事件(及时接收消息)
                    custom.Received += Custom_Received;

                    //开始接收消息(自动应答为false,获取消息后不会自动删除,自己控制消息是否删除)
                    channel.BasicConsume("topic.justin.queue", noAck: false, consumer: custom);

                    Console.WriteLine("按任意值,退出程序");
                    Console.ReadKey();
                }
            }

        }

        private void Custom_Received(object sender, BasicDeliverEventArgs e)
        {
            var msgBody = e.Body;
            string msg = Encoding.UTF8.GetString(msgBody);
            Console.WriteLine(msg);
        }
    }


手动确认:

首先自定应答设置成false:     noAck:fasle

//开始接收消息(自动应答为false,获取消息后不会自动删除,自己控制消息是否删除)
channel.BasicConsume("RRedis", noAck: false, consumer: custom);

然后手动调用方法BasicAck完成应答

//处理完成,告诉Broker可以服务端可以删除消息,分配新的消息过来
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);








欢迎加群讨论技术,群:677373950(满了,可以加,但通过不了),2群:656732739

评价