上篇文章:RabbitMQ简介和交换机入门使用
一、消息如何保障100%的投递成功?1、什么是生产端的可靠性投递?
保障消息的成功发出
保障MQ节点的成功接收
发送端收到MQ节点(Broker)确认应答
完善的消息进行补偿机制(如网络问题没有返回确认应答)
2、可靠性投递的解决方案
方案一:消息入库,对消息状态进行打标(变更消息状态)。
1.生产者将业务数据和消息入库,并设置消息状态为0,即初始待投递(可能涉及多个数据库,业务库,消息库等)
2.生产者将消息发送至MQ节点(Broker)
3.Broker向生产者发送确认
4.生产者收到Broker确认后修改消息状态为1,即消息投递成功
5.系统定时任务扫描未投递成功的消息(消息状态为0)
6.生产者将未投递的消息重发给Broker,并记录消息重发次数
7.当重发次数大于3(阈值自定义)时,此时修改消息状态为2,即消息投递失败。对于投递失败的消息启动补偿机制或者人工去处理失败消息。
存在的问题:在高并发场景下,每次要对业务数据和消息数据入库,数据库会遇到瓶颈,所以会采用方案二。
方案二:消息的延迟投递,做二次确认,回调检查。
在高并发的场景下,少做一次数据库持久操作,提高系统处理能力,故将业务和消息的持久化拆开。所以在高并发的场景下,消息就不要入库了,延迟投递,可以不保证首次100%的成功,但是一定要保证性能。
0.先将业务数据入库(一定等到业务数据入库之后再发送消息,Upstream serivce上游服务 )
1.生产者第一次向MQ节点(Broker)发送消息
2.生产者第二次向MQ节点(Broker)发送check延迟消息,一般按自己业务设为2min-5min
3.Consumer消费者从MessageQueue获取消息
4.Consumer成功消费消息后,会Broker发送确认消息(设其队列名为ConsumerQueueConfirm)
5.Callback Service服务监听ConsumerQueueConfirm,并将成功消费的消息入库MSG DB。
6.同时Callback Service服务监听checkdetailQueue(处理第2步发送check延迟消息),并去MSG DB查询该消息是否被成功消费。如果查询不到check message,则Callback Service服务向Upstream Service服务发送RPC请求,让其重发消息,设置重发次数,达到重发次数后,设置其为消费失败
7.人工处理因网络闪断或者业务问题产生的未成功消费消息,使系统消息投递几乎达到100%
二、如何保证消息的幂等性首先,无论是RabbitMQ、RocketMQ还是kafka,都有可能出现消息的重复发送,这个是MQ无法保障的。
幂等性:就是相同条件下对一个业务的操作,不管操作多少次,结果都是一样。
消息的幂等性:就是即使我们收到多条一样的消息,永远也不会重复消费,即消息只会被消费一次。
例如:向支付宝发起支付请求,无论是由于网络问题无法收到请求结果而重新发起请求,或是前端的操作抖动而造成重复提交情况。支付宝只能扣一次钱。
1、RabbitMQ可能导致出现非幂等性的情况
1.可靠性消息投递机制:consumer回复confirm出现网络闪断,producer没有收到ack,定时任务轮询可能就会重新发送消息,这样consumer就会收到两条消息
2.MQ Broker与消费端传输消息的过程出现网络抖动
3.消费端故障或异常
2、解决方案
方案一:令牌机制,即唯一ID + 指纹码
原理就是利用数据库主键去重,业务完成后插入主键标识,使用ID进行分库分表算法路由,从单库的幂等性到多库的幂等性
1.这里唯一ID一般就是业务表的主键,比如商品ID
2.指纹码:每次操作都要生成指纹码,可以用时间戳+业务编号+...组成,目的是保证每次操作都是正常的
整体流程:
每次操作都生产一个唯一标记(统一ID生成服务),通过客户端传给服务器端,服务器端通过这个标记去查询数据库里面是否有该唯一标记,如果有就是重复消费了。
1.需要一个统一ID生成服务,为了保证可靠性,上游服务也要有个本地ID生成服务,然后发送消息给Broker
2.需要ID规则路由组件去监听消息,先入库,如果入库成功,证明没有重复,然后发给下游,如果发现库里面有了这条消息,就不发给下游
好处:整体实现相对简单
坏处:高并发下有数据库写入的性能瓶颈。
优化:跟进id进行分库分表进行算法的路由分压分流。
方案二:利用Redis的原子性去实现。
利用redis的原子操作,做个操作完成的标记,Redis的实现性能比较好,而且Redis一般使用集群,不用担心某台机器挂掉了,影响服务。但是也存在一些的问题:
我们是否需要进行数据落库,如果落库的话,怎么保证缓存和storage的一致性、事务,关键解决的问题是数据库和Redis操作如何做到原子性?如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?
三、生产端confirm确认消息1、理解Confirm消息确认机制
消息的确认:是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。
2、如何实现Confirm确认消息?
在生产端
第一步:在channel上开启确认模式:channel.confirmSelect()
第二步:在channel上添加确认监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或者记录日志等后续处理。
3、代码实现
1.生产端的代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4.指定消息的投递模式:确认模式
channel.confirmSelect();
//5.发布消息
String exchangeName = "test_confirm_exchange";
String routingKey = "test.confirm";
String msg = "hello rabbitmq consumer, test_direct-message: confirm";
for (int i = 0; i < 2; i++) {
/**
参数:
exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
routingKey -路由键
mandatory -如果要设置“强制性”标志,则为true
props -消息的其他属性-路由标头等
body -消息正`文
*/
channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
}
//6.添加确认监听,来监听消息中间件的确认消息。
channel.addConfirmListener(new ConfirmListener() {
//当消息中间件接收到消息后,要执行的函数
// deliveryTag表示是Broker给每条消息指定的唯一ID(从1开始)
// multiple表示是否接收所有的应答消息,比如multiple=true时,发送100条消息成功过后,我们并不会收到100次handleAck方法调用。
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----ack----");
System.out.println("deliveryTag:"+ deliveryTag +" multiple:" + multiple);
}
//消息中间件出现异常(比如队列满了)后,要执行的函数
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("-----no ack----");
System.out.println("deliveryTag:"+ deliveryTag +" multiple:" + multiple);
}
});
//7.释放资源
channel.close();
connection.close();
}
}
2.消费端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_confirm_queue";
String exchangeName="test_confirm_exchange";
String exchangeType="direct";
String routingKey="test.confirm";
/**
参数:
queue -队列名称
durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
exclusive -如果我们声明一个排他队列,则为true(仅限此连接)
autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)
arguments -队列的其他属性(构造参数)
*/
channel.queueDeclare(queueName, true, false, false, null);
/**
参数:
exchange -交易所名称
type -交易所类型
durable -如果我们声明持久交换,则为true(该交换将在服务器重启后保留下来)
autoDelete -如果服务器在不再使用交换机时应删除该交换机,则为true
arguments -用于交换的其他属性(构造参数)
*/
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
/**
参数:
queue -队列名称
exchange -交易所名称
routingKey -用于绑定的路由键
*/
channel.queueBind(queueName, exchangeName, routingKey);
//5.通过channel把消费者和消息队列进行关联,获取消息进行处理
/**
参数:
queue -队列名称
autoAck-如果服务器应考虑一旦传递已确认的消息,则为true;如果服务器应该期望显式确认,则为false
callback -消费者对象的接口
*/
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
/**
参数:
consumerTag-与消费者相关联的消费者标签
envelope -消息的打包数据
properties -消息的内容头数据
body -消息正文(客户端特定的不透明字节数组)
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("------------consumer message-----------");
System.out.println("sonsumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("msg:" + new String(body));
}
});
}
}
1、理解Return消息机制
Return Listener用于处理一些不可路由的消息。
1.我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送到某一个队列中,然后我们的消费者监听队列,进行消息处理操作。
2.但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达的消息,就要使用return listener。
在发送消息的basicPublish方法中有一个关键的配置项:
Mandatory:如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,
如果为false,那么broker端自动删除该消息。(默认false)
2、代码实现
在生产端,在channel上添加监听:addReturnListener,并指定mandatory为true。
1.生产端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4.添加return监听
channel.addReturnListener(new ReturnListener() {
/**
* relaycode:表示中间件响应给浏览器的状态码
* relayText:表示状态码对应的文本。
* exchange:表示消息发布时对应的交换机名
* routingKey:表示的是路由键
* basicProperties:表示消息的属性,
* bytes:个表示消息的内容
*/
@Override
public void handleReturn(int relaycode, String relayText, String exchange, String routingKey,
AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("----------------handle return ----------------");
System.out.println("状态码:"+relaycode);
System.out.println("信息:"+relayText);
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
System.out.println("属性:"+basicProperties);
System.out.println("消息内容:"+new String(bytes));
}
});
//5.发布消息
String exchangeName = "test_return_exchange2";
String routingKey = "test.return.#";
String routingKeyError="test.returnerror.#";
String msg = "hello rabbitmq consumer, test_return-message: return";
for (int i = 0; i < 2; i++) {
/**
参数:
exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
routingKey -路由键
mandatory -如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,
如果为false,那么broker端自动删除该消息。(默认false)
props -消息的其他属性-路由标头等
body -消息正`文
*/
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());
}
//6.释放资源
// channel.close();
// connection.close();
}
}
2.消费端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_return_queue2";
String exchangeName="test_return_exchange2";
String exchangeType="topic";
String routingKey="test.return.#";
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5.通过channel把消费者和消息队列进行关联,获取消息进行处理
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("------------consumer message-----------");
System.out.println("sonsumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("msg:" + new String(body));
}
});
}
}
我们使用自定义的Consumer更加的方便,解耦性更强,在实际工作中也最常用。 实现也计较简单,新建一个类继承DefaultConsumer,并重写其中一个handleDelivery方法即可。
1、代码实现,将四的例子中消费端使用自定义监听器使用,生产端不改变。
1.消费端自定义监听器,继承DefaultConsumer
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 自定义消费端自定义监听器
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------consumer message-----------");
System.out.println("sonsumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("msg:" + new String(body));
}
}
2.消费端的代码
import com.rabbitmq.client.*;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_return_queue2";
String exchangeName="test_return_exchange2";
String exchangeType="topic";
String routingKey="test.return.#";
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5.通过channel把消费者和消息队列进行关联,获取消息进行处理
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
六、消费端限流
1、什么要对消费端限流
如果是高并发的场景下,RabbitMQ服务器上收到成千上万条消息,那么当打开消费者客户端时,会出现:这些巨量的消息必定会瞬时全部推送过来,但是我们单个客户端无法同时处理这么多数据,导致消费端消费不过来甚至挂掉都有可能。
当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
注意:在非自动确认的模式下,可以采用限流模式。RabbitMQ 提供了一种 qos(服务质量保证)功能机制来控制一次消费消息数量,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。
2、理解限流的 API方法
限流设置 - basicQos(prefetchSize,prefetchCount,global)
prefetchSize:0,单条消息大小限制,0代表不限制消息大小
prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
global:true/false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当设置为 false 时生效,设置为 true 时没有了限流功能,因为 channel 级别尚未实现。
注意:
1.不能设置自动签收功能(autoAck = false)
2.如果消息没被手动确认,就不会再给消费端发送消息,目的就是给消费端减压
3、代码实现消费端限流
生产端代码改变不多,主要操作集中在消费端。
1)设置具体的限流大小以及数量:channel.basicQos(0, 4, false);
2)关闭自动 ack:将 autoAck 设置为 false。channel.basicConsume(queueName, false, consumer);
3)在 handleDelivery 消费方法中手工ACK - basicAck()。void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于demo我是一次处理两条消息,所以设置为true。
1.生产端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4.发布消息
String exchangeName = "test_qos_exchange";
String routingKey = "test.qos.#";
String msg = "hello rabbitmq consumer, test_qos-message: qos";
for (int i = 0; i < 5; i++) {
/**
参数:
exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
routingKey -路由键
mandatory -如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,
如果为false,那么broker端自动删除该消息。(默认false)
props -消息的其他属性-路由标头等
body -消息正文
*/
channel.basicPublish(exchangeName, routingKey, false, null, (msg + i).getBytes());
}
//5.释放资源
channel.close();
connection.close();
}
}
2.消费端的代码
import com.rabbitmq.client.*;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_qos_queue";
String exchangeName = "test_qos_exchange";
String exchangeType = "topic";
String routingKey = "test.qos.#";
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5.通过channel把消费者和消息队列进行关联,获取消息进行处理
// 设置限流,并autoAck设置为 false
channel.basicQos(0, 2, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
3.消费端自定义监听器,继承DefaultConsumer
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 自定义消费端自定义监听器
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------consumer message-----------");
System.out.println("sonsumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("msg:" + new String(body));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动签收消息
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
从图中发现 Unacked值一直都是 2 ,每过 3 秒 消费一条消息即 Ready 和 Total 都减少 2,而 Unacked的值在这里代表消费者正在处理的消息,通过demo发现了消费者一次性最多处理 2 条消息,达到了消费者限流的预期功能。
七、消费端手工ACK与NACK和重回队列1、消费端的手工ACK与NACK
当我们设置 autoACK=true 时,即在自动ACK的情况下,只要Broker发消息给消费者了,这个消息就会从消息队列中移除,不会管消费端有没有异常。
当我们设置 autoACK=false 时,就可以使用手工ACK方式了,那么其实手工方式包括了手工ACK与NACK。
- 如果我们手工 ACK 时,会发送给Broker一个应答,代表消息成功处理了,Broker就可以回送响应给生产端了。
- 如果我们手工 NACK 时,则表示消息处理失败了,可以重回队列也可以丢弃。具体做什么操作可以通过异常类型进行判断。若重回队列,Broker端就会将没有成功处理的消息重新发送。
在消费端需要限流时,我们需要让消费端进行手工的ACK。在消费端消费消息出现异常时,我们要通知Broker消息有问题,这时我们可以手动NACK。
1.使用方式
消费端进行消费的时候,如果由于业务异常我们可以手工 NACK 并进行日志的记录,然后进行补偿!
方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
如果由于服务器宕机等严重问题,那我们就需要手工进行 ACK 保障消费端消费成功!
方法:void basicAck(long deliveryTag, boolean multiple)
2、消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!
重回队列,会把消费失败的消息重新添加到队列的尾端,供消费者继续消费。
一般实在实际应用中不会设置重回队列这个属性(关闭重回队列-设置为false),我们都是自己去做补偿或者投递到延迟队列里的,然后指定时间去处理即可。
3、代码实现
1.生产端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4.发布消息
String exchangeName = "test_ack_exchange";
String routingKey = "test.ack.#";
String msg = "hello rabbitmq consumer, test_ack-message: !";
for (int i = 0; i < 3; i++) {
channel.basicPublish(exchangeName, routingKey, false, null, (msg + i).getBytes());
}
//5.释放资源
channel.close();
connection.close();
}
}
2.消费端的代码
import com.rabbitmq.client.*;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_ack_queue";
String exchangeName = "test_ack_exchange";
String exchangeType = "topic";
String routingKey = "test.ack.#";
channel.queueDeclare(queueName, true, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5.通过channel把消费者和消息队列进行关联,获取消息进行处理
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
3.自定义监听器,继承DefaultConsumer
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 自定义消费端自定义监听器
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------consumer message-----------");
System.out.println("sonsumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("msg:" + new String(body));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = new String(body);
if (msg.indexOf("1") >= 0) {
//手动拒签消息NACK,第三个requeue参数表示是否对拒签的消息重回队列
// channel.basicNack(envelope.getDeliveryTag(), false, false);
channel.basicNack(envelope.getDeliveryTag(), false, true); // 重回队列
} else {
//手动签收消息ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消费端打印说明,消息1由于我们调用了NACK,并且设置了重回队列,所以会导致该条消息一直重复发送,消费端就会一直循环消费。
八、TTL队列TTL是time to live的缩写,也就是生存时间。
RabbitMQ支持消息过期时间,在消息发送时可以进行指定。
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息就会自动的清除。
RabbitMQ可以对消息和队列设置TTL. 有两种方法可以设置。也可以在RabbitMQ控制台设置(不推荐)
1、通过队列属性设置TTL
在队列属性中设置消息的TTL,那么队列中所有消息都有相同的过期时间,可以通过声明队列时设置 x-message-ttl参数。
2、对消息本身进行单独设置TTL
对每条消息本身单独设置TTL,每条消息TTL可以不同,一旦消息过期就会从队列中清除。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。
3、代码实现
这里两种方法同时设置TTL,看效果
1.生产端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//5.发布消息
String exchangeName = "test_ttl_exchange";
String routingKey = "test.ttl.#";
String msg = "hello rabbitmq consumer, test_topic-message: ttl";
for (int i = 0; i < 2; i++) {
/**
参数:
exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
routingKey -路由键
mandatory -如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,
如果为false,那么broker端自动删除该消息。(默认false)
props -消息的其他属性-路由标头等
body -消息正`文
*/
// 对消息本身进行单独设置TTL, 我这里两条消息TTL一样
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.expiration("10000") // 设置TTL 10秒
.build();
channel.basicPublish(exchangeName, routingKey, false, properties, msg.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
2.消费端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_ttl_queue";
String exchangeName = "test_ttl_exchange";
String exchangeType = "topic";
String routingKey = "test.ttl.#";
// 通过队列属性设置TTL, 15秒
Map arguments = new HashMap();
arguments.put("vhost", "/");
arguments.put("username","guest");
arguments.put("password", "guest");
arguments.put("x-message-ttl", 15000);
/**
参数:
queue -队列名称
durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
exclusive -如果我们声明一个排他队列,则为true(仅限此连接)
autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)
arguments -队列的其他属性(构造参数)
*/
channel.queueDeclare(queueName, true, false, false, arguments);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5.不消费消息,让其超时
}
}
两种方法同时使用,去最小TTL为10秒后超时。启动之后,监控RabbitMQ控制台,发现10秒后设置了消息超时时间的消息超时就被清除。
九、死信队列DLX(Dead-Letter-Exchange),利用DLX,当消息在一个队列中变成死信之后,它能够被重新publish到另一个exchange,这个exchange就是DLX。
DLX也是一个正常的exchange,和一般的 exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的 exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理。
1、消息变成死信有以下几个情况:
消息被拒绝(basic.reject/basic.nack)
消息TTL过期
队列达到最大的长度
2、死信队列的设置:
首先要设置死信队列的 exchange 和 queue,然后进行绑定:
Exchange:dlx.exchange
Queue:dlx.queue
RoutingKey:#
然后进行正常声明交换机、队列、绑定,只不过需要在队列加上一个参数即可:
argument.put("x-dead-letter- exchange", "dlx.exchange");
这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。
3、代码实现
1.生产端的代码
import com.rabbitmq.client.*;
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//5.发布消息
String exchangeName = "test_ttl_exchange";
String routingKey = "test.ttl.#";
String msg = "hello rabbitmq consumer, test_topic-message: ttl";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, false, null, (msg + i).getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
2.消费端的代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
connectionFactory.setHost("127.0.0.1");
//默认情况下为“ guest” /“ guest”,仅限本地主机连接
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.通过工厂创建connection
Connection connection = connectionFactory.newConnection();
//3.创建channel对象
Channel channel = connection.createChannel();
//4.声明死信队列
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.exchangeDeclare("dlx.exchange", "topic", true);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
//5. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
String queueName = "test_ttl_queue";
String exchangeName = "test_ttl_exchange";
String exchangeType = "topic";
String routingKey = "test.ttl.#";
// 通过队列属性设置TTL, 15秒
Map arguments = new HashMap();
arguments.put("vhost", "/");
arguments.put("username", "guest");
arguments.put("password", "guest");
arguments.put("x-message-ttl", 15000);
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare(queueName, true, false, false, arguments);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5.通过channel把消费者和消息队列进行关联,获取消息进行处理
// 这里我们手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("------------consumer message-----------");
System.out.println("sonsumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("msg:" + new String(body));
String msg = new String(body);
if (msg.indexOf("1") >= 0) {
//手动拒签消息NACK,开启重回队列
channel.basicNack(envelope.getDeliveryTag(), false, false);
// channel.basicReject(envelope.getDeliveryTag(), true);
} else {
//手动签收消息ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
}
}
删除TTL创建的 exchange和 queue,然后运行 。上图说明,我这里模拟消息被拒绝(basic.reject/basic.nack)并且requeue=false或者true时分别运行了一下,共四次,四条消息都可以进去死信队列。
这里对RabbitMQ高级特性,做一整理,有些图来自网络,重点在知识点的掌握上。
参考文章:
RabbitMQ系列(四)--消息如何保证可靠性传输以及幂等性
RabbitMQ 消费端限流、TTL、死信队列
—— Stay Hungry. Stay Foolish. 求知若饥,虚心若愚。