Docker kafka

阅读目录

  • 一、下载镜像
  • 二、先启动zookeeper
  • 三、启动kafka
  • 四、创建一个topic(使用代码次步可省略)
  • 五、kafka设置分区数量
  • 六、python代码


回到顶部

一、下载镜像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

回到顶部

二、先启动zookeeper

#单机方式
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

回到顶部

三、启动kafka

#单机方式
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.101:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.101:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

回到顶部

四、创建一个topic(使用代码次步可省略)

#进入容器
docker exec -it ${CONTAINER ID} /bin/bash
cd opt/bin
#单机方式:创建一个主题
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
#运行一个生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
#运行一个消费者
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning

回到顶部

五、kafka设置分区数量

#分区数量的作用:有多少分区就能负载多少个消费者,生产者会自动分配给分区数据,每个消费者只消费自己分区的数据,每个分区有自己独立的offset
#进入kafka容器
vi opt/kafka/config/server.properties
修改run.partitions=2
#退出容器
ctrl+p+q
#重启容器
docker restart kafka
​
#修改指定topic
./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname

回到顶部

六、python代码

#生产者
from kafka import KafkaProducer
import json
import datetime
​
topic='test'
producer = KafkaProducer(bootstrap_servers='10.0.0.101:9092',value_serializer=lambda m:json.dumps(m).encode("utf-8"))  # 连接kafka
# 参数bootstrap_servers:指定kafka连接地址
# 参数value_serializer:指定序列化的方式,我们定义json来序列化数据,当字典传入kafka时自动转换成bytes
# 用户密码登入参数
# security_protocol="SASL_PLAINTEXT"
# sasl_mechanism="PLAIN"
# sasl_plain_username="maple"
# sasl_plain_password="maple"
​
for i in range(1000):data={"num":i,"ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}producer.send(topic,data)
​
producer.close()
#消费者
from kafka import KafkaConsumer
import time
​
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['10.0.0.101:9092'], group_id="test", auto_offset_reset="earliest")
# 参数bootstrap_servers:指定kafka连接地址
# 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各自消费相同的数据,互不影响
# 参数auto_offset_reset:默认为latest表示offset设置为当前程序启动时的数据位置,earliest表示offset设置为0,在你的group_id第一次运行时,还没有offset的时候,给你设定初始offset。一旦group_id有了offset,那么此参数就不起作用了
​
​
for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)print(recv)# time.sleep(1)
#运行3个消费者结果
test:0:3212: key=None value=b'{"num": 981, "ts": "2021-02-23 16:38:14"}'
test:0:3213: key=None value=b'{"num": 982, "ts": "2021-02-23 16:38:14"}'
test:0:3214: key=None value=b'{"num": 987, "ts": "2021-02-23 16:38:14"}'
test:0:3215: key=None value=b'{"num": 997, "ts": "2021-02-23 16:38:14"}'
test:0:3216: key=None value=b'{"num": 998, "ts": "2021-02-23 16:38:14"}'
test:0:3217: key=None value=b'{"num": 999, "ts": "2021-02-23 16:38:14"}'
​
test:1:353: key=None value=b'{"num": 970, "ts": "2021-02-23 16:38:14"}'
test:1:354: key=None value=b'{"num": 977, "ts": "2021-02-23 16:38:14"}'
test:1:355: key=None value=b'{"num": 978, "ts": "2021-02-23 16:38:14"}'
test:1:356: key=None value=b'{"num": 979, "ts": "2021-02-23 16:38:14"}'
test:1:357: key=None value=b'{"num": 984, "ts": "2021-02-23 16:38:14"}'
test:1:358: key=None value=b'{"num": 985, "ts": "2021-02-23 16:38:14"}'
test:1:359: key=None value=b'{"num": 994, "ts": "2021-02-23 16:38:14"}'
​
test:2:317: key=None value=b'{"num": 989, "ts": "2021-02-23 16:38:14"}'
test:2:318: key=None value=b'{"num": 990, "ts": "2021-02-23 16:38:14"}'
test:2:319: key=None value=b'{"num": 991, "ts": "2021-02-23 16:38:14"}'
test:2:320: key=None value=b'{"num": 992, "ts": "2021-02-23 16:38:14"}'
test:2:321: key=None value=b'{"num": 993, "ts": "2021-02-23 16:38:14"}'
test:2:322: key=None value=b'{"num": 995, "ts": "2021-02-23 16:38:14"}'
test:2:323: key=None value=b'{"num": 996, "ts": "2021-02-23 16:38:14"}'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/386429.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

