rabbitmq创建缓存连接工厂

转自:

【RabbitMQ-3】连接池的配置_小胖学编程的博客-CSDN博客文章目录1. rabbitmq的connection连接池1.1 问题提出1.1.1 Connection对象管理以及性能1.1.2 Channel对象管理以及性能1.2 Spring AMQP线程池配置1.2.1 ConnectionFactory连接工厂1.2.2 消费发送和接收使用不同的Connectionjava NIO是IO的多路复用,Channel连接是TCP的多路复用。那么他们有什么关系呢?NIO是服务器开启一个线程,在内核中使用select()进行轮询管理一些socket,当sockehttps://blog.csdn.net/qq_29595463/article/details/107858293


文章目录

    1. rabbitmq的connection连接池
        1.1 问题提出
        1.1.1 Connection对象管理以及性能
            1.1.2 Channel对象管理以及性能
        1.2 Spring AMQP线程池配置
            1.2.1 ConnectionFactory连接工厂
            1.2.2 消费发送和接收使用不同的Connection

    java NIO是IO的多路复用,Channel连接是TCP的多路复用。那么他们有什么关系呢?
    NIO是服务器开启一个线程,在内核中使用select()进行轮询管理一些socket,当socket数据准备好时,会通知应用程序进行读写请求。系统之间那点事-NIO(内附IO模型)-IO/NIO/AIO到底是什么。服务器看起来就好像是一个socket在通信,实现了多路复用。
    channel复用TCP连接,是为了避免TCP连接创建和销毁的性能损耗,而多个channel使用一个tcp连接。

1. rabbitmq的connection连接池

RabbitMQ.png

我们看到一个Connection里面可以包含多个channel。那么我们在连接broker时,connection和channel的关系是什么?
1.1 问题提出
1.1.1 Connection对象管理以及性能

Connection连接本质上就是TCP连接,系统之间那点事-问题驱动-TCP的连接和关闭是比较耗费时间的。我们可以使用一个单例的Connection对象创建多个Channel来实现数据传输,但是在channel信息比较大的情况下,Connection带宽会限制消息的传输。那么需要设计Connection池,将流量分摊到不同的connection上。
1.1.2 Channel对象管理以及性能

Channel对象的创建和销毁也是非常耗时的,推荐共享使用Channel,而不是每次都创建和销毁Channel。那如何设计一个channel线程池呢?

官网对于Connection的解读:

    AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.

大概意思就是:AMQP 0-9-1一般是一个TCP的长链接,当应用程序不再需要连接到服务器时,应该正常关闭AMQP 0-9-1连接而不是关闭TCP连接。

官网对于Channel的解读:

    Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed withchannels that can be thought of as “lightweight connections that share a single TCP connection”.
    Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
    A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.
    For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

一些应用需要同时创建多个连接到broker也就是RabbitMQ服务器上。然而因为防火墙的存在,很难同时创建多个连接。 AMQP 0-9-1连接使用多个channel连接实现对单一Connection的复用。
客户端的每一个协议操作都发送在channel上。每个协议方法携带者channel ID。broker和client使用channel ID来确定方法对应的channel。因此实现channel之间的数据隔离。
channel不能单独存在,仅存在connection上下文中。当connection关闭时,channel也会关闭。
多线程/进程之间打开一个channel但不共享channels是很普遍的。

通道和并发注意事项(线程安全)

    As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

线程之间共享channel是无法避免的,应用程序跟喜欢每个线程使用一个channel而不是跨线程共享相同的channel。

    A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.

要避免一个反例,为每一个发布的消息分配一个channel,开辟一个新的channel需要一个网络的往返,这种模式是很低效的。channel保持合理的存活时间。

    It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.

(敲黑板,划重点)可以使用channel pool来避免共享channel上并发发布:一旦一个线程使用完了channel,那么它将返回到pool中。其他线程便可使用这个Channel。线程池是一个解决方案,可以使用 Spring AMQP线程池而不是自己开发。

