With a simple flow from an ActiveMQ queue to an IBM MQ queue using Atomikos TransactionsEssentials 6.0.0, I see that every message results in a new connection/session on the IBM MQ side (an MQXF_CONNX ... MQXF_DISC pair per message in the activity trace below). The flow runs in a single thread, with Jms.messageDrivenChannelAdapter (using DefaultMessageListenerContainer) and Jms.outboundAdapter (using JmsTemplate).
Is there a way to make Atomikos reuse the existing connection instead of connecting and disconnecting for each message?
Output of the amqsact command:
=============================================================================
Tid Date Time Operation CompCode MQRC HObj (ObjName)
451 2025-05-28 15:32:41 MQXF_CONNX MQCC_OK 0000 -
451 2025-05-28 15:32:41 MQXF_OPEN MQCC_OK 0000 2 ( )
451 2025-05-28 15:32:41 MQXF_INQ MQCC_OK 0000 2 ( )
451 2025-05-28 15:32:41 MQXF_CLOSE MQCC_OK 0000 2 ( )
451 2025-05-28 15:32:41 MQXF_BACK MQCC_OK 0000 -
451 2025-05-28 15:32:41 MQXF_BACK MQCC_OK 0000 -
451 2025-05-28 15:32:41 MQXF_DISC MQCC_OK 0000 -
=============================================================================
=============================================================================
Tid Date Time Operation CompCode MQRC HObj (ObjName)
451 2025-05-28 15:32:41 MQXF_CONNX MQCC_OK 0000 -
451 2025-05-28 15:32:41 MQXF_OPEN MQCC_OK 0000 2 ( )
451 2025-05-28 15:32:41 MQXF_INQ MQCC_OK 0000 2 ( )
451 2025-05-28 15:32:41 MQXF_CLOSE MQCC_OK 0000 2 ( )
451 2025-05-28 15:32:41 MQXF_XAOPEN 0000 -
451 2025-05-28 15:32:41 MQXF_OPEN MQCC_OK 0000 2 (TEST.OUT1 )
451 2025-05-28 15:32:41 MQXF_XASTART 0000 -
451 2025-05-28 15:32:41 MQXF_PUT MQCC_OK 0000 2 (TEST.OUT1 )
451 2025-05-28 15:32:41 MQXF_CLOSE MQCC_OK 0000 2 (TEST.OUT1 )
451 2025-05-28 15:32:41 MQXF_XAEND 0000 -
451 2025-05-28 15:32:41 MQXF_XAPREPARE 0000 -
451 2025-05-28 15:32:41 MQXF_XACOMMIT 0000 -
451 2025-05-28 15:32:41 MQXF_XACLOSE 0000 -
451 2025-05-28 15:32:41 MQXF_DISC MQCC_OK 0000 -
=============================================================================
Using:
org.springframework.boot:spring-boot-starter:3.3.11
org.springframework.boot:spring-boot-starter-integration:6.3.9
com.atomikos:transactions-spring-boot3-starter:6.0.0
org.apache.activemq:activemq-client:6.1.6
com.ibm.mq:com.ibm.mq.jakarta.client:9.4.2.0
@Configuration
public class WebSphereMQConfiguration {
@Autowired
@Value("${webspheremq.queueManager}")
private String queueNamager;
@Autowired
@Value("${webspheremq.host}")
private String host;
@Autowired
@Value("${webspheremq.port}")
private int port;
@Autowired
@Value("${webspheremq.channel}")
private String channel;
@Autowired
@Value("${webspheremq.sslCipherSuite}")
private String sslCipherSuite;
@Autowired
@Value("${webspheremq.username}")
private String username;
@Autowired
@Value("${webspheremq.password}")
private String password;
@Autowired
@Value("${webspheremq.connCacheSize:100}")
private int connCacheSize;
@Bean
public XAConnectionFactory webSphereMQXaConnectionFactory() throws Exception {
var xacf = new UserCredentialsMQXAConnectionFactory();
xacf.setTransportType(CommonConstants.WMQ_CM_CLIENT);
xacf.setQueueManager(queueNamager);
xacf.setHostName(host);
xacf.setPort(port);
xacf.setChannel(channel);
xacf.setSSLCipherSuite(sslCipherSuite);
xacf.setUsername(username);
xacf.setPassword(password);
return xacf;
}
@Bean("WebSphereMQConnectionFactory")
public ConnectionFactory webSphereMQAtomikosConnectionFactory(@Qualifier("webSphereMQXaConnectionFactory") XAConnectionFactory xaConnectionFactory) {
var xacf = new AtomikosConnectionFactoryBean();
xacf.setUniqueResourceName("MQSeries_XA_RMI");
xacf.setXaConnectionFactory(xaConnectionFactory);
xacf.setLocalTransactionMode(false);
xacf.setMinPoolSize(10);
xacf.setMaxPoolSize(connCacheSize);
return xacf;
}
public class UserCredentialsMQXAConnectionFactory extends MQXAConnectionFactory {
private static final long serialVersionUID = 1992951660213943477L;
private String username;
private String password;
@Override
public XAConnection createXAConnection() throws JMSException {
return super.createXAConnection(username, password);
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public void afterPropertiesSet() {
if (this.username == null) {
throw new IllegalArgumentException("Property 'username' is required");
}
}
}
}
/**
 * Spring configuration for the ActiveMQ side of the flow.
 *
 * <p>Exposes the redelivery policy, the raw ActiveMQ XA connection factory built
 * from the {@code activemq.*} properties, and the primary Atomikos connection
 * factory bean (named {@code ActiveMQConnectionFactory}) that wraps it with a pool.
 */
@Configuration
public class ActiveMQConfiguration {

