RabbitMQ深度探索:简单实现 MQ

基于多线程队列实现 MQ :

  1. 实现类:
    public class ThreadMQ {private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();public static void main(String[] args) {//创建生产者线程Thread producer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {Thread.sleep(1000);JSONObject data = new JSONObject();data.put("phone","11111111");broker.offer(data);}catch (Exception e){}}}},"生产者");producer.start();Thread consumer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {JSONObject data = broker.poll();if(data != null){System.out.println(Thread.currentThread().getName() + data.toJSONString());}}catch (Exception e){}}}},"消费者");consumer.start();}
    }

基于 netty 实现 MQ:

  1. 执行过程:
    1. 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
    2. 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
  2. 执行流程:
    1. 导入 Maven 依赖:
      <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version>
      </dependency>
      <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.0.23.Final</version>
      </dependency>
      <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version>
      </dependency>
      <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version>
      </dependency>
      <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version>
      </dependency>
    2. 服务端:
      package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import org.apache.commons.lang3.StringUtils;import java.io.UnsupportedEncodingException;
      import java.util.ArrayList;
      import java.util.concurrent.LinkedBlockingDeque;/*** @ClassName BoyatopMQServer2021* @Author* @Version V1.0**/
      public class BoyatopNettyMQServer {public void bind(int port) throws Exception {/*** Netty 抽象出两组线程池BossGroup和WorkerGroup* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。*/EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossGroup, workerGroup)// 设定NioServerSocketChannel 为服务器端.channel(NioServerSocketChannel.class)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。.option(ChannelOption.SO_BACKLOG, 100)// 服务器端监听数据回调Handler.childHandler(new BoyatopNettyMQServer.ChildChannelHandler());//绑定端口, 同步等待成功;ChannelFuture future = bootstrap.bind(port).sync();System.out.println("当前服务器端启动成功...");//等待服务端监听端口关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅关闭 线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 设置异步回调监听ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());}}public static void main(String[] args) throws Exception {int port = 9008;new BoyatopNettyMQServer().bind(port);}private static final String type_consumer = "consumer";private static final String type_producer = "producer";private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();// 生产者投递消息的:topicNamepublic class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {/*** 服务器接收客户端请求** @param ctx* @param data* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object data)throws Exception {//ByteBuf buf=(ByteBuf)data;//byte[] req = new byte[buf.readableBytes()];//buf.readBytes(req);//String body = new String(req, "UTF-8");//System.out.println("body:"+body);JSONObject clientMsg = getData(data);String type = clientMsg.getString("type");switch (type) {case type_producer:producer(clientMsg);break;case type_consumer:consumer(ctx);break;}}private void consumer(ChannelHandlerContext ctx) {// 保存消费者连接ctxs.add(ctx);// 主动拉取mq服务器端缓存中没有被消费的消息String data = msgs.poll();if (StringUtils.isEmpty(data)) {return;}// 将该消息发送给消费者byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}private void producer(JSONObject clientMsg) {// 缓存生产者投递 消息String msg = clientMsg.getString("msg");msgs.offer(msg); //保证消息不丢失还可以缓存硬盘//需要将该消息推送消费者ctxs.forEach((ctx) -> {// 将该消息发送给消费者String data = msgs.poll();if (data == null) {return;}byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);});}private JSONObject getData(Object data) throws UnsupportedEncodingException {ByteBuf buf = (ByteBuf) data;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");return JSONObject.parseObject(body);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}}
      }
    3. 生产端:
      package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/
      public class BoyatopNettyMQProducer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "producer");JSONObject msg = new JSONObject();msg.put("userId", "123456");msg.put("age", "23");data.put("msg", msg);// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
      }
    4. 客户端:
      package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/
      public class NettyMQConsumer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;NettyMQConsumer client = new NettyMQConsumer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "consumer");// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
      }
  3. 持久化机制:
    1. 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
    2. 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
  4. MQ 服务器将该消息推送给消费者:
    1. 消费者已经和 MQ 服务器保持长连接
    2. 消费者在第一次启动的时候会主动拉取信息
  5. MQ 如何实现高并发思想:
    1. MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
    2. 默认的情况下取出一条消息
  6. 缺点:
    1. 存在延迟问题
  7. 需要考虑 MQ 消费者提高速率的问题:
    1. 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/69151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自定义多功能输入对话框:基于 Qt 打造灵活交互界面

一、引言 在使用 Qt 进行应用程序开发时&#xff0c;我们经常需要与用户进行交互&#xff0c;获取他们输入的各种信息。QInputDialog 是 Qt 提供的一个便捷工具&#xff0c;可用于简单的输入场景&#xff0c;但当需求变得复杂&#xff0c;需要支持更多类型的输入控件&#xff0…

国产编辑器EverEdit - 工具栏说明

1 工具栏 1.1 应用场景 当用户想显示/隐藏界面的标签栏、工具栏、状态栏、主菜单等界面元素时&#xff0c;可以通过EverEdit的菜单选项进行设置。 1.2 使用方法 选择菜单查看 -> 工具栏&#xff0c;在工具栏的子菜单中选择勾选或去掉勾选对应的选项。 标签栏&#xff1…

ASP.NET Core 中使用依赖注入 (DI) 容器获取并执行自定义服务

目录 一、ASP.NET Core 中使用依赖注入 (DI) 容器获取并执行自定义服务 1. app.Services 2. GetRequiredService() 3. Init() 二、应用场景 三、依赖注入使用拓展 1、使用场景 2、使用步骤 1. 定义服务接口和实现类 2. 注册服务到依赖注入容器 3. 使用依赖注入获取并…

虚幻UE5手机安卓Android Studio开发设置2025

一、下载Android Studio历史版本 步骤1&#xff1a;虚幻4.27、5.0、5.1、5.2官方要求Andrd Studio 4.0版本&#xff1b; 5.3、5.4、5.5官方要求的版本为Android Studio Flamingo | 2022.2.1 Patch 2 May 24, 2023 虚幻官网查看对应Andrd Studiob下载版本&#xff1a; https:/…

当大模型遇上Spark:解锁大数据处理新姿势

大模型与 Spark&#xff1a;技术初印象 在当今数字化浪潮中&#xff0c;大模型和 Spark 无疑是备受瞩目的两大技术。它们各自在人工智能和大数据处理领域大放异彩&#xff0c;而当这两者相遇&#xff0c;又会碰撞出怎样的火花呢&#xff1f;让我们先来分别认识一下大模型和 Sp…

第 1 天:UE5 C++ 开发环境搭建,全流程指南

&#x1f3af; 目标&#xff1a;搭建 Unreal Engine 5&#xff08;UE5&#xff09;C 开发环境&#xff0c;配置 Visual Studio 并成功运行 C 代码&#xff01; 1️⃣ Unreal Engine 5 安装 &#x1f539; 下载与安装 Unreal Engine 5 步骤&#xff1a; 注册并安装 Epic Game…

芝法酱学习笔记(2.6)——flink-cdc监听mysql binlog并同步数据至elastic-search和更新redis缓存

一、需求背景 在有的项目中&#xff0c;尤其是进销存类的saas软件&#xff0c;一开始为了快速把产品做出来&#xff0c;并没有考虑缓存问题。而这类软件&#xff0c;有着复杂的业务逻辑。如果想在原先的代码中&#xff0c;添加redis缓存&#xff0c;改动面将非常大&#xff0c…

VLAN 基础 | 不同 VLAN 间通信实验

注&#xff1a;本文为 “ Vlan 间通信” 相关文章合辑。 英文引文&#xff0c;机翻未校。 图片清晰度限于原文图源状态。 未整理去重。 How to Establish Communications between VLANs? 如何在 VLAN 之间建立通信&#xff1f; Posted on November 20, 2015 by RouterSwi…

LINUX部署微服务项目步骤

项目简介技术栈 主体技术&#xff1a;SpringCloud&#xff0c;SpringBoot&#xff0c;VUE2&#xff0c; 中间件&#xff1a;RabbitMQ、Redis 创建用户 在linux服务器home下创建用户qshh&#xff0c;用于后续本项目需要的环境进行安装配置 #创建用户 useradd 用户名 #设置登录密…

bat脚本实现自动化漏洞挖掘

bat脚本 BAT脚本是一种批处理文件&#xff0c;可以在Windows操作系统中自动执行一系列命令。它们可以简化许多日常任务&#xff0c;如文件操作、系统配置等。 bat脚本执行命令 echo off#下面写要执行的命令 httpx 自动存活探测 echo off httpx.exe -l url.txt -o 0.txt nuc…

堆的实现——堆的应用(堆排序)

文章目录 1.堆的实现2.堆的应用--堆排序 大家在学堆的时候&#xff0c;需要有二叉树的基础知识&#xff0c;大家可以看我的二叉树文章&#xff1a;二叉树 1.堆的实现 如果有⼀个关键码的集合 K {k0 , k1 , k2 , …&#xff0c;kn−1 } &#xff0c;把它的所有元素按完全⼆叉树…

edu小程序挖掘严重支付逻辑漏洞

edu小程序挖掘严重支付逻辑漏洞 一、敏感信息泄露 打开购电小程序 这里需要输入姓名和学号&#xff0c;直接搜索引擎搜索即可得到&#xff0c;这就不用多说了&#xff0c;但是这里的手机号可以任意输入&#xff0c;只要用户没有绑定手机号这里我们输入自己的手机号抓包直接进…

EF Core 学习笔记(数据迁移、一对多)

程序集依赖&#xff1a;Nuget:Microsoft.EntityFrameworkCoreTools 【定义配置文件】 定义上下文配置文件&#xff0c;继承DbContext类 public class InfoManageProDbContext : DbContext{/// <summary>/// 业务系统/// </summary>public DbSet<BusinessSyste…

FRP通过公网IP实现内网穿透

FRP通过公网IP实现内网穿透 一、简介二、安装服务端1、下载2、安装FRP3、使用 systemd 命令管理 frps 服务4、设置 frps 开机自启动 三、安装客户端1、下载2、安装FRP3、使用 systemd 命令管理 frpc 服务4、设置 frpc 开机自启动 四、访问仪表盘 一、简介 frp 是一款高性能的反…

K8S学习笔记-------1.安装部署K8S集群环境

1.修改为root权限 #sudo su 2.修改主机名 #hostnamectl set-hostname k8s-master01 3.查看网络地址 sudo nano /etc/netplan/01-netcfg.yaml4.使网络配置修改生效 sudo netplan apply5.修改UUID&#xff08;某些虚拟机系统&#xff0c;需要设置才能生成UUID&#xff09;#…

go运算符

内置运算符 算术运算符关系运算符逻辑运算符位运算符赋值运算符 算术运算符 注意&#xff1a; &#xff08;自增&#xff09;和–&#xff08;自减&#xff09;在 Go 语言中是单独的语句&#xff0c;并不是运算符 package mainimport "fmt"func main() {fmt.Printl…

【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(一)

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;贪心算法篇–CSDN博客 文章目录 一.贪心算法1.什么是贪心算法2.贪心算法的特点 二.例题1.柠…

ARM TEE

在ARM的语境中&#xff0c;TEE是Trusted Execution Environment&#xff08;可信执行环境&#xff09;的缩写。ARM TEE就是基于ARM架构实现的可信执行环境&#xff0c;以下是具体介绍&#xff1a; 定义与原理 定义&#xff1a;ARM TEE是基于独立硬件&#xff0c;和主操作系统…

双亲委派(jvm)

1.双亲委派 在 Java 中&#xff0c;双薪委派通常是指双亲委派模型&#xff0c;它是 Java 类加载器的一种工作模式&#xff0c;用于确保类加载的安全性和一致性。以下是其相关介绍&#xff1a; 定义与作用 定义&#xff1a;双亲委派模型要求除了顶层的启动类加载器外&#xf…

阿里云 ubuntu22.04 中国区节点安装 Docker

下面是一份在 Ubuntu 22.04 (Jammy) 上&#xff0c;通过阿里云镜像源来安装并配置 Docker 的详细步骤示例&#xff0c;可在中国区阿里云节点使用&#xff1a; 一、卸载旧版本 (如已安装) 如果系统中已经安装了旧版 Docker (可能是 docker、docker-engine、docker.io、containe…