每隔一段时间,我会遇到一个与ActiveMQ的连接和池相关的有趣问题,而今天,我想讨论一些并不总是很清楚的问题,并且在使用ActiveMQ和Camel JMS时可能会导致您大量饮酒。 并不是说您无论如何都不会在使用ActiveMQ和Camel时大量喝酒……以庆祝使用它们时集成和消息传递变得多么令人愉快。
所以首先。 连接池。
当然,您总是听说过要建立连接。 这到底是什么意思,为什么要这么做?
与创建会话或使用方等其他操作相比,打开与ActiveMQ代理的连接是一项相对昂贵的操作。 因此,在发送或接收消息并通常与代理进行交互时,如果可能的话,您希望重用现有的连接。 您不需要做的就是依靠JMS库(例如Spring JmsTemplate),该库在每次发送或接收消息时都会打开和关闭连接……除非您可以合并/缓存连接。
因此,如果我们可以同意池化连接是一个好主意,请看一个示例配置:
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"><property name="maxConnections" value="10" /><property name="maximumActiveSessionPerConnection" value="10" /><property name="connectionFactory" ><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://127.0.0.1:61616" /></bean></property></bean>您甚至可能要使用Apache Camel及其出色的camel-jms组件,因为否则这样做很愚蠢。 因此,也许您想要设置类似于以下内容的JMS配置:
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"><property name="connectionFactory" ref="pooledConnectionFactory" /><property name="transacted" value="true" /><property name="concurrentConsumers" value="15" /><property name="deliveryPersistent" value="true" /><property name="requestTimeout" value="10000" /><property name="cacheLevelName" value="CACHE_CONSUMER" /></bean>此配置基本上对消费者意味着,设置15个并发消费者,使用事务(本地),对生产者使用PERSISTENT消息,将超时设置为10000以进行请求答复等。
值得注意的是 :如果您想更全面地了解jms组件的配置,尤其是在缓存使用者,事务等方面,请参考Torsten关于Camel JMS 的出色的博客, 其中包含交易-经验教训 。 也许您还应该花一些时间在他的博客上闲逛,因为他也有很多很好的Camel / ActiveMQ东西!
到目前为止很棒。 我们有一个包含10个连接的连接池,我们希望每个连接10个会话(如果需要,总共100个会话…),以及15个并发使用者。 我们应该能够应付一些沉重的负担,对不对?
在这里看看这条路线。 这很简单,公开了activemq组件(它将使用上面的jmsConfig,因此有15个并发使用者)并仅执行一些日志记录:
from("activemq:test.queue").routeId("test.queue.routeId").to("log:org.apache.camel.blog?groupSize=100");尝试运行此程序。 您会发现您的消费者立即被封锁,堆栈跟踪将显示出这种美丽:
"Camel (camel-1) thread #1 - JmsConsumer[test.queue]" daemon prio=5 tid=7f81eb4bc000 nid=0x10abbb000 in Object.wait() [10abba000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch)
at java.lang.Object.wait(Object.java:485)
at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151)
- locked <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch)
at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:146)
at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:173)
at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
....怎么可能呢? 我们有连接池...我们将每个连接的会话数设置为每个连接10个,那么我们如何阻止创建新会话呢?
答案是,您正在耗尽会话数,就像堆栈跟踪所期望的那样。 但是如何? 我需要喝多少才能解决这个问题?
好吧,现在等等。 喝啤酒,听我说。
首先了解一下。 ActiveMQ的池实现使用commons-pool ,而maxActiveSessionsPerConnection属性实际上已映射到基础池的maxActive属性。 从文档中这意味着:
maxActive controls the maximum number of objects (per key) that can allocated by the pool (checked out to client threads, or idle in the pool) at one time.这里的键是“键”(字面意思是……文档中的“每个键”子句)。 因此,在ActiveMQ实现中,关键是一个对象,它表示1)是否事务处理模式,以及2)确认模式是() ,如此处所示 。 简而言之,对于该连接上使用的每个密钥,您最终都会获得一个“ maxActive”会话。.因此,如果您有使用事务的客户端,则不进行任何事务,client-ack,auto-ack,transacted-session, dups-okay,等等,您可以开始看到对于每个排列,您最终都将获得“ maxActive”会话。 因此,如果将maxActiveSesssionsionsPerConnection设置为10,则最终可能会得到10 x 2 x 4 == 80个会话。 这是藏在你脑海中的东西。
这里的第二个关键是,当camel-jms组件设置使用者时,它最终会在并发消费者会话指定的所有使用者之间共享一个连接。 这是一个有趣的观点,因为camel-jms使用了底层Spring框架的DefaultMessageListenerContainer,不幸的是,此限制来自该库。 因此,如果您有15个并发使用者,那么他们将共享一个连接(即使是池化……它也会从池中获取一个连接并保持它)。 因此,如果您有15个使用者,每个使用者共享一个连接,每个使用者共享一个事务处理模式,每个使用者共享一个ack模式,那么您最终将尝试为该连接创建15个会话。 最后,您得到了上述结果。
因此,我避免这些情况的经验法则是:
- 确切了解您的每个生产者和消费者正在做什么,他们的TX和ACK模式是什么
- 需要时,请始终调整最大会话参数(会话线程太多?我不知道..),但并发ConsumersConsumers + 1的值应至少为ATEST
- 如果生产者和消费者正在生产/消费相同的目的地,则将连接池拆分:一个消费者池,一个生产者池
邓诺(Dunno)该信息将非常有价值,但我想亲自写下来。 如果其他人觉得它有价值或有疑问,请在评论中让我知道。
翻译自: https://www.javacodegeeks.com/2014/03/lessons-learned-activemq-apache-camel-and-connection-pooling.html