这些年Android面试的那些套路,社招面试心得

前言 说不焦虑其实是假的,因为无论是现在还是最近几年,很早就有人察觉Android开发的野蛮生长时代已经过去。过去的优势是市场需要,这个技术少有人有,所以在抢占市场的时候,基本上满足需要就已经可以了。但是现在&…

flask第一章:项目环境搭建

windows环境pycharmpython3 1、命令提示窗口 1)创建项目目录:mkdir myblog 2)cd myblog 3)创建虚拟环境:python -m venv myvenv 4)激活虚拟环境:venv\Scripts\activate 5)安装flask&…

windows docker 空出C盘 迁移到其他盘

下面是操作方法: 首先关闭docker 关闭所有发行版:wsl --shutdown 将docker-desktop-data导出到D:\SoftwareData\wsl\docker-desktop-data\docker-desktop-data.tar(注意,原有的docker images不会一起导出)wsl --expo…

安卓开发入门到精通!免费Android高级工程师学习资源,系列篇

前言 2017年进大学开始接触Android,从刚开始接触就不断地听到Android市场饱和,工作难找等消息。虽然当时也非常迷茫,不过由于第一次深入接触编程语言,再加上自己的一点兴趣,就一直坚持下来了。 到现在要毕业了&#…

安卓开发基础面试题,9次Android面试经验总结,面试必备

前言 上回承诺过大家,一定会出 HTTP 的系列文章,今天终于整理完成了。作为一个 web 开发,HTTP 几乎是天天要打交道的东西,但我发现大部分人对 HTTP 只是浅尝辄止,对更多的细节及原理就了解不深了,在面试的…

基于TCP的在线聊天程序

在线聊天服务端 import tkinter import tkinter.font as tkFont import socket import threading import time import sys class ServerUI():local127.0.0.1port5505global serverSock;flagFalsedef __init__(self):self.roottkinter.Tk()self.root.title(在线聊天-服务端v1.0)…

Docker安装Confluence

Docker安装Confluence 参考链接: https://my.oschina.net/u/2289161/blog/1648587 https://hub.docker.com/r/cptactionhank/atlassian-confluence/dockerfile https://my.oschina.net/u/2289161/blog/1647061 https://my.oschina.net/u/2289161/blog/838218 https://hub.…

安卓开发基础面试题,Android面试必备的集合源码详解,附小技巧

去年无疑是 Flutter 技术如火如荼发展的一年。 每一个移动开发者都在为 Flutter 带来的“快速开发、富有表现力和灵活的 UI、原生性能”的特色和理念而痴狂,从超级 App 到独立应用,从纯 Flutter 到混合栈,开发者们在不同的场景下乐此不疲的探…

『算法』读书笔记 1.4算法分析 Part1

Chapter 1 本章结构 1.1Java语法 1.2数据抽象 1.3集合类抽象数据类型:背包 (Bags) 、队列 (Queues) 、栈 (Stacks) 1.4算法分析 1.5连通性问题-Case Study: Union - Find ADT 本节开篇使用了一个ThreeSum程序进行示例: ThreeSum所起到的作用…

JS调用MetaMask调用启动转账

1 、代码必须跑在nginx下,否则没有eth对象。 2、可以下载ganache来单跑个私服,然后安装谷歌metamask浏览器插件来实验 3、账户1:0xFA387e41FA471172cC729167EBD4862aA7020D91 账户2:0x818DF62ff0bE3B28AE8be25e2e848E10138018B7 4、1000000000000000 …