__总结:__频繁建立TCP连接和channel连接是消耗性能的,于是我们希望可以共享connection或者channel。达到连接的复用。
1.2 Spring AMQP线程池配置

版本spring-rabbit:2.0.2.RELEASE
1.2.1 ConnectionFactory连接工厂

这个ConnectionFactory是Spring AMQP定义的连接工厂,负责创建连接。而CacheConnectionFactory实现支持对这些通道的缓存。

  private static ConnectionFactory newRabbitConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setAutomaticRecoveryEnabled(false);
        return connectionFactory;
    }

    1
    2
    3
    4
    5

参数分析:

1. 开启confirm机制。
connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true);
为了消息的不丢失,生产者可以设置事务或者confirm异步通知。但是事务性能并不是很好,所以一般使用confirm模式。
区别:(confirm保证达到交换机,return保证交换机到达队列)
如果消息没有到exchange,则confirm回调,ack=false;
如果消息到达exchange,则confirm回调,ack=true;
exchange到queue成功,则不回调return
exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

*注意:设置PublisherReturns状态为true,那么需要设置 rabbitTemplate.setMandatory(true);
具体如何保证消息不丢失,请参考RabbitMQ的消息不丢失机制

2. 配置模式
缓存模式一般两种:
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);

  public static enum CacheMode {
        CHANNEL,
        CONNECTION;

        private CacheMode() {
        }
    }

    1
    2
    3
    4
    5
    6
    7

2.1 CHANNEL模式

程序运行期间ConnectionFactory只维护着一个connection,但是可以含有多个channel,操作rabbitmq之前必须先获取一个channel,否则将会阻塞。

相关参数配置:

connectionFactory.setChannelCacheSize(10);
设置每个Connection中的缓存Channel的数量。操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

connectionFactory.setChannelCheckoutTimeout(600);
单位毫秒,当这个值大于0时,ChannelCacheSize代表的是缓存的数量上限,当缓存获取不到可用的channel时,不会创建新的channel会等待指定的时间,若到时间后还获取不到可用的channel,直接抛出AmqpTimeoutException。

注意:在CONNECTION模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

2.2 CONNECTION模式

CONNECTION模式。在这个模式下允许创建多个connection,会缓存一定数量的connection,每个connection中同样缓存着一些channel。

相关参数配置:

connectionFactory.setConnectionCacheSize(3);
仅在CONNECTION模式下使用,指定connection缓存数量。

connectionFactory.setConnectionLimit(10);
仅在CONNECTION模式下使用,指定connection数量上限。

官网对于是否关闭channel解答:

    Channels used within the framework (e.g. RabbitTemplate) will be reliably returned to the cache. If you create channels outside of the framework, (e.g. by accessing the connection(s) directly and invoking createChannel()), you must return them (by closing) reliably, perhaps in a finally block, to avoid running out of channels.

注意:若使用RabbitTemplate创建channel,那么无需关闭,但是自己新建connection创建channel,则需要手动关闭!避免channel溢出。

ConnectionFactory 代码:

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //设置virtualHost。
        connectionFactory.setVirtualHost("/");
        //消息的确认机制(confirm);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        //setCacheMode:设置缓存模式,共有两种,CHANNEL和CONNECTION模式。
        //1、CONNECTION模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,
        // 除了可以有多个Connection,其它都跟CHANNEL模式一样。
        //2、CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,
        // 所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,
        // 操作rabbitmq之前都必须先获取到一个Channel,
        // 否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),
        // 这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);   //设置CONNECTION模式,可创建多个Connection连接
        //设置每个Connection中缓存Channel的数量,不是最大的。操作rabbitmq之前(send/receive message等)
        // 要先获取到一个Channel.获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,
        // 当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
        connectionFactory.setChannelCacheSize(10);
        //单位:毫秒;配合channelCacheSize不仅是缓存数量,而且是最大的数量。
        // 从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数
        //同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,
        // 超时获取不到Connection也会抛出AmqpTimeoutException异常。
        connectionFactory.setChannelCheckoutTimeout(600);

        //仅在CONNECTION模式使用,设置Connection的缓存数量。
        connectionFactory.setConnectionCacheSize(3);
        //setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。
        connectionFactory.setConnectionLimit(10);
        return connectionFactory;
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35