    /** Preferred minimum pool size; clamped to the configured maximum when that is smaller. */
    private static final int PREFERRED_MIN_POOL_SIZE = 10;

    // @Value alone is sufficient for property injection; no @Autowired needed.
    @Value("${activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${activemq.username}")
    private String username;

    @Value("${activemq.password}")
    private String password;

    /** Upper bound of the Atomikos pool; defaults to 100 when the property is absent. */
    @Value("${activemq.connCacheSize:100}")
    private int connCacheSize;

    /**
     * Defines how failed messages are redelivered: at most 3 redeliveries, a
     * back-off multiplier of 1, and an initial and subsequent delay of 50
     * (presumably milliseconds - confirm against the ActiveMQ documentation).
     *
     * @return the redelivery policy applied to the ActiveMQ connection factory
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        var redelivery = new RedeliveryPolicy();
        redelivery.setMaximumRedeliveries(3);
        redelivery.setBackOffMultiplier(1);
        redelivery.setInitialRedeliveryDelay(50);
        redelivery.setRedeliveryDelay(50);
        return redelivery;
    }

    /**
     * Builds the ActiveMQ XA connection factory from the injected
     * {@code activemq.*} properties and the given redelivery policy.
     *
     * @param redeliveryPolicy the redelivery policy to install on the factory
     * @return the configured XA connection factory
     * @throws Exception declared for signature compatibility with existing callers
     */
    @Bean
    public XAConnectionFactory activeMQXaConnectionFactory(RedeliveryPolicy redeliveryPolicy) throws Exception {
        var factory = new ActiveMQXAConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        factory.setRedeliveryPolicy(redeliveryPolicy);
        return factory;
    }

    /**
     * Wraps the ActiveMQ XA connection factory in an Atomikos connection factory
     * bean and marks it as the primary {@link ConnectionFactory}.
     *
     * @param xaConnectionFactory the raw ActiveMQ XA connection factory
     * @return the Atomikos connection factory registered as {@code xaActiveMQ}
     */
    @Primary
    @Bean("ActiveMQConnectionFactory")
    public ConnectionFactory activeMQAtomikosConnectionFactory(
            @Qualifier("activeMQXaConnectionFactory") XAConnectionFactory xaConnectionFactory) {
        var pooled = new AtomikosConnectionFactoryBean();
        pooled.setUniqueResourceName("xaActiveMQ");
        pooled.setXaConnectionFactory(xaConnectionFactory);
        // Local transaction mode is switched off; presumably so that sessions
        // always take part in the surrounding JTA transaction - TODO confirm.
        pooled.setLocalTransactionMode(false);
        // Keep min <= max so a small configured cache size cannot produce an
        // inverted pool range.
        pooled.setMinPoolSize(Math.min(PREFERRED_MIN_POOL_SIZE, connCacheSize));
        pooled.setMaxPoolSize(connCacheSize);
        return pooled;
    }
}
With a simple flow from ActiveMQ queue to IBM MQ queue using Atomikos TransactionEssentials 6.0.0, I see that every message results in a new connection/session. The flow is running in a single thread, with Jms.messageDrivenChannelAdapter (using DefaultMessageListenerContainer) and Jms.outboundAdapter (using JmsTemplate).
Is there a trick to make Atomikos reuse the existing connection?
Output of the amqsact command: see the activity trace shown above.
Using:
org.springframework.boot:spring-boot-starter:3.3.11
org.springframework.boot:spring-boot-starter-integration:6.3.9
com.atomikos:transactions-spring-boot3-starter:6.0.0
org.apache.activemq:activemq-client:6.1.6
com.ibm.mq:com.ibm.mq.jakarta.client:9.4.2.0