RabbiqMQ快速入门

RabbitMQ

官网地址: https://www.rabbitmq.com/

一个遵循AMQP协议,开源面向消息的中间件,支持多种编程语言。

Rabbitmq 能做什么?

  • 逻辑解耦,异步的消息任务
  • 消息持久化,重启不影响
  • 削峰,大规模的消息处理

主要的特点

可靠性:持久化,传输确认,发布确认
可扩展性:多个节点可以组成一个集群,可动态更改
多语言:支持多数编程语言
管理界面:有常见的用户界面,便于管理和监控

常见的应用场景:

并发请求的压力高可用设计(电商秒杀场景),
异步任务处理结果的回调设计(日志订单异步处理),
系统集成与分布式系统设计(各种子系统的消息同步)。

工作原理

简单介绍生产者和消费者会和服务器建立tcp链接,在tcp链接之上会建立多个信道channel,通过信道来发送消息,生产者生产消息后不直接直接发到队列中,而是发到一个交换空间:Exchange,
Exchange会根据Exchange类型和Routing Key来决定发到哪个队列中,消费者在从队列中拿到消息

具体工作模式

名词解释

ExChange :消息交换机,决定消息按照什么规则路由到那个对列中去
Queue :消息载体,每个消息都会被投到一个或多个队列
Binding:绑定,把exchange 和 queue按照路由规则绑定起来
Routing Key: 路由关键字,exchage根据这关键字来投递消息
Channel :消息通道,客户端的每个连接建立多个channel
Producer :消息生产者,用户投递消息的程序
Consumer :消息消费者,用于就是接收消息的程序

Exchage工作模式

Fanout: 类似广播,转发到所有绑定交换机的Queue
Direct: 类似单播,RoutingKey 和 BindingKey完全匹配
Topic : 类似组播,转发到符合通配符的Queue
headers:请求头与消息头匹配,才能接收到消息

环境配置

通过docker环境配置

# /www/rabbitmq目录可自定义,主要用于目录挂载
mkdir -p /www/rabbitmq
# 创建容器
docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /www/rabbitmq:/var/lib/rabbitmq rabbitmq:management
# 查看容器状态
docker ps | grep rabbit浏览器打开登录rabbitmq, 入口:http://localhost:15672
默认用户名: guest 密码: guest

golang实战

简单基本玩法

//下载类库
go get "github.com/streadway/amqp"

前期准备代码

//连接信息
const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"//rabbitMQ结构体
type RabbitMQ struct {conn      *amqp.Connectionchannel   *amqp.Channel//队列名称QueueName string//交换机名称Exchange  string//bind Key 名称Key string//连接信息Mqurl     string
}//创建结构体实例
func NewRabbitMQ(queueName string,exchange string ,key string) *RabbitMQ {return &RabbitMQ{QueueName:queueName,Exchange:exchange,Key:key,Mqurl:MQURL}
}//断开channel 和 connection
func (r *RabbitMQ) Destory() {r.channel.Close()r.conn.Close()
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))}
}

简单模式

简单模式下 Exchange 和 key是为空的,不需要设置

