While building a ClickHouse streaming data system, I ran into a series of pitfalls caused by driver compatibility. This post records a set of parameters and steps that worked.

- System composition: a streaming data processing system built from kafka + flink + clickhouse + grafana, orchestrated with docker.
- System capabilities: data processing, querying, and observability.
- Component versions:
| Component | Version |
|---|---|
| flink | 1.16.3 |
| kafka | 7.2.10 |
| clickhouse | 23.8 |
1. Orchestrate the containers with docker-compose

Create a file named docker-compose.yml in the project directory:
```yaml
version: '3.8'

services:
  # ---------------------------
  # Zookeeper
  # ---------------------------
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.10
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  # ---------------------------
  # Kafka Broker
  # ---------------------------
  kafka:
    image: confluentinc/cp-kafka:7.2.10
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    depends_on:
      - zookeeper

  # ---------------------------
  # Flink JobManager
  # ---------------------------
  jobmanager:
    image: flink:1.16.3
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./data/flink-jobs:/opt/flink/usrlib
    depends_on:
      - kafka

  # ---------------------------
  # Flink TaskManager
  # ---------------------------
  taskmanager:
    image: flink:1.16.3
    container_name: flink-taskmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    scale: 1
    volumes:
      - ./data/flink-tasks:/opt/flink/usrlib
    depends_on:
      - jobmanager

  # ---------------------------
  # ClickHouse
  # ---------------------------
  clickhouse:
    image: clickhouse/clickhouse-server:23.8
    container_name: clickhouse
    ports:
      - "8123:8123"   # HTTP interface for queries
      - "9000:9000"   # Native TCP interface
    volumes:
      - ./data/clickhouse:/var/lib/clickhouse
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

  # ---------------------------
  # Grafana
  # ---------------------------
  grafana:
    image: grafana/grafana:10.0.0
    container_name: grafana
    ports:
      - "3000:3000"
    depends_on:
      - clickhouse
    environment:
      GF_INSTALL_PLUGINS: vertamedia-clickhouse-datasource
```
Bring the stack up:

```shell
docker-compose -f docker-compose.yml up -d
```
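Once the containers start, it can take a little while before each service actually answers requests. A quick readiness check, as a sketch: the URLs follow the port mappings in the compose file above, and the retry count and sleep interval are arbitrary choices.

```shell
#!/bin/sh
# Poll the HTTP endpoints published by the compose file until they respond.
wait_for() {
  for i in $(seq 1 30); do
    if curl -fsS "$1" > /dev/null 2>&1; then
      echo "$1 is up"
      return 0
    fi
    sleep 2
  done
  echo "$1 did not come up" >&2
  return 1
}

wait_for http://localhost:8081/overview    # Flink JobManager REST API
wait_for http://localhost:8123/ping        # ClickHouse HTTP interface
wait_for http://localhost:3000/api/health  # Grafana health endpoint
```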
2. Install the required libraries

Log in to the running docker container flink-jobmanager:

```shell
docker exec -it flink-jobmanager /bin/bash
```

Change into /opt/flink/lib:

```shell
cd /opt/flink/lib
```

Download or build the required jar files, listed below:
| No. | artifactId | version | groupId | Method |
|---|---|---|---|---|
| 1 | clickhouse-jdbc | 0.3.1-patch | ru.yandex.clickhouse | download |
| 2 | flink-connector-base | 1.16.3 | org.apache.flink | download |
| 3 | flink-table-common | 1.16.0 | org.apache.flink | download |
| 4 | guava | 31.0.1-jre | com.google.guava | download |
| 5 | flink-connector-clickhouse | 1.16.0-SNAPSHOT | org.apache.flink | build |
| 6 | flink-connector-jdbc | 1.16.3-patch | lab.zhangrj.flink | build |
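Items 1 to 4 are published artifacts, so they can be fetched straight from Maven Central, whose directory layout is derived mechanically from the coordinates above. A small sketch that prints the download URLs; run it inside /opt/flink/lib and pipe the output to `xargs -n1 wget` to fetch the jars (assumes these coordinates are indeed on repo1.maven.org):

```shell
#!/bin/sh
# Build a Maven Central download URL from groupId, artifactId, version.
mc_url() {
  group_path=$(echo "$1" | tr '.' '/')
  echo "https://repo1.maven.org/maven2/${group_path}/$2/$3/$2-$3.jar"
}

mc_url ru.yandex.clickhouse clickhouse-jdbc 0.3.1-patch
mc_url org.apache.flink flink-connector-base 1.16.3
mc_url org.apache.flink flink-table-common 1.16.0
mc_url com.google.guava guava 31.0.1-jre
```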
Building item 5, flink-connector-clickhouse-1.16.0-SNAPSHOT.jar:

- Download the source: https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16
- Check out the release-1.16 branch.
- Build and install with JDK 8:

```shell
mvn clean install -DskipTests
```

- Copy the jar into the container:

```shell
docker cp ~/.m2/repository/org/apache/flink/flink-connector-clickhouse/1.16.0-SNAPSHOT/flink-connector-clickhouse-1.16.0-SNAPSHOT.jar flink-jobmanager:/opt/flink/lib/
```
Building item 6, flink-connector-jdbc-1.16.3-patch.jar:

- Create a new project with the following pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>lab.zhangrj.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.16.3-patch</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.3</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.13.6</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

- Build and install with JDK 8 (`mvn clean install -DskipTests`, as above).
- Copy the jar into the container:

```shell
docker cp ~/.m2/repository/lab/zhangrj/flink/flink-connector-jdbc/1.16.3-patch/flink-connector-jdbc-1.16.3-patch.jar flink-jobmanager:/opt/flink/lib/
```
Copy the same set of jars into the flink-taskmanager container as well.
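The copying can be scripted. This sketch prints one pair of `docker cp` commands per jar, staging through /tmp because `docker cp` cannot copy directly between containers; review the list of file names (they follow the versions above), then pipe the output to `sh` to execute it.

```shell
#!/bin/sh
# Emit the docker cp commands that mirror the six jars from the
# jobmanager's /opt/flink/lib into the taskmanager.
JARS="clickhouse-jdbc-0.3.1-patch.jar
flink-connector-base-1.16.3.jar
flink-table-common-1.16.0.jar
guava-31.0.1-jre.jar
flink-connector-clickhouse-1.16.0-SNAPSHOT.jar
flink-connector-jdbc-1.16.3-patch.jar"

echo "$JARS" | while read -r jar; do
  echo "docker cp flink-jobmanager:/opt/flink/lib/${jar} /tmp/${jar}"
  echo "docker cp /tmp/${jar} flink-taskmanager:/opt/flink/lib/${jar}"
done
```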
3. Testing

3.1. Build the test scenario

- Create a topic named events on kafka.
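For example, with the tooling that ships in the kafka image (a partition count of 1 and replication factor of 1 match the single-broker compose setup above):

```shell
# Create the events topic from inside the kafka container.
docker exec kafka kafka-topics --bootstrap-server kafka:9092 \
  --create --if-not-exists --topic events \
  --partitions 1 --replication-factor 1
```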
- Create the tables and the job on flink-jobmanager (e.g. in the Flink SQL client):
```sql
CREATE TABLE events (
    user_id INT,
    action STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE metrics (
    metric_name STRING,
    metric_value DOUBLE,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://clickhouse:8123',
    'database-name' = 'default',
    'table-name' = 'metrics',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3',
    'sink.ignore-delete' = 'false'
);

INSERT INTO metrics
SELECT
    'click_count' AS metric_name,
    COUNT(*) AS metric_value,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end;
```
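The clickhouse connector writes into an existing ClickHouse table, so `default.metrics` must be created on the ClickHouse side first. A sketch of a matching DDL; the column types mirror the Flink table (STRING to String, DOUBLE to Float64, TIMESTAMP(3) to DateTime64(3)), while the MergeTree engine and the ordering key are my assumptions, not taken from the original setup:

```shell
# Create the sink's target table inside the clickhouse container.
docker exec clickhouse clickhouse-client --query "
  CREATE TABLE IF NOT EXISTS default.metrics (
      metric_name  String,
      metric_value Float64,
      window_start DateTime64(3),
      window_end   DateTime64(3)
  ) ENGINE = MergeTree
  ORDER BY (metric_name, window_start)"
```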
If everything is working, the newly created job should appear in the Flink web UI's list of running jobs (http://localhost:8081/#/job/running).

- Create a data panel in grafana (http://localhost:3000/) backed by the query:
```sql
SELECT
    window_start,
    metric_value
FROM metrics
WHERE metric_name = 'click_count'
ORDER BY window_start
```
3.2. Test procedure

- Send a message to kafka:

```json
{
    "user_id": 12345,
    "action": "click",
    "ts": "2025-11-15 16:10:35.123"
}
```
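One way to send it, using the console producer shipped in the kafka image:

```shell
# Pipe the sample event into kafka-console-producer inside the broker container.
echo '{"user_id": 12345, "action": "click", "ts": "2025-11-15 16:10:35.123"}' |
  docker exec -i kafka kafka-console-producer \
    --bootstrap-server kafka:9092 --topic events
```

Note that with event-time windows, a window is only emitted once the watermark passes its end, so a single event produces no output by itself; send at least one later event whose ts is more than a minute plus the 5-second watermark delay after the first to see the window close.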
- Check the resulting data panel in grafana:
![image]()
