Implementing ordered messages in RabbitMQ with SAC queues
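Background
SAC stands for single active consumer. When several consumer instances are attached to the same queue, only one of them is allowed to consume; the other instances stay idle.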
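To make the "one active, the rest idle" behaviour concrete, here is a toy in-memory model. It is only a sketch and not RabbitMQ code: the class `SacModel`, the instance names, and the rule that the earliest remaining registration takes over are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of single-active-consumer semantics; not RabbitMQ code. */
final class SacModel {
    // Consumers in registration order; the head is treated as the active one (assumption).
    private final Deque<String> consumers = new ArrayDeque<>();
    // Record of "consumer:message" pairs, in delivery order.
    private final List<String> deliveries = new ArrayList<>();

    void register(String consumerId) {
        consumers.addLast(consumerId);
    }

    /** Removes a consumer; if it was the active one, the next registered one takes over. */
    void cancel(String consumerId) {
        consumers.remove(consumerId);
    }

    /** Returns the active consumer, or null when nobody is registered. */
    String active() {
        return consumers.peekFirst();
    }

    /** Delivers to the active consumer only; every other consumer stays idle. */
    void publish(String message) {
        String target = active();
        if (target == null) {
            // Simplification: a real queue would hold the message until a consumer appears.
            throw new IllegalStateException("no consumer registered");
        }
        deliveries.add(target + ":" + message);
    }

    List<String> deliveries() {
        return deliveries;
    }

    public static void main(String[] args) {
        SacModel queue = new SacModel();
        queue.register("instance-A");
        queue.register("instance-B");
        queue.publish("m1");
        queue.publish("m2");
        queue.cancel("instance-A"); // instance-B becomes active
        queue.publish("m3");
        System.out.println(queue.deliveries());
    }
}
```

Because a single consumer drains the queue at any moment, messages in that queue are handled in the order they arrived, even when several application instances are running.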
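Goal
Consume messages in order. The approach:
Create 4 SAC queues. The routing key of each message is its sequence number modulo the number of queues (4 here), so all messages of one sequence go to the same queue, and each queue is guaranteed to have only one active consumer.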
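The modulo routing described above can be sketched as a small helper. The class name `RoutingKeySketch` and the `String` overload that hashes an arbitrary id are assumptions for illustration; the `rk-` prefix and the `index % queueCount + 1` rule follow the article.

```java
final class RoutingKeySketch {
    static final String RK_PREFIX = "rk-";

    /** Maps a numeric sequence index to one of rk-1 .. rk-queueCount. */
    static String routingKeyFor(int sequenceIndex, int queueCount) {
        // floorMod keeps the slot non-negative even for negative indexes
        return RK_PREFIX + (Math.floorMod(sequenceIndex, queueCount) + 1);
    }

    /** Hypothetical variant for non-numeric ids: hash the id, then apply the same rule. */
    static String routingKeyFor(String requestNo, int queueCount) {
        return routingKeyFor(requestNo.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        for (int i = 0; i < 6; i++) {
            System.out.println("seq-" + i + " -> " + routingKeyFor(i, queueCount));
        }
    }
}
```

The property that matters is that the mapping is deterministic: every message of the same sequence gets the same routing key, so the whole sequence lands in one queue.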
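Implementation
Define the message class SeqMessage

import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {
    // message id (identifies the sequence the message belongs to)
    private String requestNo;
    // position of the message within its sequence
    private int order;
}

Create the queues and bindings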
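import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OrderQueueConfiguration {
    public static final String EXCHANGE = "order-ex";
    public static final String RK_PREFIX = "rk-";
    public static final String ONE_QUEUE = "one-queue";
    public static final String TWO_QUEUE = "two-queue";
    public static final String THREE_QUEUE = "three-queue";
    public static final String FOUR_QUEUE = "four-queue";

    @Bean
    public DirectExchange exchange() {
        // direct exchange: durable, not auto-deleted
        return new DirectExchange(EXCHANGE, true, false);
    }

    @Bean
    public Binding oneBinding() {
        return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + "1");
    }

    @Bean
    public Binding twoBinding() {
        return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + "2");
    }

    @Bean
    public Binding threeBinding() {
        return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + "3");
    }

    @Bean
    public Binding fourBinding() {
        return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + "4");
    }

    @Bean
    public Queue oneQueue() {
        return createSacQueue(ONE_QUEUE);
    }

    @Bean
    public Queue twoQueue() {
        return createSacQueue(TWO_QUEUE);
    }

    @Bean
    public Queue threeQueue() {
        return createSacQueue(THREE_QUEUE);
    }

    @Bean
    public Queue fourQueue() {
        return createSacQueue(FOUR_QUEUE);
    }

    private static Queue createSacQueue(String queueName) {
        Map<String, Object> arguments = new HashMap<>(2);
        // mark the queue as single-active-consumer
        arguments.put("x-single-active-consumer", true);
        // durable, non-exclusive, not auto-deleted
        return new Queue(queueName, true, false, false, arguments);
    }
}

The important part is the x-single-active-consumer argument: with it set, only one consumer instance is active on the queue at a time.

Create the consumers
Create one listener for each queue.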
import java.nio.charset.StandardCharsets;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// EXCHANGE, RK_PREFIX and the *_QUEUE constants are statically imported from OrderQueueConfiguration

@Slf4j
@Component
public class OrderListener {

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + "1"))
    public void onMessage1(Message message, Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", ONE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + "2"))
    public void onMessage2(Message message, Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", TWO_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + "3"))
    public void onMessage3(Message message, Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", THREE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + "4"))
    public void onMessage4(Message message, Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", FOUR_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }
}

Producer: send the messages
@GetMapping("/send/seq/messqge")
public String sendSeqMessage() throws JsonProcessingException {
    int cnt = 100;    // number of sequences
    int mod = 4;      // number of queues
    int seqSize = 6;  // messages per sequence
    for (int i = 0; i < cnt; i++) {
        for (int j = 0; j < seqSize; j++) {
            // all messages of sequence i share the same routing key
            int rk = i % mod + 1;
            SeqMessage seqMessage = new SeqMessage("seq-" + i, j);
            String s = objectMapper.writeValueAsString(seqMessage);
            log.info("routeKey: {}, send msg: {}", rk, s);
            rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);
        }
    }
    return "success";
}

Output:
two-queue recv: {requestNo:seq-1,order:0}
two-queue recv: {requestNo:seq-1,order:1}
two-queue recv: {requestNo:seq-1,order:2}
two-queue recv: {requestNo:seq-1,order:3}
two-queue recv: {requestNo:seq-1,order:4}
two-queue recv: {requestNo:seq-1,order:5}
two-queue recv: {requestNo:seq-5,order:0}
two-queue recv: {requestNo:seq-5,order:1}
two-queue recv: {requestNo:seq-5,order:2}
two-queue recv: {requestNo:seq-5,order:3}
two-queue recv: {requestNo:seq-5,order:4}
two-queue recv: {requestNo:seq-5,order:5}

three-queue recv: {requestNo:seq-2,order:0}
three-queue recv: {requestNo:seq-2,order:1}
three-queue recv: {requestNo:seq-2,order:2}
three-queue recv: {requestNo:seq-2,order:3}
three-queue recv: {requestNo:seq-2,order:4}
three-queue recv: {requestNo:seq-2,order:5}
three-queue recv: {requestNo:seq-6,order:0}
three-queue recv: {requestNo:seq-6,order:1}
three-queue recv: {requestNo:seq-6,order:2}
three-queue recv: {requestNo:seq-6,order:3}
three-queue recv: {requestNo:seq-6,order:4}
three-queue recv: {requestNo:seq-6,order:5}

As the output shows, the messages of each sequence are consumed in order.
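Checking the order by eye gets tedious with 100 sequences, so a small checker can do it instead. This is a sketch under assumptions: the class `OrderCheck` is hypothetical, it expects log lines in the "recv" format shown in the output (with or without JSON quotes), and it assumes every sequence starts at order 0.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OrderCheck {
    // Accepts both {requestNo:seq-1,order:0} and {"requestNo":"seq-1","order":0}
    private static final Pattern LINE =
            Pattern.compile("\"?requestNo\"?:\"?([^\",}]+)\"?,\"?order\"?:(\\d+)");

    /** Returns true when, for every requestNo, the order values appear as 0, 1, 2, ... */
    static boolean inOrder(List<String> logLines) {
        Map<String, Integer> nextExpected = new HashMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // skip lines that are not "recv" entries
            }
            String requestNo = m.group(1);
            int order = Integer.parseInt(m.group(2));
            int expected = nextExpected.getOrDefault(requestNo, 0);
            if (order != expected) {
                return false; // a message arrived out of sequence
            }
            nextExpected.put(requestNo, expected + 1);
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "two-queue recv: {requestNo:seq-1,order:0}",
                "two-queue recv: {requestNo:seq-1,order:1}",
                "three-queue recv: {requestNo:seq-2,order:0}",
                "two-queue recv: {requestNo:seq-1,order:2}");
        System.out.println(inOrder(lines));
    }
}
```

Lines from different queues may interleave freely; the check only requires that each requestNo sees its own messages in ascending order.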
good luck!