问题描述
在一个事务内完成插入操作,通过MQ异步通知其他微服务进行事件处理。
由于是在事务内发送,其他服务消费消息,查询数据时还不存在如何解决呢?
解决方案
通过spring-tx包的TransactionSynchronizationManager事务管理器解决。
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
/**
* Return if transaction synchronization is active for the current thread.
* Can be called before register to avoid unnecessary instance creation.
* @see #registerSynchronization
*/
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
/**
* Register a new transaction synchronization for the current thread.
* Typically called by resource management code.
* <p>Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}
}
Rocketmq方法封装,通过TransactionSynchronizationManager.isSynchronizationActive()判断当前方法的调用是否在事务内。
如果是,则注册一个事务同步适配器,在事务提交后发送消息。
否则直接发送。
/**
* 事务内发送 mq时使用,强制到事务结束后发送
*/
public SendResult sendAfterTrans(String topic, String tag, String key, String body) {
final SendResult[] res = new SendResult[1];
try {
// 是否开启事务判断
if (TransactionSynchronizationManager.isSynchronizationActive()) {
log.debug("Mysql事务内Mq消息发送 延迟到事务提交后 waiting……");
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
log.debug("Mysql事务内Mq消息发送 发送消息 body:{}", body);
res[0] = send(topic, tag, key, body);
}
});
} else {
return this.send(topic, tag, key, body);
}
} catch (Exception e) {
e.printStackTrace();
}
return res[0];
}