.netcore3.1 RabbitMq MessageTTL,AutoExpire,MaxLength与MaxLengthBytes

.netcore3.1 RabbitMq MessageTTL,AutoExpire,MaxLength与MaxLengthBytes
关于上述的一些特定参数,我们都可以在UI
中找到相关的参数。
Message TTL
简单来说发布到队列的消息在被丢弃之前可以存在的时间(毫秒)。但它的规则也分为Queue TTL
与Message TTL
,区别在于在该队列中所存在的时间与单个消息在队列中存在的时间。
Queue TTL
可以通过使用策略设置message-ttl
参数 或在队列声明时指定相同的参数来为给定队列设置消息TTL。
队列中已存在时间超过配置的TTL的消息被认为已死。请注意,路由到多个队列的消息可以在它所驻留的每个队列中的不同时间死亡,或者根本不消失。一个队列中消息的死亡不会影响其他队列中同一消息的寿命。
服务器保证不发送死消息不会使用basic.deliver
(发送给使用者)或包含在basic.get-ok
响应中(用于一次性提取操作)。此外,服务器将尝试在基于TTL的到期时间或之后删除消息。
TTL参数或策略的值必须是非负整数(0 <= n),以毫秒为单位描述TTL周期。因此,值为1000意味着添加到队列中的消息将在队列中保留1秒钟,或者直到将其传递给使用者为止。参数可以是AMQP 0-9-1类型short-short-int
,short-int
, long-int
或long-long-int
。
使用策略为队列定义消息TTL
要使用策略指定TTL,请将键message-ttl
添加到策略定义中(这会将60
秒的TTL应用于所有队列
。):
rabbitmqctl | rabbitmqctl set_policy TTL “.*” ‘{“message-ttl”:60000}’ —apply-to queues |
rabbitmqctl (Windows) | rabbitmqctl set_policy TTL “.*” “{“”message-ttl””:60000}” —apply-to queues |
Queue TTL Demo
下面我将以一个小Demo为大家演示一下,在下列代码中我将创建一个消息只可存活12
秒的MyTTLQueue
队列
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
// 创建x-message-ttl为6秒的参数
var arguments = new Dictionary<string,object>();
arguments.Add("x-message-ttl",12000);
// 创建一个消息只可以存活6秒的队列
channel.QueueDeclare(
queue: "MyTTLQueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: arguments);
// 发布一个新消息
channel.BasicPublish(
exchange: string.Empty,
routingKey: "MyTTLQueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes("新消息")
);
Console.WriteLine("Finish");
Console.ReadLine();
}
}
我们很容易的发现消息在12
秒后将自动被队列清理掉了
Message TTL
通过在发送basic.publish
时在基本AMQP 0-9-1类中 设置expiration
字段,可以基于每个消息指定TTL 。
到期字段的值描述了TTL周期(以毫秒为单位)。适用与x-message-ttl
相同的约束 。由于 到期字段必须是字符串,因此代理将(仅)接受数字的字符串表示形式。
当同时指定了每个队列和每个消息的TTL时,将选择两者之间的较低值。
Message Demo
同样我们在普通的MyMessageQueue
队列中定义只能存活12
秒的消息
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "MyMessageQueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 发布一个只能存活12秒的新消息
var props = channel.CreateBasicProperties();
props.Expiration = "12000";
channel.BasicPublish(
exchange: string.Empty,
routingKey: "MyMessageQueue",
basicProperties: props,
body: Encoding.UTF8.GetBytes("新消息")
);
Console.WriteLine("Finish");
Console.ReadLine();
}
}
Auto Expire(队列存活时间)
TTL也可以在队列中设置,而不仅仅是队列内容。队列只有在不使用(例如没有使用方)时,才会在一段时间后过期。
通过将x-expires
参数设置为queue.declare
或设置expires
策略,可以为给定队列设置到期时间 。这控制队列在被自动删除之前可以闲置多长时间。未使用表示该队列没有使用者,该队列最近尚未重新声明(重新声明续订租约),并且至少在有效期限内未调用basic.get
。例如,可将其用于RPC样式的答复队列,在该队列中可以创建许多永远不会耗尽的队列。
服务器保证队列将被删除(如果至少在到期期限内未使用)。无法保证到期时间过后队列将如何迅速删除。服务器重新启动时,持久队列的租约重新开始。
所述的值x-expires
参数或 expires
策略以毫秒来描述有效期限。它必须是一个正整数(与消息TTL不同,它不能为0)。因此,值1000表示将删除1秒钟未使用的队列。
Auto Expire Demo
接着我们写一个可以存活12
秒的队列,作为演示。
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "MyExpireQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string,object>(){
{"x-expires",1000*12}
});
channel.BasicPublish(
exchange: string.Empty,
routingKey: "MyExpireQueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes("新消息")
);
Console.WriteLine("Finish");
Console.ReadLine();
}
}
命令
rabbitmqctl | rabbitmqctl set_policy expiry ".*" '{"expires":12000}' --apply-to queues |
rabbitmqctl (Windows) | rabbitmqctl.bat set_policy expiry ".*" "{""expires"":12000}" --apply-to queues |
MaxLength
通过限制队列中消息数量,多余的消息将会从头部进行删除。
MaxLength Demo
我们通过x-max-length
的方式去限制MyMaxLengthQueue
队列中消息的数量写一个小Demo
。
var factory = new ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "bob",
Password = "bob"
};
// 创建一个链接
using (var connection = factory.CreateConnection())
{
// 创建一个通道
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(
queue: "MyMaxLengthQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string,object>(){
{"x-max-length",10}
});
// 我们创建15条消息
for (var i = 0; i < 15; i++)
{
channel.BasicPublish(
exchange: string.Empty,
routingKey: "MyMaxLengthQueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"新消息{i}")
);
}
Console.WriteLine("Finish");
Console.ReadLine();
}
}
我们通过运行,查看UI很容易的发现它只有10
条数据,接着我们来查看是哪10
条数据。
我们发现它是从第5
条开始的,说明丢失的是前5
条数据。
MaxLengthBytes
就是限制队列的内存大小,超过多少就从队列的头部开始删除。参数是x-max-length-bytes
,例如不超过1MIB就是x-max-length-bytes:1024
,例子我就不举例了,都差不多的,大家国人尝试一下
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