RabbitTemplate 代码:

  @Autowired
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        //客户端开启confirm模式
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

1.2.2 消费发送和接收使用不同的Connection

    当一个服务同时作为消息的发送端和接收端时,建议使用不同的Connection避免一方出现故障或者阻塞影响另一方。

只需要在RabbitTemplate中加入下面的配置,那么RabbitTemplate在创建Connection时,会根据这个boolean的值,选择使用ConnectionFactory本身或者ConnectionFactory中的publisherConnectionFactory(也即是一个ConnectionFactory)来创建。

rabbitTemplate.setUsePublisherConnection(true);
 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329672.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python简短语法_写出优雅简洁的 python 语法(二)函数传参

Python中函数传递参数的形式包含多种,可根据情况而定使用哪种方式。基本所有语言都有简介明了的语法去替代繁琐的代码,优雅的写法不仅能提升代码美观,更能提高提高开发效率,让代码更加易读。位置传递,默认参数位置传参…

你真的了解Java中的三目运算符吗

转载自 你真的了解Java中的三目运算符吗三目运算符是我们经常在代码中使用的,a (bnull?0:1);这样一行代码可以代替一个if-else,可以使代码变得清爽易读。但是,三目运算符也是有一定的语言规范的。在运用不恰当的时候会导致意想不到的问题。本文就介绍一…

关于.NET下开源及商业图像处理(PSD)组件

1 前言 这篇博客的背景是:为了完成吉日嘎拉的“PSD文件损坏检测和图层检测”任务,查找了目前.NET各种开源的及商业的图像处理资料,在完成任务之后,进行总结。此次任务主要是用C#操作PSD(PhotoShop)文件,中文资料很少&a…

SpringBoot连接多RabbitMQ源

转自: SpringBoot连接多RabbitMQ源 - 掘金在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但…

满足其中一个条件则可_农村分户好处多,但并非人人都可分户!满足这4个条件才可以申请...

分户,一般指子女成年或者成家后从父母户口里面独立出去,自立一户;也可以是指夫妻离婚后一方将户口独立出去(离婚也可以不分户,变更婚姻状态就行)。简单的说分户是指原本在一个户口本上的人口,现在分出去自成一个户口本…

Synchronized的实现原理(一)

转载自 Synchronized的实现原理(一)synchronized,是Java中用于解决并发情况下数据同步访问的一个很重要的关键字。当我们想要保证一个共享资源在同一时间只会被一个线程访问到时,我们可以在代码中使用synchronized关键字对类或者对…

FOSS历史回顾:三代开源人的故事

