rocketMQ中有关事务的发送消息方式,写的一个demo
1、在MyProducer类中的方法,即先定义调用
@Component public class MyProducer {@Autowiredprivate RocketMQTemplate template; public void sendTractionMessage(String topic, String msg) throws InterruptedException {String[] tags = new String[]{"TagA","TagB","TagC","TagD","TagE"}; //这里的tag是标签,具体使用的时候,可以自行根据规则赋值,可以是订单编号for (int i = 0; i < 10; i++) { //这个10也是demo时候用的,实际使用可以根据具体需求Message<String> message = MessageBuilder.withPayload(msg).build();String destination = topic + ":" + tags[i % tags.length]; //这里是取模TransactionSendResult transactionSendResult = template.sendMessageInTransaction(destination, message, destination);System.out.println("transactionSendResult = " + transactionSendResult);}}}
2、监听MyTransactionListener这个类作用是对不同tag的处理方式
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") //使用rocketMQTemplate public class MyTransactionListener implements RocketMQLocalTransactionListener { //需要实现该类@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o) {String destination = (String) o;org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),String.valueOf(StandardCharsets.UTF_8),destination,msg);String tags = message.getTags();if (StringUtils.contains(tags,"TagA")){ //对于不同的tag标签,返回不同的状态,rocketMQ会根据状态来决定两阶段提交的第二阶段决定消息是提交还是放弃return RocketMQLocalTransactionState.COMMIT;}else if (StringUtils.contains(tags,"TagB")){return RocketMQLocalTransactionState.ROLLBACK;}else {return RocketMQLocalTransactionState.UNKNOWN;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {return null;} }
3、使用测试类测试
@Testvoid testSendTransactionMessage() throws InterruptedException {String topic = "my-boot-topic";String message = "hello rocket mq transaction springboot message";myProducer.sendTractionMessage(topic,message);System.out.println("事务消息发送成功~");}
4、消费端消费情况,demo中循环是10,但只有2条被消费者接收处理,就是那TagA的两条数据,因为他们的状态是commit
5、rocketMQ的事务分两阶段提交,第一阶段是发送broker一个half消息,而只有等本地事务处理完成,才能再告诉broker是commit还是rockback,最终保证原子性
以上内容纯属学习使用!