//创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {//创建RabbitMQ实例rabbitmq := NewRabbitMQ(queueName,"","")var err error//获取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed to connect rabb"+"itmq!")//获取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq
}//简单模式队列生产
func (r *RabbitMQ) PublishSimple(message string) {//1.申请队列,如果队列不存在会自动创建,存在则跳过创建_, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自动删除false,//是否具有排他性false,//是否阻塞处理false,//额外的属性nil,)if err != nil {fmt.Println(err)}//调用channel 发送消息到队列中r.channel.Publish(r.Exchange,r.QueueName,//如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者false,//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})
}//simple 模式下消费者
func (r *RabbitMQ) ConsumeSimple() {//1.申请队列,如果队列不存在会自动创建,存在则跳过创建q, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自动删除false,//是否具有排他性false,//是否阻塞处理false,//额外的属性nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err :=r.channel.Consume(q.Name, // queue//用来区分多个消费者"",     // consumer//是否自动应答true,   // auto-ack//是否独有false,  // exclusive//设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者false,  // no-local//列是否阻塞false,  // no-waitnil,    // args)if err != nil {fmt.Println(err)}forever := make(chan bool)//启用协程处理消息go func() {for d := range msgs {//消息逻辑处理,可以自行设计逻辑log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}

工作模式

一个消息只能被一个消费者获取(场景:生产消息大于消费消息的时候),更简单模式代码一样,只是同事开启了多个消费端,起到负载均衡的作用

订阅模式

该模式下,队列为空,key为空;只需设置交换空间即可;消息被投递到多个队列中,一个消息被多个消费者消费

//订阅模式创建RabbitMQ实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {//创建RabbitMQ实例rabbitmq := NewRabbitMQ("",exchangeName,"")var err error//获取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//获取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq
}//订阅模式生产
func (r *RabbitMQ) PublishPub(message string) {//1.尝试创建交换机err := r.channel.ExchangeDeclare(r.Exchange,"fanout",true,false,//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.发送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})
}//订阅模式消费端代码
func (r *RabbitMQ) RecieveSub() {//1.试探性创建交换机err := r.channel.ExchangeDeclare(r.Exchange,//交换机类型"fanout",true,false,//YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.试探性创建队列,这里注意队列名称不要写q, err := r.channel.QueueDeclare("", //随机生产队列名称false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//绑定队列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,这里的key要为空"",r.Exchange,false,nil)//消费消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出请按 CTRL+C\n")<-forever
}

路由模式

在路由模式下,一个消息可以被多个消费者获取,该模式生产端可以指定消费端;交换机的类型需要设置为direct,并且需要设置bind key。

/路由模式
//创建RabbitMQ实例
func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ {//创建RabbitMQ实例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//获取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//获取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq
}//路由模式发送消息
func (r *RabbitMQ) PublishRouting(message string )  {//1.尝试创建交换机err := r.channel.ExchangeDeclare(r.Exchange,//要改成direct"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.发送消息err = r.channel.Publish(r.Exchange,//要设置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})
}
//路由模式接受消息
func (r *RabbitMQ) RecieveRouting() {//1.试探性创建交换机err := r.channel.ExchangeDeclare(r.Exchange,//交换机类型"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.试探性创建队列,这里注意队列名称不要写q, err := r.channel.QueueDeclare("", //随机生产队列名称false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//绑定队列到 exchange 中err = r.channel.QueueBind(q.Name,//需要绑定keyr.Key,r.Exchange,false,nil)//消费消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出请按 CTRL+C\n")<-forever
}

Topic模式,话题模式

一个消息可以被多个消费者获取,消息的目标queue可用BindingKey以通配符,的方式指定。
交换的类型设置为 topic,在接受端通过匹配规则匹配(例如:hello.*.world)

//话题模式
//创建RabbitMQ实例
func NewRabbitMQTopic(exchangeName string,routingKey string) *RabbitMQ {//创建RabbitMQ实例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//获取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//获取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq
}
//话题模式发送消息
func (r *RabbitMQ) PublishTopic(message string )  {//1.尝试创建交换机err := r.channel.ExchangeDeclare(r.Exchange,//要改成topic"topic",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.发送消息err = r.channel.Publish(r.Exchange,//要设置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),})
}
//话题模式接受消息
//要注意key,规则
//其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
//匹配 imooc.* 表示匹配 imooc.hello, 但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {//1.试探性创建交换机err := r.channel.ExchangeDeclare(r.Exchange,//交换机类型"topic",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.试探性创建队列,这里注意队列名称不要写q, err := r.channel.QueueDeclare("", //随机生产队列名称false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//绑定队列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,这里的key要为空r.Key,r.Exchange,false,nil)//消费消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出请按 CTRL+C\n")<-forever
}

