长沙网站建设的首选免费logo设计一键生成无水印
web/
2025/9/26 3:20:07/
文章来源:
长沙网站建设的首选,免费logo设计一键生成无水印,网站建设营销一站式服务,wordpress点赞和打赏使用Kafka Streams开发流处理应用 一、简介1.1 Kafka Streams定义1.2 Kafka Streams的优势1.3 Kafka Streams应用场景 二、环境搭建2.1 安装Kafka2.2 安装Kafka Streams2.3 构建Kafka集群 三、Kafka Streams编程API介绍3.1 Kafka Streams主要API3.2 应用程序的配置和参数3.3 To… 使用Kafka Streams开发流处理应用 一、简介1.1 Kafka Streams定义1.2 Kafka Streams的优势1.3 Kafka Streams应用场景 二、环境搭建2.1 安装Kafka2.2 安装Kafka Streams2.3 构建Kafka集群 三、Kafka Streams编程API介绍3.1 Kafka Streams主要API3.2 应用程序的配置和参数3.3 Topology的定义和构建3.4 各种数据处理操作的使用map、filter、flatmap等 四、流处理实战案例4.1 流处理应用的开发步骤步骤一创建 Kafka Streams 实例步骤二定义输入与输出主题步骤三启动 Kafka Streams 实例 4.2 事件日志监控案例场景描述解决方案 4.3 用户行为统计案例场景描述解决方案 五、性能优化5.1 如何评估Kafka Streams应用的性能5.1.1 吞吐量5.1.2 延迟5.1.3 内存占用 5.2 优化并行度和吞吐量5.2.1 调整线程池大小5.2.2 调整partition数量5.2.3 使用压缩算法 5.3 实现数据压缩 六、在生产中的应用6.1 高可用性集群部署6.2 监控和报警6.3 日志管理 一、简介
1.1 Kafka Streams定义
Kafka Streams是一款开源、分布式和水平扩展的流处理平台其在Apache Kafka之上进行构建借助其高性能、可伸缩性和容错性可以实现高效的流处理应用程序。
1.2 Kafka Streams的优势
Kafka Streams的优势包括
基于Kafka生态系统可以更轻松地集成到已有Kafka环境中。容易部署和管理可以通过Docker等容器技术轻松实现自动化部署和运维。对于流式数据处理任务Kafka Streams相比其他框架具有更高的性能。
1.3 Kafka Streams应用场景
Kafka Streams主要用于以下应用场景
实时数据处理通过实时地流式计算对数据进行快速分析和处理。流式ETL将数据从一个数据源抽取到另一个数据源或将数据进行转换、清洗和聚合操作。流-表格Join将一条流数据与一个表进行关联查询实现实时查询和联合分析。
二、环境搭建
2.1 安装Kafka
在官网下载Kafka的二进制包解压后即可使用。安装过程可以参考官方文档。
2.2 安装Kafka Streams
在Maven或Gradle项目的pom.xml或build.gradle文件中添加以下依赖即可安装Kafka Streams
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-streams/artifactIdversion2.8.0/version
/dependency2.3 构建Kafka集群
构建Kafka集群可以使用Docker Compose等工具实现自动化部署。对于测试环境可以使用单台机器构建一个多节点Kafka集群对于生产环境需要根据业务需求和QPS等指标确定集群规模。
三、Kafka Streams编程API介绍
3.1 Kafka Streams主要API
Kafka Streams是一个Java API它允许用户使用简单的Java函数对流式数据进行转换和处理。Kafka Streams主要包括以下API
StreamBuilder用于为Kafka流构建拓扑结构。KStream和KTable可以将Kafka主题中的消息转换为键值对流或表。GlobalKTable类似于KTable但在所有分区中都具有全局状态。Serializer和Deserializer用于序列化和反序列化Java对象以便写入和读取Kafka流。Processor和Transformer用于自定义操作和转换流。
3.2 应用程序的配置和参数
在Kafka Streams应用程序中可以使用以下几种参数来配置应用程序的行为
Bootstrapping Servers指定Kafka集群的引导服务器。应用程序ID每个应用程序必须具有唯一的ID。Serde配置用于指定如何序列化和反序列化记录键和记录值。缓存大小控制用于控制应用程序的本地缓存大小。规则配置用于指定消耗和生产数据的语义。
3.3 Topology的定义和构建
Topology是Kafka Streams应用程序中数据流的物理表示。它是由Processors和State Stores组成的拓扑结构。每个Processor表示一个数据流操作而State Store表示一个具有本地状态的存储设备。Toplogy的构建可以使用StreamBuilder API进行操作。
3.4 各种数据处理操作的使用map、filter、flatmap等
Kafka Streams API提供了各种常见的数据处理操作以便处理流数据。以下是一些基本的数据处理操作
map用于将一个记录键值对转换为另一个键值对。filter用于根据某些条件过滤掉记录。flatMap用于将一个键值对映射为多个键值对。groupByKey按键对记录进行分组。reduceByKey用于针对相同键的记录进行聚合操作。aggregateByKey用于针对相同键的记录进行聚合并将结果插入到全局状态存储中。
示例代码如下
//定义并构建拓扑结构
StreamBuilder builder new StreamBuilder();
KStreamString, String textLines builder.stream(TextLinesTopic);
KStreamString, String wordCounts textLines.flatMapValues(textLine - Arrays.asList(textLine.split(\\W))).groupBy((key, word) - word).count();
wordCounts.to(WordsWithCountsTopic, Produced.with(stringSerde, longSerde));//进行map操作
KStreamString, String upperCaseLines textLines.map((key, value) - KeyValue.pair(key, value.toUpperCase()));//进行filter操作
KStreamString, String shortLines textLines.filter((key, value) - value.length() 10);//进行reduceByKey操作
KTableString, Long wordCountTable textLines.flatMapValues(textLine - Arrays.asList(textLine.toLowerCase().split(\\W))).groupBy((key, word) - word).count(Materialized.as(wordCountStore));四、流处理实战案例
4.1 流处理应用的开发步骤
步骤一创建 Kafka Streams 实例
Properties props new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, my-stream-processing-application);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
StreamsBuilder builder new StreamsBuilder();
KafkaStreams streams new KafkaStreams(builder.build(), props);props.put(StreamsConfig.APPLICATION_ID_CONFIG, my-stream-processing-application) 指定流处理应用的唯一标识符。props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092) 指定 Kafka 集群的开头地址。StreamsBuilder builder new StreamsBuilder() 创建 StreamsBuilder 实例并用其构建 TOPOLOGY。
步骤二定义输入与输出主题
final String inputTopic streams-input;
final String outputTopic streams-output;
KStreamString, String inputStream builder.stream(inputTopic);
KStreamString, String outputStream inputStream.mapValues(value - value.toUpperCase());
outputStream.to(outputTopic);builder.stream(inputTopic) 从名为 inputTopic 的主题中读取消息返回类型为 KStreamString, String。inputStream.mapValues(value - value.toUpperCase()) 对 inputStream 中的每条消息进行处理并将结果写入 outputStream。outputStream.to(outputTopic) 将 outputStream 中的所有消息写入名为 outputTopic 的主题。
步骤三启动 Kafka Streams 实例
streams.start();streams.start() 启动 Kafka Streams 实例并开始处理消息。
4.2 事件日志监控案例
场景描述
假设我们的后端服务正在每分钟以一个 JSON 对象形式向 Kafka 主题发出 HTTP 请求日志信息其中数据格式为
{timestamp : 2019-01-02T13:54:34.123Z,method: GET,endpoint: http://localhost:8080/api/v1/users,status_code: 200,response_time: 23.4
}现在我们需要实时地可视化用户请求日志更新格式如下
{“time”: ”2019-01-02 14:30:22”,“users”: [{“Country”: “CA”, ”Count”: 60},{“Country”: “US”, “Count”: 38},{“Country”: “CN”, “Count”: 6},]
}解决方案
使用 Kafka Streams 构建一个流处理应用来预处理请求日志条目。根据所需对日志进行聚合和转换比如按国家进行分组和计数并将结果写出到名为 Kafka 主题的输出主题中。最后一旦流处理应用中有新输出条目写出就可以从输出主题中读取并使用任何可用于可视化的工具进行消费。
4.3 用户行为统计案例
场景描述
假设我们正在使用 Kafka 主题从一个移动应用收集用户事件。每个事件都必须记录三个主要属性事件发生的时间戳、时间戳对应的小时和用户类型。
{ timestamp: 1517791088000, hour_of_day: 7, user_type: bronze
}现在我们需要实时地聚合这些事件以了解用户行为例如每小时访问的总用户数和所有不同金属等级的用户数。
解决方案
使用 Kafka Streams 构建一个流处理应用该应用将源主题中的事件作为输入并组合输出结果到目标主题中。
KStreamString, String input stream builder.stream(user_events);
KTableWindowedString, Long hourlyUserCounts inputstream.map((key, value) - new KeyValue(parseTimestamp(value).toString(yyyyMMddHH), value)).groupByKey().count(TimeWindows.of(Duration.ofHours(1)));
KTableWindowedString, Long userCountsByType inputstream.groupByKey().count().groupBy((key, value) - key.split(:)[0], Grouped.with(Serdes.String(), Serdes.Long())).reduce((v1, v2) - v1 v2);hourlyUserCounts.toStream().to(hourly_user_counts, Produced.with(stringSerde, longSerde):
userCountsByType.toStream().to(user_counts_by_type, Produced.with(stringSerde, longSerde));以上示例代码将接收到的用户事件数据即 user_events 主题中的消息转换成 yyyyMMddHH 为时间窗口的键然后进行聚合计数。最终以类似方式对所有用户进行计数并编写将其写出到另外两个主题的代码。
五、性能优化
5.1 如何评估Kafka Streams应用的性能
评估Kafka Streams应用的性能需要关注以下几个方面
5.1.1 吞吐量
吞吐量是指Kafka Streams应用在单位时间内处理的消息数量。可以通过以下指标来评估吞吐量
输入速率Kafka集群每秒发送到Kafka Streams应用的消息数量。处理时延从消息到达Kafka Streams应用到处理完成所需的时间。处理速率Kafka Streams应用每秒处理的消息数量。
5.1.2 延迟
延迟是指Kafka Streams应用处理消息所需的时间。可以通过以下指标来评估延迟
最大延迟Kafka Streams应用处理消息所需的最长时间。平均延迟Kafka Streams应用处理消息所需的平均时间。
5.1.3 内存占用
内存占用是指Kafka Streams应用使用的内存数量。可以通过以下指标来评估内存占用
堆内存使用率Java堆空间已使用的比例。非堆内存使用率Java非堆空间已使用的比例。GC时间Java垃圾回收所需的时间。
5.2 优化并行度和吞吐量
为了提高Kafka Streams应用的并行度和吞吐量可以采用以下优化方式
5.2.1 调整线程池大小
Kafka Streams应用使用线程池处理消息可以通过增加线程池大小来提高并行度和吞吐量。
// 创建线程池指定线程池大小为10
ExecutorService executorService Executors.newFixedThreadPool(10);// 提交任务到线程池
for (int i 0; i 1000; i) {executorService.submit(new Runnable() {public void run() {// 处理消息的逻辑}});
}5.2.2 调整partition数量
将topic划分成多个partition可以提高Kafka Streams应用的并行度和吞吐量。可以通过以下指令调整partition数量
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --partitions 105.2.3 使用压缩算法
使用压缩算法可以减少Kafka Streams应用传输过程中的数据量从而提高吞吐量和降低延迟。可以在Kafka Streams应用中配置压缩算法
// 创建Streams配置对象
Properties streamsConfig new Properties();
// 配置默认的压缩算法为gzip
streamsConfig.put(StreamsConfig.COMPRESSION_TYPE_CONFIG, gzip);5.3 实现数据压缩
为了在Kafka Streams应用中实现数据压缩可以使用Gzip压缩算法对消息进行压缩和解压缩
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;public class GzipUtils {public static String compress(String str) {try {if (str null || str.length() 0) {return str;} else {ByteArrayOutputStream out new ByteArrayOutputStream();GZIPOutputStream gzip new GZIPOutputStream(out);gzip.write(str.getBytes());gzip.close();byte[] compressed out.toByteArray();out.close();return Base64.getEncoder().encodeToString(compressed);}} catch (IOException e) {throw new RuntimeException(e);}}public static String uncompress(String str) {try {if (str null || str.length() 0) {return str;} else {byte[] compressed Base64.getDecoder().decode(str);ByteArrayInputStream in new ByteArrayInputStream(compressed);GZIPInputStream gzip new GZIPInputStream(in);ByteArrayOutputStream out new ByteArrayOutputStream();byte[] buffer new byte[4096];int bytesRead -1;while ((bytesRead gzip.read(buffer)) 0) {out.write(buffer, 0, bytesRead);}gzip.close();in.close();out.close();return new String(out.toByteArray(), UTF-8);}} catch (IOException e) {throw new RuntimeException(e);}}}使用示例
// 压缩字符串
String compressedStr GzipUtils.compress(hello world);
// 解压缩字符串
String uncompressedStr GzipUtils.uncompress(compressedStr);注释以上代码实现了Gzip算法的压缩和解压缩功能。压缩时使用java.util.zip.GZIPOutputStream对消息进行压缩解压缩时使用java.util.zip.GZIPInputStream对消息进行解压缩并使用java.util.Base64对压缩后的字节数组进行编码和解码。
六、在生产中的应用
Kafka Streams是一个分布式流处理框架能够轻松地处理实时数据。在生产中应用Kafka Streams时需要注意以下几个方面。
6.1 高可用性集群部署
为了确保Kafka Streams在生产环境中的高可用性我们需要将其部署在一个高可用性集群中。这意味着Kafka Streams需要有多个实例运行即多个Kafka Streams应用程序实例。这些实例应该被分布在多个物理机或虚拟机上以避免单点故障。
以下是一个基于Java的Kafka Streams高可用性集群部署示例 Properties properties new Properties();properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, my-kafka-streams-app);properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());KafkaStreams streams new KafkaStreams(topology, properties);streams.start();6.2 监控和报警
在生产环境中当Kafka Streams应用程序出现故障或异常时我们需要及时得到通知并采取相应的措施。因此对Kafka Streams进行监控是非常重要的。
例如我们可以使用Kafka Streams提供的StreamsConfig.STATE_DIR_CONFIG属性将状态存储在本地文件系统中以便在发生错误时进行还原。此外我们还可以使用一些开源监控工具如Prometheus和Grafana来监控Kafka Streams应用程序的运行状况并发送报警信息。
以下是一个基于Java的Kafka Streams监控和报警示例 Properties properties new Properties();properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, my-kafka-streams-app);properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, /tmp/kafka-streams);KafkaStreams streams new KafkaStreams(topology, properties);streams.start();// 使用Prometheus和Grafana进行监控并发送报警信息MonitoringInterceptorUtils monitoringInterceptorUtils new MonitoringInterceptorUtils();monitoringInterceptorUtils.register(streams);6.3 日志管理
在生产环境中我们需要对Kafka Streams应用程序的日志进行管理。如果我们不谨慎处理日志那么将可能对性能产生负面影响并导致无法排查问题。
为了管理Kafka Streams应用程序的日志我们可以将其记录到文件或日志收集系统如ELK或Graylog中以便更好地进行分析和调试。
以下是一个基于Java的Kafka Streams日志管理示例 Properties properties new Properties();properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, my-kafka-streams-app);properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, /tmp/kafka-streams);KafkaStreams streams new KafkaStreams(topology, properties);streams.start();// 将日志记录到文件中Appender fileAppender RollingFileAppender.newBuilder().setName(fileLogger).setFileName(/tmp/kafka-streams.log).build();fileAppender.start();LoggerContext context (LoggerContext) LogManager.getContext(false);Configuration config context.getConfiguration();config.addAppender(fileAppender);AppenderRef ref AppenderRef.createAppenderRef(fileLogger, null, null);AppenderRef[] refs new AppenderRef[] {ref};LoggerConfig loggerConfig LoggerConfig.createLogger(false, Level.INFO, my.kafkastreams, true, refs, null, config, null);loggerConfig.addAppender(fileAppender, null, null);config.addLogger(my.kafkastreams, loggerConfig);context.updateLoggers();
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/81978.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!