网站在正在建设中崇义县网站建设
news/
2025/9/30 0:39:21/
文章来源:
网站在正在建设中,崇义县网站建设,淘宝运营培训有必要吗,设计说明英语翻译JMS规范 一、JMS是什么二、MQ中间件对比三、JMS组成1.JMS Provider2.JMS Producer3.JMS Consumer4.JSM Message4.1 消息头4.2 消息体4.2.1 生产者4.2.2 消费者 4.3 消息属性 四、JMS可靠性1.PERSISTENT - 持久化1.1 参数设置1.2 Queue持久化1.3 Topic持久化1.3.1 持久的发布主题… JMS规范 一、JMS是什么二、MQ中间件对比三、JMS组成1.JMS Provider2.JMS Producer3.JMS Consumer4.JSM Message4.1 消息头4.2 消息体4.2.1 生产者4.2.2 消费者 4.3 消息属性 四、JMS可靠性1.PERSISTENT - 持久化1.1 参数设置1.2 Queue持久化1.3 Topic持久化1.3.1 持久的发布主题生产者1.3.2 持久的订阅主题消费者 2.Transaction - 事务2.1 生产者开启事务2.2 消费者开启事务 3.Acknowledge - 签收3.1 案例-手动签收 4.签收和事务的关系 一、JMS是什么
首先需要区分JavaSE、JavaEE、JMS JavaSE是一门编程语言JavaEE是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计开发。装配及部署企业应用程序。 DBCJava Databease数据库连接JNDIJava Naming and Directory InterfacesJava的命令和目录接口EJBEnterprise JavaBeanRMIRemote Method Invoke远程方法调用Java IDLInterface Description Language/CORBACommon Object Broker Architecture接口定义语言/共用对象请求代理程序体系结构JSPJava Server PageServletXMLExtensible Markup Language可标记白标记语言JMSJava Message ServiceJava消息服务JTAJava Transaction APIJava事务APIJTSJava Transaction ServiceJava事务服务JavaMailJAFJavaBean Activation Framework JMSJava Message Service是JavaEE中的一个技术是消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API它为标准协议和消息服务提供了一组通用接口包括创建、发送、读取消息等用于支持Java应用程序开发。在JavaEE中当两个应用程序使用JMS进行通信时它们之间不是直接相连的而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
二、MQ中间件对比
特性ActiveMQRabbitMQKafkaRocketMQPRODUCER-CUMSUMER支持支持支持支持PUBLISH-SUBSCRIBE支持支持支持支持REQUEST-REPLY(请求-响应)支持支持-支持API完备性高高高低静态配置多语言支持支持Java优先语言无关支持Java优先支持单机吞吐量万级万级十万级单机万级消息延迟-微秒级毫秒级-可用性高主从高主从非常高分布式高消息丢失-低理论上不会丢失-消息重复-可控制理论上会有重复-文档的完备性高高高中提供快速入门有有有无首次部署难度-低中高
三、JMS组成
1.JMS Provider
实现JMS接口和规范的消息中间件也就是我们说的MQ服务器
2.JMS Producer
消息生产者创建和发送JMS消息的客户端应用
3.JMS Consumer
消息消费者接收和处理JMS消息的客户端应用
4.JSM Message
4.1 消息头
JMSDestination消息发送的目的地主要是指Queue和TopicJMSDeliveryMode持久模式和非持久模式。 一条持久性的消息应该被传送“一次仅仅一次”这就意味着如果JMS提供者出现故障该消息并不会丢失它会在服务器恢复之后再次传递。一条非持久的消息最多会传递一次这意味着服务器出现故障该消息将会永远丢失。 JMSExpiration可以设置消息在一定时间后过期默认是永不过期 消息过期时间等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于0则JMSExpiration被设为0表示该消息永不过期。如果发送后在消息过期时间之后还没有被发送到目的地则该消息被清除。 JMSPriority消息优先级从0-9十个级别0-4是普通消息5-9是加急消息。 JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。 JMSMessageID唯一标识每个消息的标识由MQ产生。
4.2 消息体
封装具体的消息数据5种消息格式 TxtMessage普通字符串消息包含一个StringMapMessage一个Map类型的消息key为Strng类型而值为Java基本类型BytesMessage二进制数组消息包含一个byte[]StreamMessageJava数据流消息用标准流操作来顺序填充和读取ObjectMessage对象消息包含一个可序列化的Java对象 注意发送和接收的消息体类型必须一致对应
4.2.1 生产者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String QUEUE_NAME queue01;public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数第一个叫事务第二个叫签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地是队列还是主题Queue queue session.createQueue(QUEUE_NAME);// 5.创建消息的生产者MessageProducer producer session.createProducer(queue);// 6.使用生产者生成3条消息发送到MQ队列for (int i 1; i 3; i) {// 7.1 创建文本消息TextMessage textMessage session.createTextMessage(msg-- i);// 最简单的字符串producer.send(textMessage);// 7.2 创建Map类型的消息MapMessage mapMessage session.createMapMessage();mapMessage.setString(k1, v1);producer.send(mapMessage);// 7.3 创建 BytesMessageBytesMessage bytesMessage session.createBytesMessage();bytesMessage.writeBytes(hello.getBytes());bytesMessage.writeBytes(world.getBytes());producer.send(bytesMessage);// 7.4 创建 StreamMessageStreamMessage streamMessage session.createStreamMessage();streamMessage.writeString(hello);streamMessage.writeInt(123);producer.send(streamMessage);// 7.5 创建 ObjectMessage// ObjectMessage objectMessage session.createObjectMessage();// MyObject myObject new MyObject();// myObject.setName(张三);// myObject.setAge(20);// objectMessage.setObject(myObject);// producer.send(objectMessage);}// 9.关闭资源producer.close();session.close();connection.close();System.out.println(MQ消息发布完成);}}
4.2.2 消费者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsConsumer {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String QUEUE_NAME queue01;public static void main(String[] args) throws JMSException, IOException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数第一个叫事务第二个叫签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地是队列还是主题Queue queue session.createQueue(QUEUE_NAME);// 5.创建消息的消费者MessageConsumer consumer session.createConsumer(queue);// 6.通过监听的方式来消费消息consumer.setMessageListener(new MessageListener() {Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage (TextMessage) message;try {System.out.println(消费者消费消息 textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}if (message instanceof MapMessage) {MapMessage mapMessage (MapMessage) message;try {System.out.println(消费者消费map消息 mapMessage.getString(k1));} catch (JMSException e) {e.printStackTrace();}}if (message instanceof BytesMessage) {BytesMessage bytesMessage (BytesMessage) message;byte[] bytes new byte[0];try {bytes new byte[(int) bytesMessage.getBodyLength()];} catch (JMSException e) {e.printStackTrace();}try {bytesMessage.readBytes(bytes);} catch (JMSException e) {e.printStackTrace();}String content new String(bytes);System.out.println(消费者消费 BytesMessage 消息 content);}if (message instanceof StreamMessage) {StreamMessage streamMessage (StreamMessage) message;String text null;try {text streamMessage.readString();} catch (JMSException e) {e.printStackTrace();}int number 0;try {number streamMessage.readInt();} catch (JMSException e) {e.printStackTrace();}System.out.println(消费者消费 StreamMessage 消息 text , number);}// if (message instanceof ObjectMessage) {// ObjectMessage objectMessage (ObjectMessage) message;// Serializable object objectMessage.getObject();// if (object instanceof MyObject) {// MyObject myObject (MyObject) object;// System.out.println(消费者消费 ObjectMessage 消息 myObject.getName() , myObject.getAge());// }//}}});// 保证控制台不关掉System.in.read();consumer.close();session.close();connection.close();}
}
4.3 消息属性
如果需要除消息字段以外的值那么可以使用消息属性识别/去重/重点标注等操作非常有用的方法有以下API
四、JMS可靠性
1.PERSISTENT - 持久化
1.1 参数设置
非持久当服务器宕机消息不存在。 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 持久持久化当服务器宕机消息依然存在。 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
1.2 Queue持久化
Queue默认是持久
1.3 Topic持久化
场景只要订阅了之后离线了重新上线就会继续消费 类似微信公众号 注意先启动定阅消费者再启动定阅生产者当所有的消息必须被接收则用持久订阅。当消息丢失能够被容忍则用非持久订阅
1.3.1 持久的发布主题生产者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsTopicProduce {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String TOPIC_NAME topic01;public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();// 3.创建会话session// 两个参数第一个叫事务第二个叫签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地是队列还是主题Topic topic session.createTopic(TOPIC_NAME);// 5.创建消息的生产者MessageProducer producer session.createProducer(topic);//6.设置生产者生产持久化的Topicproducer.setDeliveryMode(DeliveryMode.PERSISTENT);//7.启动连接connection.start();// 8.使用生产者生成3条消息发送到MQ主题for (int i 1; i 3; i) {// 9.创建消息TextMessage textMessage session.createTextMessage(msg-topic-- i);// 最简单的字符串// 10.通过producer发送给mqproducer.send(textMessage);}// 11.关闭资源producer.close();session.close();connection.close();System.out.println(MQ消息发布到topic完成);}}
1.3.2 持久的订阅主题消费者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsTopicConsumer {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String TOPIC_NAME topic01;public static void main(String[] args) throws JMSException, IOException {System.out.println(我是1号消费者);// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();connection.setClientID(王五);// 3.创建会话session// 两个参数第一个叫事务第二个叫签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地是队列还是主题Topic topic session.createTopic(TOPIC_NAME);//5.通过session创建持久化订阅TopicSubscriber topicSubscriber session.createDurableSubscriber(topic, 我是王五);//6.启动连接connection.start();// 7.通过监听的方式来消费消息topicSubscriber.setMessageListener(new MessageListener() {Overridepublic void onMessage(Message message) {if (message instanceof TextMessage){TextMessage textMessage (TextMessage) message;try {System.out.println(我是1号消费者消费消息 textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});System.in.read();session.close();connection.close();}
}
订阅者在线
订阅者离线
这个必须订阅启动过一次程序过一次才行
2.Transaction - 事务
事务偏生产者注意如果生产者开了事务那么签收默认就是自动签收指定了其他签收类型也是自动签收Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE) createSession的第一个参数决定是否开启。如果开启了 需要提交事务才会起效关闭事务只要执行send就进入到队列中。关闭事务那第2个签收参数的设置需要有效开启事务先执行send再执行commit消息才被真正提交到队列中。消息需要批量提交需要缓冲处理。 如果开启了事务那么级别是比签收更高一些。 事务场景涉及到一次性发送两条及以上的消息那么需要使用事务。容易出现的生产事故消费者开启了事务但是没有commit就会造成消息重复消费。
2.1 生产者开启事务
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String QUEUE_NAME queue01;public static void main(String[] args) throws JMSException {//1.创建连接工厂按照给定的URL采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted事务,acknowledgeMode确认模式(签收)//开启事务需要commitSession session connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue session.createQueue(QUEUE_NAME);//5.创建消息的生产者,并设置不持久化消息MessageProducer producer session.createProducer(queue);//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面try {for (int i 0; i 3; i) {TextMessage textMessage session.createTextMessage(tx msg-- i);producer.send(textMessage);}//7.提交事务session.commit();System.out.println(消息发送完成);} catch (Exception e) {System.out.println(出现异常,消息回滚);session.rollback();} finally {//8.关闭资源producer.close();session.close();connection.close();}}}
2.2 消费者开启事务
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String QUEUE_NAME queue01;public static void main(String[] args) throws JMSException {//1.创建连接工厂按照给定的URL采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted事务,acknowledgeMode确认模式(签收)//消费者开启了事务就必须手动提交不然会重复消费消息Session session connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue session.createQueue(QUEUE_NAME);//5.创建消息的消费者,指定消费哪一个队列里面的消息MessageConsumer messageConsumer session.createConsumer(queue);//6.通过监听的方式消费消息messageConsumer.setMessageListener(new MessageListener() {int a 0;Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {if (a 2) {System.out.println(1 / 0);}TextMessage textMessage (TextMessage) message;System.out.println(***消费者接收到的消息: textMessage.getText());session.commit();a a 1;} catch (Exception e) {System.out.println(出现异常消费失败放弃消费);try {session.rollback();a0;} catch (JMSException ex) {ex.printStackTrace();}}}}});//7.关闭资源}
}
3.Acknowledge - 签收
签收偏消费者签收Acknowledge是指消费者告知消息代理Broker已经成功处理并消费了特定消息的操作。签收的作用主要有以下几点 1.确保可靠性当消费者接收到一条消息时可以对该消息进行签收。通过签收消费者向 Broker 表示已经成功处理了该消息并且要求 Broker 删除该消息或将其标记为已消费以确保消息不会再次被传递给其他消费者。2.消息顺序签收还可以控制消息的消费顺序。在 ActiveMQ 中可以使用消息的签收方式来控制消息的顺序性。例如如果消费者使用手动签收模式MANUAL_ACKNOWLEDGE消费者可以在处理完当前消息后再进行签收这样可以确保消息按照顺序进行消费。3.事务支持签收和事务密切相关。在使用事务模式时消费者可以批量消费多条消息并在事务提交时进行签收。如果事务回滚则消息将被重新传递给消费者以确保消息的可靠性处理。 签收是非事务的签收类型(以下标红的是常用) 自动签收默认Session.AUTO_ACKNOWLEDGE 在这种模式下当消费者从队列或主题中接收到消息后消息代理会立即将该消息视为已经被消费不需要消费者显式地调用acknowledge()方法进行确认。因此存在一定的消息传递风险因为如果消费者在处理消息期间发生故障消息可能会丢失。 手动签收Session.CLIENT_ACKNOWLEDGE 在这种模式下消费者必须显式地调用acknowledge()方法来确认消息的接收。消费者可以在处理完消息后手动调用acknowledge()方法以通知消息代理该消息已成功处理。这种方式能够确保消息在被消费后才会被视为已被消费从而增加了消息传递的可靠性。 延迟确认Session.DUPS_OK_ACKNOWLEDGE。 消费者不需要在接收消息时就立即确认。消息代理会允许消息重复传递但是这种模式可以提高消息的传递性能。消费者可以在后续的某个时间点调用acknowledge()方法来确认消息的接收。 事务签收Session.SESSION_TRANSACTED事务签收) 在使用会话Session进行消息接收时可以选择开启事务并在事务提交时进行消息的确认。这种方式可以确保一组消息要么全部被消费要么全部不被消费从而保证消息的一致性和可靠性。
3.1 案例-手动签收
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsConsumer {public static final String ACTIVEMQ_URL tcp://192.168.86.128:61616;public static final String QUEUE_NAME queue01;public static void main(String[] args) throws JMSException, IOException {//1.创建连接工厂按照给定的URL采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted事务,acknowledgeMode确认模式(签收)//消费者开启了事务就必须手动提交不然会重复消费消息Session session connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue session.createQueue(QUEUE_NAME);//5.创建消息的消费者,指定消费哪一个队列里面的消息MessageConsumer messageConsumer session.createConsumer(queue);//6.通过监听的方式消费消息messageConsumer.setMessageListener(new MessageListener() {Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {TextMessage textMessage (TextMessage) message;System.out.println(***消费者接收到的消息: textMessage.getText());// 手动签收textMessage.acknowledge();} catch (Exception e) {System.out.println(出现异常消费失败放弃消费);}}}});//7.关闭资源System.in.read();messageConsumer.close();session.close();connection.close();}
}
4.签收和事务的关系
在事务性会话中当一个事务被成功提交则消息被自动签收。如果事务回滚则消息会被再次传送。非事务性会话中消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/922370.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!