现在是2016年,你环顾一下四周,开源早已无处不在了。开源无论是规范、形式、以及面貌都和最初的大相径庭,然而事实上,这也预示着新一代的开源程序员们的崛起。下面我们尝试解释下。 (以下这一段落为作者自谦&#xff09…

Spring中@Autowired、@Qualifier、@Resource的区别

转自: Spring中Autowired、Qualifier、Resource的区别_老周聊架构的博客-CSDN博客_qualifier和resource区别1、AutowiredAutowired 可以单独使用。如果单独使用,它将按类型装配。因此,如果在容器中声明了多个相同类型的bean,则会…

map分组后取前10个_海关数据 | 图解前10个月外贸

*内容转载自微信公众号:海关发布RECOMMEND【 推荐阅读 】海关数据 | 图解前三季度我国外贸海关数据 | 图解8月外贸海关数据 | 一图看懂前7个月外贸海关数据 | 图解上半年度外贸增3.9%声明本微信订阅号不以商业营利为目的,不排除部分文字内容或图片转载自…

回顾build 2016:你好,这是微软迄今最好的Windows开发平台

按:本文作者陈计节,ThoughtWorks 高级咨询师。多年的跨平台 .NET 开发者,全栈工程师,技术布道师。擅长互联网应用程序的设计、开发和运维等工作。 在最近的开发者大会(Build 2016)上,微软面向开…

深入理解多线程(二)—— Java的对象模型

转载自 深入理解多线程(二)—— Java的对象模型上一篇文章中简单介绍过synchronized关键字的方式,其中,同步代码块使用monitorenter和monitorexit两个指令实现,同步方法使用ACC_SYNCHRONIZED标记符实现。后面几篇文章会…

8.1-CPU结构(学习笔记)

【README】 本文总结自bilibili《计算机组成原理(哈工大刘宏伟)》的视频讲解,非常棒,墙裂推荐; 【1】CPU结构 Cpu的首要功能就是解释指令;功能列表如下: 1) 取指令:从内存中读取…

生物信息 python 书籍_用python做生物信息数据分析(1-环境准备)

写在前面四五年前,接触生物信息的时候,阴差阳错,我选择用perl。事实上,直到嫌我,我还是认为我当初的选择,完全正确!。在做一些小文本的快速处理上,perl在我看来,从来最优…

8.2-指令周期(学习笔记)

【README】 本文总结自bilibili《计算机组成原理(哈工大刘宏伟)》的视频讲解,非常棒,墙裂推荐; 【1】指令周期 【1.1】指令周期概述 1)指令周期:取出并执行一条指令所需的全部时间&#xff1…

深入理解多线程(三)—— Java的对象头

转载自 深入理解多线程(三)—— Java的对象头上一篇文章中我们从HotSpot的源码入手,介绍了Java的对象模型。这一篇文章在上一篇文章的基础上再来介绍一下Java的对象头。主要介绍一下对象头的作用,结构以及他和锁的关系。 Java对象…

python tkinter 背景色改变不了_python - Tkinter背景颜色问题 - 堆栈内存溢出

我有一个脚本,其中包含Tkinter模块,我想每隔3分钟更改一次背景颜色,例如绿色3分钟,然后橙色,然后红色。 我有显示绿色的代码,但无法更改它。当我在代码中创建函数时,会遇到一些不同的错误&#…

回顾微软近年来对于Linux和开源的策略

2014年十月,在旧金山举办的一场活动中,微软的CEO Satya Nadella向公众表示,微软“爱Linux”。作为昔日的竞争对手,微软对Linux的态度逐渐从敌对转变为合作。自那次发言以来,微软在开源方面频频重拳出击,似乎…

深入理解多线程(四)—— Moniter的实现原理

转载自 深入理解多线程(四)—— Moniter的实现原理本文是《深入理解多线程系列文章》的第四篇。点击查看原文,阅读该系列所有文章。 在深入理解多线程(一)——Synchronized的实现原理中介绍过关于Synchronize的实现原理…

(转)Spring Boot通过ImportBeanDefinitionRegistrar动态注入Bean

转自: Spring Boot通过ImportBeanDefinitionRegistrar动态注入Bean - 掘金在阅读SpringBoot源码时,看到SpringBoot中大量使用ImportBeanDefinitionRegistrar来实现Bean的动态注入。它是Spring中一个强大的扩展接口。本篇文章来https://juejin.cn/post/6…

通过图书编号查询python_Python图书接口调用代码实例

1.[代码][Python]代码#!/usr/bin/python# -*- coding: utf-8 -*-import json, urllibfrom urllib import urlencode#----------------------------------# 图书电商数据调用示例代码 - 聚合数据# 在线接口文档:http://www.juhe.cn/docs/50#-------------…