1. 引入ActiveMQ的SpringBoot插件
<!-- ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>2. application中增加activemq的配置
spring: activemq: #ActiveMQ服务器地址 broker-url: tcp://127.0.0.1:61616 user: admin password: admin jms: #false == Queue ; true == Topic ; 默认为false pub-sub-domain: false3. Java代码
package com.geofly.apicall.mq; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.geofly.apicall.model.entity.FlowConfig; import com.geofly.apicall.service.FlowConfigService; import com.geofly.apicall.service.FlowEngineService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import java.util.Map; /** * ActiveMQ 消息消费者 * 接收消息并触发指定的流程 */ @Slf4j @Component public class ActiveMQConsumer { @Autowired private FlowConfigService flowConfigService; @Autowired private FlowEngineService flowEngineService; /** * 接收消息并触发流程 * 消息格式示例: {"triggerName": "workorder_finish", "params": {"id": 123, "status": "done"}} * * @param message JSON 格式的消息字符串 */ @JmsListener(destination = "${spring.activemq.destination:flow.trigger.destination}") public void receiveMessage(String message) { log.info("接收到 ActiveMQ 消息: {}", message); try { // 1. 解析消息 if (!JSONUtil.isTypeJSONObject(message)) { log.warn("消息不是有效的 JSON 对象: {}", message); return; } JSONObject json = JSONUtil.parseObj(message); String triggerName = json.getStr("triggerName"); // 兼容性处理:如果消息中直接传了 flowCode 也可以 String flowCode = json.getStr("flowCode"); Map<String, Object> params = null; if (json.containsKey("params")) { params = json.get("params", Map.class); } // 2. 查找流程 FlowConfig flowConfig = null; if (triggerName != null) { flowConfig = flowConfigService.getByTriggerName(triggerName); } else if (flowCode != null) { flowConfig = flowConfigService.getOne( new LambdaQueryWrapper<FlowConfig>() .eq(FlowConfig::getFlowCode, flowCode) ); } if (flowConfig == null) { log.error("未找到对应的流程,triggerName: {}, flowCode: {}", triggerName, flowCode); return; } // 3. 检查状态 if (flowConfig.getStatus() != 1) { log.warn("流程已禁用: {} ({})", flowConfig.getFlowName(), flowConfig.getFlowCode()); return; } // 4. 执行流程 log.info("从 ActiveMQ 消息触发流程: {} (ID: {})", flowConfig.getFlowName(), flowConfig.getId()); flowEngineService.executeFlow(flowConfig.getId(), params); } catch (Exception e) { log.error("处理 ActiveMQ 消息异常: {}", message, e); } } }这里监听了一个事件,如果ActiveMQ收到相应的事件就会触发这个方法