Skip to content

Not reusing connection and session #238

@cknzl2014

Description

@cknzl2014

With a simple flow from ActiveMQ queue to IBM MQ queue using Atomikos TransactionEssentials 6.0.0, I see that every message results in a new connection/session. The flow is running in a single thread, with Jms.messageDrivenChannelAdapter (using DefaultMessageListenerContainer) and Jms.outboundAdapter (using JmsTemplate).
Is there a trick to make Atomikos reuse the existing connection?

Output of amqsact command:

=============================================================================
  Tid Date       Time      Operation       CompCode      MQRC  HObj (ObjName)
  451 2025-05-28   15:32:41  MQXF_CONNX      MQCC_OK       0000  -
  451 2025-05-28   15:32:41  MQXF_OPEN       MQCC_OK       0000  2 (                                                )
  451 2025-05-28   15:32:41  MQXF_INQ        MQCC_OK       0000  2 (                                                )
  451 2025-05-28   15:32:41  MQXF_CLOSE      MQCC_OK       0000  2 (                                                )
  451 2025-05-28   15:32:41  MQXF_BACK       MQCC_OK       0000  -
  451 2025-05-28   15:32:41  MQXF_BACK       MQCC_OK       0000  -
  451 2025-05-28   15:32:41  MQXF_DISC       MQCC_OK       0000  -
=============================================================================
=============================================================================
  Tid Date       Time      Operation       CompCode      MQRC  HObj (ObjName)
  451 2025-05-28   15:32:41  MQXF_CONNX      MQCC_OK       0000  -
  451 2025-05-28   15:32:41  MQXF_OPEN       MQCC_OK       0000  2 (                                                )
  451 2025-05-28   15:32:41  MQXF_INQ        MQCC_OK       0000  2 (                                                )
  451 2025-05-28   15:32:41  MQXF_CLOSE      MQCC_OK       0000  2 (                                                )
  451 2025-05-28   15:32:41  MQXF_XAOPEN                   0000  -
  451 2025-05-28   15:32:41  MQXF_OPEN       MQCC_OK       0000  2 (TEST.OUT1                                       )
  451 2025-05-28   15:32:41  MQXF_XASTART                  0000  -
  451 2025-05-28   15:32:41  MQXF_PUT        MQCC_OK       0000  2 (TEST.OUT1                                       )
  451 2025-05-28   15:32:41  MQXF_CLOSE      MQCC_OK       0000  2 (TEST.OUT1                                       )
  451 2025-05-28   15:32:41  MQXF_XAEND                    0000  -
  451 2025-05-28   15:32:41  MQXF_XAPREPARE                0000  -
  451 2025-05-28   15:32:41  MQXF_XACOMMIT                 0000  -
  451 2025-05-28   15:32:41  MQXF_XACLOSE                  0000  -
  451 2025-05-28   15:32:41  MQXF_DISC       MQCC_OK       0000  -
=============================================================================

Using:
org.springframework.boot:spring-boot-starter:3.3.11
org.springframework.boot:spring-boot-starter-integration:6.3.9
com.atomikos:transactions-spring-boot3-starter:6.0.0
org.apache.activemq:activemq-client:6.1.6
com.ibm.mq:com.ibm.mq.jakarta.client:9.4.2.0

@Configuration
public class WebSphereMQConfiguration {

	@Autowired
	@Value("${webspheremq.queueManager}")
	private String queueNamager;

	@Autowired
	@Value("${webspheremq.host}")
	private String host;

	@Autowired
	@Value("${webspheremq.port}")
	private int port;

	@Autowired
	@Value("${webspheremq.channel}")
	private String channel;

	@Autowired
	@Value("${webspheremq.sslCipherSuite}")
	private String sslCipherSuite;

	@Autowired
	@Value("${webspheremq.username}")
	private String username;
	
	@Autowired
	@Value("${webspheremq.password}")
	private String password;
	
	@Autowired
	@Value("${webspheremq.connCacheSize:100}")
	private int connCacheSize;
	
	@Bean
	public XAConnectionFactory webSphereMQXaConnectionFactory() throws Exception {
		var xacf = new UserCredentialsMQXAConnectionFactory();
		xacf.setTransportType(CommonConstants.WMQ_CM_CLIENT);
		xacf.setQueueManager(queueNamager);
		xacf.setHostName(host);
		xacf.setPort(port);
		xacf.setChannel(channel);
		xacf.setSSLCipherSuite(sslCipherSuite);
		xacf.setUsername(username);
		xacf.setPassword(password);
		return xacf;
	}

	@Bean("WebSphereMQConnectionFactory")
	public ConnectionFactory webSphereMQAtomikosConnectionFactory(@Qualifier("webSphereMQXaConnectionFactory") XAConnectionFactory xaConnectionFactory) {
		var xacf = new AtomikosConnectionFactoryBean();
		xacf.setUniqueResourceName("MQSeries_XA_RMI");
		xacf.setXaConnectionFactory(xaConnectionFactory);
		xacf.setLocalTransactionMode(false);
		xacf.setMinPoolSize(10);
		xacf.setMaxPoolSize(connCacheSize);
		return xacf;
	}

	public class UserCredentialsMQXAConnectionFactory extends MQXAConnectionFactory {

		private static final long serialVersionUID = 1992951660213943477L;

		private String username;
		private String password;

		@Override
		public XAConnection createXAConnection() throws JMSException {
			return super.createXAConnection(username, password);
		}

		public String getUsername() {
			return username;
		}

		public void setUsername(String username) {
			this.username = username;
		}

		public String getPassword() {
			return password;
		}

		public void setPassword(String password) {
			this.password = password;
		}

		public void afterPropertiesSet() {
			if (this.username == null) {
				throw new IllegalArgumentException("Property 'username' is required");
			}
		}
	}
}
@Configuration
public class ActiveMQConfiguration {

	@Autowired
	@Value("${activemq.brokerUrl}")
	private String brokerUrl;

	@Autowired
	@Value("${activemq.username}")
	private String username;
	
	@Autowired
	@Value("${activemq.password}")
	private String password;
	
	@Autowired
	@Value("${activemq.connCacheSize:100}")
	private int connCacheSize;

	@Bean
	public RedeliveryPolicy redeliveryPolicy() {
		var policy = new RedeliveryPolicy();
		policy.setMaximumRedeliveries(3);
		policy.setBackOffMultiplier(1);
		policy.setInitialRedeliveryDelay(50);
		policy.setRedeliveryDelay(50);
		return policy;
	}

	@Bean
	public XAConnectionFactory activeMQXaConnectionFactory(RedeliveryPolicy redeliveryPolicy) throws Exception {
		var xacf = new ActiveMQXAConnectionFactory();
		xacf.setBrokerURL(brokerUrl);
		xacf.setUserName(username);
		xacf.setPassword(password);
		xacf.setRedeliveryPolicy(redeliveryPolicy);
		return xacf;
	}
	
	@Primary
	@Bean("ActiveMQConnectionFactory")
	public ConnectionFactory activeMQAtomikosConnectionFactory(@Qualifier("activeMQXaConnectionFactory") XAConnectionFactory xaConnectionFactory) {
		var xacf = new AtomikosConnectionFactoryBean();
		xacf.setUniqueResourceName("xaActiveMQ");
		xacf.setXaConnectionFactory(xaConnectionFactory);
		xacf.setLocalTransactionMode(false);
		xacf.setMinPoolSize(10);
		xacf.setMaxPoolSize(connCacheSize);
		return xacf;
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions