架构师系列- 消息中间件(15)-kafka业务实战

7.1 顺序性场景

7.1.1 场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

已下单 → 已支付 → 已确认

不允许错乱!

7.1.2 顺序级别

1)全局有序:

串行化。每条经过kafka的消息必须严格保障有序性。

这就要求kafka单通道,每个groupid下单消费者

极大的影响性能,现实业务下几乎没必要

2)局部有序:

业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

7.1.3 实现方案

1)发送端:

指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka开多个分区还有何意义?

所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程

在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

参考下图:thread处理后,会将data变成 2-1-3

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

再开启多个线程,每个队列分配一个线程处理。提升吞吐量

 

 

7.1.4 代码验证

1)新建一个sort队列,2个分区

2)启动order项目

源码参考:

SortedProducer(顺序性发送端)

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

 3)通过swagger请求

 

7.2 海量同步场景

假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

7.2.1 常规架构

在下单完成mysql后,通过程序代码打印,直接进入kafka

或者logback和kafka集成,通过log输送

优点:

更符合常规的思维。将数据送给想要的部门

缺点:

耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

7.2.2 解耦合

借助canal,监听订单表的数据变化,不再影响主业务。

 

7.2.3 部署实现

1)mysql部署注意,需要打开binlog,8.0 默认处于开启状态#启动mysql8
docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d daocloud.io/mysql:8.0
连上mysql,执行以下sql,添加canal用户CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
创建订单表CREATE TABLE `orders` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
);2)canal部署#canal.properties
#附带资料里有,放到服务器 /opt/kafka/data/canal/ 目录下
#修改servers为你的kafka的机器地址
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.10.30:10903,192.168.10.30:10904
#docker-compose.yml
#附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
#修改mysql的链接信息的链接信息
#然后在当前目录下执行 docker-compose up -d
version: '2'
services:canal:image: canal/canal-servercontainer_name: canalrestart: alwaysports:- "10908:11111"environment:#mysql的链接信息canal.instance.master.address: 192.168.10.30:3306canal.instance.dbUsername: canalcanal.instance.dbPassword: canal#投放到kafka的哪个主题?要提前准备好!canal.mq.topic: canalvolumes:- "/opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"3)数据通道验证进入kafka容器,用上面3.2.4里的命令行方式监听canal队列./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal在mysql上创建orders表,增删数据试一下mysql> insert into orders (name) values ('张三');
Query OK, 1 row affected (0.03 sec)在kafka控制台,可以看到同步的消息{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}数据通道已打通,还缺少的是druid作为消费端来接收消息4)druid部署#druid.yml
#在附带资料里有
#随便找个目录,执行
docker-compose -f druid.yml up -d5)验证配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

7.3 kafka监控

7.3.1 eagle简介

Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

7.3.2 部署

推荐docker-compose启动

将配备的资料中 eagle.yml , 拷贝到服务器任意目录

修改对应的ip地址为你服务器的地址

 

#注意ip地址:192.168.10.30,全部换成你自己服务器的version: '3'
services:zookeeper:image: zookeeper:3.4.13kafka-1:container_name: kafka-1image: wurstmeister/kafka:2.12-2.2.2ports:- 10903:9092- 10913:10913environment:KAFKA_BROKER_ID: 1 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181#docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10903 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10913"JMX_PORT: 10913volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper           kafka-2:container_name: kafka-2image: wurstmeister/kafka:2.12-2.2.2ports:- 10904:9092- 10914:10914environment:KAFKA_BROKER_ID: 2 HOST_IP: 192.168.10.30KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30KAFKA_ADVERTISED_PORT: 10904 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10914"JMX_PORT: 10914volumes:- /etc/localtime:/etc/localtimedepends_on:- zookeeper eagle:image: gui66497/kafka_eaglecontainer_name: kerestart: alwaysdepends_on:- kafka-1- kafka-2ports:- "10907:8048"environment:ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 使用说明

访问 : http://192.168.10.30:10907/ke/

默认用户名密码: admin/ 123456

如果要删除topic等操作,需要管理token: keadmin

 

与km到底选哪个呢?

  • 界面美观程度和监控曲线优于km,有登录权限控制
  • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/829498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ElasticSearch教程入门到精通——第二部分(基于ELK技术栈elasticsearch 7.x+8.x新特性)

ElasticSearch教程入门到精通——第二部分(基于ELK技术栈elasticsearch 7.x8.x新特性) 1. JavaAPI-环境准备1.1 新建Maven工程——添加依赖1.2 HelloElasticsearch 2. 索引2.1 索引——创建2.2 索引——查询2.3 索引——删除 3. 文档3.1 文档——重构3.2…

react,Chart

一、基础图:https://ant-design-charts.antgroup.com/ Ant Design Charts 1. 首先要下载ant-design/charts,然后在页面中添加如下柱状图代码: import React from react; import { Column } from ant-design/chartsconst DemoColumn: React.FC () …

百度智能云千帆 ModelBuilder 技术实践系列:通过 SDK 快速构建并发布垂域模型

​百度智能云千帆大模型平台(百度智能云千帆大模型平台 ModelBuilder)作为面向企业开发者的一站式大模型开发平台,自上线以来受到了广大开发者、企业的关注。至今已经上线收纳了超过 70 种预置模型服务,用户可以快速的调用&#x…

深入理解冯诺依曼体系结构

文章目录 冯诺依曼体系结构概念冯诺依曼体系结构的优势冯诺依曼体系结构的现实体现 冯诺依曼体系结构概念 冯诺依曼体系结构也称普林斯顿结构,是现代计算机发展的基础。它的主要特点是“程序存储,共享数据,顺序执行”,即程序指令和…

代码随想录算法训练营Day10 | 232.用栈实现队列、225. 用队列实现栈

232.用栈实现队列 题目:请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作(push、pop、peek、empty): 实现 MyQueue 类: void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除…

【leetcode面试经典150题】75. 二叉树展开为链表(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主,题解使用C语言。(若有使用其他语言的同学也可了解题解思路,本质上语法内容一致&…

修改Docker容器内文件的三种方式

说明:本文介绍修改Docker容器内文件的三种方式 方式一:直接修改 敲下面的命令,进入Docker容器,如mysql docker exec -it mysql /bin/bash修改mysql的配置文件,/etc/my.cnf vim /etc/my.cnf如下,如果vim…

【Hadoop】-Apache Hive使用语法与概念原理[15]

一、数据库操作 创建数据库 create database if not exists myhive; 使用数据库 use myhive; 查看数据库详细信息 desc database myhive; 数据库本质上就是在HDFS之上的文件夹。 默认数据库的存放路径是HDFS的:/user/hive/warehouse内 创建数据库并指定hdfs…

attempt to compare nil with number -- 黑马点评出现问题

问题情况 : 主要问题 : 调用lua执行redis时,有一个值会接受nil(因为redis中没有该数据)或者数值,当该值为nil时执行报错,因为会用到将该值与其他数字比较,故报错attempt to compare nil with number 当然…

生成完美口型同步的 AI 代言人视频(及其实现原理详解)

目录 什么是Heygen? Heygen注册 Video Translation(视频翻译 完美口型同步) 实现原理详解 视频翻译部分 完美口型同步部分 什么是Heygen? Heygen是一款在线工具,可帮助您生成具有完美口型同步的 AI 代言人视频。 Heygen注册 https:…

关于springboot内置tomcat最大请求数配置的一些问题

前言 springboot内置了tomcat。那么一个springboot web应用,最大的请求链接数是多少呢?很早以前就知道这个是有个配置,需要的时候,百度一下即可。但,事实并非如此,有几个问题我想大多数人还真不知道。比如…

前端学习<四>JavaScript——54-原型链

常见概念 构造函数 构造函数-扩展 原型规则和示例 原型链 instanceof 构造函数 任何一个函数都可以被 new,new 了之后,就成了构造方法。 如下: function Foo(name, age) {this.name name;this.age age;//retrun this; //默认有这…

大型语言模型高效推理综述

论文地址:2404.14294.pdf (arxiv.org) 大型语言模型(LLMs)由于在各种任务中的卓越表现而受到广泛关注。然而,LLM推理的大量计算和内存需求给资源受限的部署场景带来了挑战。该领域的努力已经朝着开发旨在提高LLM推理效率的技术方…

C语言递归刷题(一)

目录 走台阶题目思路代码 西格玛题目思路代码 用函数实现数的阶乘题目思路代码 digit题目思路代码 Hermite多项式题目思路代码 排列数题目思路代码 逆序输出题目思路代码 结语 走台阶 题目 描述 小乐乐上课需要走n阶台阶,因为他腿比较长,所以每次可以选…

挑战特斯拉?深蓝汽车与华为强强联手

作为中国乃至全球汽车行业的盛宴,4月25日在中国国家展览中心揭幕的2024北京国际车展,吸引了无数企业行业人士的关注。 而就在车展开幕当天,深蓝汽车发布会就爆出了一个大新闻:深蓝汽车将携手华为,打造比特斯拉更好的智…

【开发问题记录】启动某个服务时请求失败(docker-componse创建容器时IP参数不正确)

问题记录 一、问题描述1.1 产生原因1.2 产生问题 二、问题解决2.1 找到自己的docker-compose.yml文件2.2 重新编辑docker-compose.yml文件2.3 通过docker-componse重新运行docker-compose.yml文件2.4 重新启动docker容器2.5 查看seata信息 一、问题描述 1.1 产生原因 因为我是…

FPGA 以太网通信UDP通信环回

1 实验任务 上位机通过网口调试助手发送数据给 FPGA , FPGA 通过 PL 端以太网接口接收数据并将接收到的数据发送给上位机,完成以太网 UDP 数据的环回。 2 系统设计 系统时钟经过PLL时钟模块后,生成了两种不同频率和相位的时钟信号&#…

Python 面向对象——6.封装

本章学习链接如下: Python 面向对象——1.基本概念 Python 面向对象——2.类与对象实例属性补充解释,self的作用等 Python 面向对象——3.实例方法,类方法与静态方法 Python 面向对象——4.继承 Python 面向对象——5.多态 1. 封装的基…

unity cinemachine相机 (案例 跟随角色移动)

安装相机包 打开包管理工具 在 unity registry 搜索cinemachine 会在maincamera中生成一个组件cinemachineBrain 只能通过虚拟相机操控 主相机 虚拟相机的参数 案例 1.固定相机效果 位置 在固定的地方 默认的模式 2.相机跟随人物效果 焦距设置 20 跟随设置 把playere…

使用Tortoise 创建远程分支

1。首先创建本地分支branch1,右键tortoise git->创建分支,输入分支名称branch1,确定。 2。右键tortoise git->推送,按下图设置,确定,git会判断远程有没有分支branch1,如果没有会自动创建…