参考: https://www.cnblogs.com/luotianshuai/p/7469365.html#4199652

转载于:https://www.cnblogs.com/nirao/p/11176137.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/350035.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java命令行界面(第13部分):JArgs

JArgs 1.0的区别在于&#xff0c;这是我的第13篇文章的主题&#xff0c;该文章是关于Java命令行参数解析的。 JArgs是一个开放源代码&#xff08; BSD许可证 &#xff09;库&#xff0c;主要由Steve Purcell和Ewan Mellor等 不同的贡献者支持。 事实证明&#xff0c;这在第一次…

pthread vs openMP之我见

前两天看了些并行计算的文章&#xff0c;了解了一些并行计算的方法和原理。然后发现多线程实现里面还有个openMP&#xff0c;这个以前从来没见过&#xff08;火星了&#xff09;&#xff0c;之前只是知道pthread线程库和微软也实现了一套线程。又看了看openMP的一些教程才知道它…

线程池默认多少个线程_我需要多少个线程?

线程池默认多少个线程这取决于您的应用程序。 但是&#xff0c;对于那些希望对如何从生产站点购买的所有昂贵内核中挤出大量资金的人&#xff0c;请多多包涵&#xff0c;我将阐明围绕多线程 Java应用程序的奥秘。 内容针对最典型的Java EE应用程序进行了“优化”&#xff0c;该…

mysql error writing_MySQL:Error writing file (Errcode: 28)解决方法

问题描述&#xff1a;在执行创建表语句时提示&#xff1a;mysql> CREATE TABLE cash_request (id int(11) NOT NULL auto_increment,dev_id int(11) NOT NULL,bank_account_info varchar(255) NOT NULL,money int(11) NOT NULL,status tinyint(1) NOT NULL default 1,is_fan…

[暑假集训Day4T3]曲线

三分模板。 三分法求单峰函数最优值,之后每次取所有二次函数最优值即可 #pragma GCC optimize(3,"Ofast","inline") #include<iostream> #include<cstdio> #define N 100005 #define eps 1e-9 using namespace std; int read() {int x0,f1;cha…

模拟Spring Security上下文进行单元测试

今天&#xff0c;在为一种Java方法编写单元测试用例时&#xff0c;如下所示&#xff1a; public ApplicationUser getApplicationUser() {ApplicationUser applicationUser (ApplicationUser) SecurityContextHolder.getContext().getAuthentication().getPrincipal();return…

mysql semi-synchronous_MySQL Semisynchronous Replication介绍

前言MySQL 5.5版本之前默认的复制是异步(Asynchronous )模式的, MySQL 5.5 以plugins的方式提供了Semisynchronous Replication 模式。在介绍 semi sync 之前,我们先了解&#xff1a;半同步 Asynchronous 和 同步 Synchronous 。异步复制模式主库将已经提交的事务event 写入bin…

Jquery屏蔽回车键