安卓开发工程师面试题!春招我借这份PDF的复习思路,不吃透都对不起自己

写在前面 身边有不少去大厂面试的朋友,其中小金面试字节跳动的经历很有意义,在这里分享给大家。小金是末流211计算机专业大三本科生,前几天面试了字节跳动的广州Android开发实习生。下面是他的面试经历,还有一些他自己的经验。 …

合算的日本料理

巨鹿路和那个茂名路路口的《和味》,有预订的话才98一个人,味道不错,楼上的桃子MM服务狠好,笑容狠甜。那里的东西味道还是狠正宗的,除了一个色拉不对。那里的清酒和梅酒都不错,尤其梅酒。生牛肉虽然没有大渔…

安卓开发必须会的技能!浅谈Android消息机制原理,威力加强版

目录 想要成为一名优秀的Android开发,你需要一份完备的知识体系,在这里,让我们一起成长为自己所想的那样。 PagerAdapter 介绍ViwePager 缓存策略ViewPager 布局处理ViewPager 事件处理相关内容 Android 基础 1.Activity 1、 什么是 Activi…

NuGet 无法连接到远程服务器-解决方法(转)

原地址: http://www.lixin.me/blog/2012/03/01/29362 今天打开NuGet的Manage NuGet Packages,显示“无法连接到远程服务器”。打开Setting-》Package Manager-》Package Sources。看到里面有一个源:https://go.microsoft.com/fwlink/?LinkID…

安卓开发面试书籍,全世界都在问Android开发凉了吗?建议收藏

前言 本想今年辞掉工作大干一场,没想到碰到疫情,家里蹲了3个月…,还好字节能给一次机会。前阵子字节跳动的提前批开始了,看宣传是说有海量HC,机会多多,本着涨涨面经的心理,然后就投递了一下杭州…

杭州集训Day5

下面是Day5的题目!(其实都咕了好几天了 1007040210. T1 皇后 XY 的疑难 (1s 512MB) 1.1 题目描述有一个n*n的王国城堡地图上,皇后XY喜欢看骑士之间的战斗,于是他准备布置m个骑士,其中每一个骑士都可以向8个方向&#x…

安卓开发面试书籍,每个程序员都必须掌握的8种数据结构!面试必会

前言 本篇文章主要记录分享我的面试准备过程。 很多朋友问我为什么离职 关于离职原因,马云有一句经典的话“要么钱没给到位,要么心委屈了”,想必大家耳熟能详了,我这里再细说一下我个人离职原因: 工资倒挂&#xf…

使用thinkPHP做注册程序的实例

登录界面&#xff1a; 数据库和数据表的结构 具体的操作步骤如下&#xff1a; 第一步&#xff1a;入口文件index.php内容 (此文件基本是属于固定的格式&#xff09; <?phpdefine(THINK_PATH,./ThinkPHP/);define(APP_NAME,MyApp);define(APP_PAHT,./MyApp/);require_once T…

安卓开发面试技能介绍,来一份全面的面试宝典练练手,不吃透都对不起自己

前言 网上有很多对程序员简历的一些指导&#xff0c;这里就不重述&#xff0c;大家可以搜下网上其他大神的总结&#xff0c;结合自身情况修改下。我有几点建议&#xff1a; 1.尽量不要花哨&#xff0c;程序员和设计师或者产品运营还不一样&#xff0c;我们的简历成功与否决定…

上交所行情文件导入数据库

事情的起因很简单&#xff0c;需要将股票收盘行情导入数据库&#xff0c;因为科创板交易时间延长&#xff0c;需要将原有的程序进行改造&#xff0c;众所周知&#xff0c;程序员永远是不够用的&#xff0c;只能自己解决这个问题。 方式是用定时器调用shell脚本。 上交所的mktdt…