消息事务
消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性。
在支持事务的session中,producer发送message时在message中带有transactionID。
broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。
消息生产者-异步发送
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。这个过程通常称为同步发送。
如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send() 方法。
但有一个例外,当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。
commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。
想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true
如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步
1) 当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”
2) 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。
总结:默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。
超时时间
jms.sendTimeout: 发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送!
即使使用异步发送,也可以通过producerWindowSize来控制发送端无节制的向broker发送消息。
窗口尺寸
producerWindowSize: 窗口尺寸,用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。
每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,如果达到了producerWindowSize上限,即使是异步调用也会被阻塞,防止不停向broker发送消息。
通过 jms.producerWindowSize
来设置
消息消费者-消息确认
确认机制
- ack_mod
AUTO_ACKNOWLEDGE = 1 自动确认
CLIENT_ACKNOWLEDGE = 2 客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
SESSION_TRANSACTED = 0 事务提交并确认
ACK_MODE描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。
所以ack_mode描述的不是producer于broker之间的关系,而是customer于broker之间的关系。
- 作用
对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了, 通过ACK,可以在consumer与Broker之间建立一种简单的“担保”机制。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
第一个参数:是否支持事务,如果为true,则会忽略第二个参数,自动被jms服务器设置为SESSION_TRANSACTED。
实战使用
生产者
此处其它代码与普通式消息发送代码相似,只在以下几处有不同,
try {
//1. 首先在取得session时会声明事务开启“true”。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//2. 然后在发送时会有一个动作:
producer.send(message);
System.out.println("send......" + Thread.currentThread().getId());
session.commit();
} catch (Exception e) {
//3. 相应的在catch(Exception)时需要
session.rollback();
}
消费者
事务型消息接收端(消费端)
在我们的接收端的createSession时也需要把它设为“事务开启”,此时请注意,生产和消费是在一个事务边界中的。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 然后在接收时会有一个动作:
try {
TextMessage tm = (TextMessage) message;
System.out.println("TranQConsumer receive message: " + tm.getText());
session.commit();
} catch (Exception e) {
session.rollback();
}
队列和订阅都是支持事务的。
参考资料
https://blog.csdn.net/songhaifengshuaige/article/details/54176849