1 $(function(){2 3 $(“#tagForm input”).keypress(4 5 function(event){6 7 if(event.keyCode 13){8 9 returnfalse;10 11 }12 13 });14 15 })转载于:https://www.cnblogs.com/pfs1314/archive/2011/04/19/2020706.html

滑坡泥石流的防御措施_滑坡泥石流防御

什么是滑坡、泥石流?滑坡是指山坡在河流冲刷、降雨、地震、人工切坡等因素影响下&#xff0c;土层或岩层整体或分散地顺斜坡向下滑动的现象。滑坡也叫地滑&#xff0c;群众中还有“走山”、“垮山”或“山剥皮”等俗称。泥石流是指在降水、溃坝或冰雪融化形成的地面流水作用下…

Event Delegate(代理)异常:该委托必须有一个目标 解决方法

正文待叙转载于:https://www.cnblogs.com/kodong/archive/2013/04/19/3031212.html

自定义注解 实现自定义消息_实现自定义的未来

自定义注解 实现自定义消息上一次我们学习了java.util.concurrent.Future<T>背后的原理 。 我们还发现&#xff0c; Future<T>通常由库或框架返回。 但是&#xff0c;没有什么可以阻止我们在有意义的情况下自行实现所有功能。 它不是特别复杂&#xff0c;可以显着改…

菜单 java_java 菜单

继承体系MenuBar,Menu,MenuItem之间的关系&#xff1a;先创建菜单条&#xff0c;再创建菜单&#xff0c;每一个菜单中建立菜单项。也可以菜单添加到菜单中&#xff0c;作为子菜单。通过setMenuBar()方法&#xff0c;将菜单添加到Frame中。package june610;import java.awt.File…

Jsp、Servlet

1 forward、redirect forward 转发是服务器行为&#xff0c;浏览器根本不知道服务器发送的内容是从哪儿来&#xff0c;所以它的地址栏中还是原来的地址。 redirect 重定向是客户端行为。redirect就是服务端根据逻辑,发送一个状态码,告诉浏览器重新去请求那个地址&#xff0c;一…

Java Finalizer和Java文件输入/输出流

在与主题直接合作或花时间学习它们之后&#xff0c;我经常会发现自己在网上注意到更多主题。 最近的Stephen Connolly &#xff08; CloudBees &#xff09;发表FileInputStream / FileOutputStream被认为有害的消息引起了我的注意&#xff0c;因为我最近在Java的finalizer中遇…

java 运行main_使用maven运行Java Main的三种方法解析

maven使用exec插件运行java main方法&#xff0c;以下是3种不同的操作方式。一、从命令行运行1、运行前先编译代码&#xff0c;exec&#xff1a;java不会自动编译代码&#xff0c;你需要手动执行mvn compile来完成编译。mvn compile2、编译完成后&#xff0c;执行exec运行main方…

CentOS7 修复boot目录

这里为了达到实验目的&#xff0c;首先删除boot目录下所有内容 重启后发现系统进不去了&#xff0c;这正是我们想要的 进入系统救援模式&#xff0c;以重新引导系统 进入救援模式后&#xff0c;输入以下命令进行修复boot目录 重启后&#xff0c;能正常引导系统了 转载于:https:…

java corepoolsize_理解ThreadPoolExecutor线程池的corePoolSize、maximumPoolSize和poolSize

我们知道&#xff0c;受限于硬件、内存和性能&#xff0c;我们不可能无限制的创建任意数量的线程&#xff0c;因为每一台机器允许的最大线程是一个有界值。也就是说ThreadPoolExecutor管理的线程数量是有界的。线程池就是用这些有限个数的线程&#xff0c;去执行提交的任务。然…

开式蓄冷罐与闭式蓄冷罐_一罐来统治所有人

开式蓄冷罐与闭式蓄冷罐跳下内存通道 早在1998年&#xff0c;当我是一名C / C 开发人员时&#xff0c;尝试使用Java时&#xff0c;有关该语言的一些内容对我来说就显得有些恼火了。 我记得很担心这些 为什么没有合适的编辑器呢&#xff1f; C / C 有很多。 我为Java拥有的只是…

嵊州D5T2 折纸 folding

折纸 folding 【问题描述】 在非常紧张的 NOIP 考试中&#xff0c;有人喜欢啃指甲&#xff0c;有人喜欢转铅笔&#xff0c;有人喜欢撕 纸条&#xff0c;……而小 x 喜欢迷折纸。 现有一个 W * H 的矩形纸张&#xff0c;监考老师想知道&#xff0c;小 x 至少要折多少次才能使 矩…

使用Portworx和Couchbase的有状态容器

容器本应是短暂的&#xff0c;因此可以很好地扩展以用于无状态应用程序。 有状态的容器&#xff08;例如Couchbase&#xff09;需要区别对待。 管理Docker容器的持久性概述了如何管理有状态容器的持久性。 该博客将说明如何使用Docker Volume Plugins和Portworx创建有状态的容…