Logstash
-  测试数据准备 DROP DATABASE IF EXISTS es;CREATE DATABASE es DEFAULT CHARACTER SET utf8;USE es;CREATE TABLE book (id INT NOT NULL,title VARCHAR(20),author VARCHAR(20),price DECIMAL(6,2),PRIMARY KEY(id) );DROP PROCEDURE IF EXISTS batchInsertBook;DELIMITER $$ CREATE PROCEDURE batchInsertBook(IN seed INT, IN loops INT) BEGINDECLARE i INT;DECLARE id INT;SET i = 0;SET id = seed;WHILE i < loops DOINSERT INTO book(id, title,author, price) VALUES(id, '雪山飞狐', '金庸', 50),(id+1, '神雕侠侣', '金庸', 60),(id+2, '三国演义', '罗贯中', 50),(id+3, '西游记', '吴承恩', 40);SET id = id + 4;SET i = i + 1;END WHILE; END $$ DELIMITER ;-- 禁用索引,加快数据导入速度 ALTER TABLE book DISABLE KEYS;-- 调用存储过程导入数据 CALL batchInsertBook(1, 100);-- 添加索引 ALTER TABLE book ENABLE KEYS;-- 修改表的引擎为innodb ALTER TABLE book ENGINE INNODB;mysql> select count(*) from book; +----------+ | count(*) | +----------+ | 40000 | +----------+ 1 row in set (0.03 sec)
-  docker安装logstash# 拉取镜像 docker pull logstash:7.12.1
-  在宿主机配置目录 mkdir -p /root/logstash
-  在宿主机创建 /root/logstash/logstash.yml,内容为空即可,该步骤不能省略
-  在宿主机创建 /root/logstash/logstash.confinput {jdbc {jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"jdbc_user => "root"jdbc_password => "root"schedule => "* * * * *" statement => "SELECT * FROM book"type => "jdbc"} }filter { }output {stdout {codec => json_lines} }-  本次连接的是 windows上的MySQL,通过IpV4的IP地址连接 
-  上传 maven仓库中的jar# \apache-maven-3.9.6\repository\mysql\mysql-connector-java\8.0.11\mysql-connector-java-8.0.11.jar [root@localhost logstash]# ls mysql-connector-java-8.0.11.jar [root@localhost logstash]# chmod 644 mysql-connector-java-8.0.11.jar
-  开启 windows的root远程登录mysql -uroot -proot use mysql; update user set host = '%' where user = 'root'; FLUSH PRIVILEGES;mysql> select host,user from user; +-----------+------------------+ | host | user | +-----------+------------------+ | % | root | | localhost | mysql.infoschema | | localhost | mysql.session | | localhost | mysql.sys | +-----------+------------------+ 4 rows in set (0.00 sec)
 
-  
-  启动 logstash容器docker run -d \--name logstash \-v ~/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml \-v ~/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \-v ~/logstash/mysql-connector-java-8.0.11.jar:/usr/share/logstash/mysql-connector-java-8.0.11.jar \logstash:7.12.1
-  查看日志 # 查看数据同步 docker logs -f logstash 
-  将数据输出到 ElasticSearch{% note blue ‘fas fa-bullhorn’ modern %} 既然我们能从 mysql中读取数据,并输出到stdout,那么我们同样可以从mysql中读取数据,然后将数据输出到ElasticSearch,修改logstash.conf,内容如下{% endnote %} input {jdbc {jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"jdbc_user => "root"jdbc_password => "root"schedule => "* * * * *"statement => "SELECT * FROM book"type => "jdbc"} }filter {}output {elasticsearch {hosts => ["192.168.32.128:9200"]index => "book"document_id => "%{id}"}stdout {codec => json_lines} }
-  确保 es是启动的 
-  重启 [root@192 logstash]# docker restart logstash logstash
-  进入如下界面  
-  查看是否同步到 es 
-  增量同步 -  修改 logstash.conf,内容如下input {jdbc {jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"jdbc_user => "root"jdbc_password => "root"schedule => "* * * * *"type => "jdbc"# 记录查询结果中,某个字段的值use_column_value => true# 记录id字段的值,用于增量同步tracking_column => "id"# 指明要记录的字段的类型tracking_column_type => numeric# 指定要记录上一次查询的数据record_last_run => true# :sql_last_value代表上次查询出来的最大的“tracking_column”中的值statement => "SELECT * FROM book where id > :sql_last_value"last_run_metadata_path => "syncpoint_table"} }filter { }output {elasticsearch {hosts => ["192.168.32.128:9200"]index => "book"document_id => "%{id}"}stdout {codec => json_lines} } 
 
-