In a previous post ( #8 ), I wrote down how to use Logstash to import data from MySQL into ES, but the import there was fairly crude: everything was loaded in one shot. In practice, we often need to import data incrementally, in near real time, based on certain fields such as a row's create_time or update_time. The benefit is obvious: it avoids repeatedly importing large amounts of data that have already been imported, and for remote imports it also saves bandwidth and is more efficient. This post records how to do this kind of incremental import with Logstash.
Importing data from MySQL into Elasticsearch requires MySQL (installation: see #9 ), Elasticsearch (installation: see #5 ), and Logstash as the middleware (installation: see #7 ).
Preparing the MySQL data
Create and switch to the database
mysql> CREATE DATABASE demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
mysql> use demo;
Create the table needed for the demo
mysql> CREATE TABLE `demo_questions_001` (
    ->   `id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    ->   `subject` varchar(31) DEFAULT NULL,
    ->   `question` varchar(300) DEFAULT NULL,
    ->   `created_at` timestamp DEFAULT CURRENT_TIMESTAMP
    -> ) DEFAULT CHARSET=utf8mb4;
Query OK, 0 rows affected (0.05 sec)
Insert a row of data
mysql> INSERT INTO demo_questions_001(subject, question) VALUES('math', 'x+y=z+1');
Query OK, 1 row affected (0.01 sec)
mysql> select * from demo_questions_001;
+----+---------+----------+---------------------+
| id | subject | question | created_at          |
+----+---------+----------+---------------------+
|  1 | math    | x+y=z+1  | 2018-05-07 20:30:56 |
+----+---------+----------+---------------------+
1 row in set (0.00 sec)
Configuring Logstash
Logstash has a dedicated jdbc input plugin for pulling from databases such as MySQL (official docs are here). The only thing to watch out for is downloading the mysql-connector jar and configuring its location in the config file with an absolute path; the jar can be downloaded here. The difference from the previous post ( #8 ) is that this time we use sql_last_value, a value that records where the last run left off. The official docs describe it as follows:
The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if use_column_value is true and tracking_column is set. It is updated accordingly after subsequent queries are run.
By default this value is a timestamp of January 1, 1970 (or 0 when use_column_value is true and tracking_column is set), and it can also be customized. If you want to reset sql_last_value, you need last_run_metadata_path (used below): either edit the value stored in that file, or delete the file holding the old value.
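For example, inspecting or resetting the recorded position looks roughly like this (a sketch; the exact content of the metadata file may vary between plugin versions, but it is a small one-line file holding the last value):
# show the value that will be used as :sql_last_value on the next run
cat /tmp/logstash.file
# delete the file to reset sql_last_value back to the 1970 default
rm /tmp/logstash.file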
The full demo.conf looks like this:
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/Users/shellbye/Downloads/mysql-connector-java-5.1.32/mysql-connector-java-5.1.32-bin.jar"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/demo"
    jdbc_user => "root"
    jdbc_password => ""
    # run the statement once every minute
    schedule => "* * * * *"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    # where the last-run position (sql_last_value) is persisted
    last_run_metadata_path => "/tmp/logstash.file"
    # only fetch rows created since the last run
    statement => "SELECT id, subject, question from demo_questions_001 where created_at > :sql_last_value"
    columns_charset => { "question" => "UTF-8" }
  }
}
output {
  elasticsearch {
    index => "question_index_from_mysql_2"
    hosts => ["localhost:9200"]
    # use the MySQL primary key as the document id to keep the import idempotent
    document_id => "%{id}"
  }
}
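As a side note, and as the quoted docs hint, the plugin can also track the column value itself instead of the time the query last ran. This variant is not part of the original setup, just a sketch of what the input section could look like with use_column_value and tracking_column (the tracking column then has to appear in the SELECT):
input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/Users/shellbye/Downloads/mysql-connector-java-5.1.32/mysql-connector-java-5.1.32-bin.jar"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/demo"
    jdbc_user => "root"
    jdbc_password => ""
    schedule => "* * * * *"
    # track the highest created_at seen so far instead of the last run time
    use_column_value => true
    tracking_column => "created_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/tmp/logstash.file"
    statement => "SELECT id, subject, question, created_at FROM demo_questions_001 WHERE created_at > :sql_last_value"
  }
}
The output section stays the same as above.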
Starting the transfer
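Start Logstash with this config; a minimal invocation (assuming it is run from the Logstash installation directory with demo.conf alongside it) is:
bin/logstash -f demo.conf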
...
[2018-05-07T20:43:25,950][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x6dabc970 run>"}
[2018-05-07T20:43:26,093][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-05-07T20:44:01,709][INFO ][logstash.inputs.jdbc ] (0.009032s) SELECT version()
[2018-05-07T20:44:01,998][INFO ][logstash.inputs.jdbc ] (0.000736s) SELECT version()
[2018-05-07T20:44:02,088][INFO ][logstash.inputs.jdbc ] (0.006108s) SELECT count(*) AS `count` FROM (SELECT id, subject, question from demo_questions_001 where created_at > '1970-01-01 08:00:00') AS `t1` LIMIT 1
[2018-05-07T20:44:02,154][INFO ][logstash.inputs.jdbc ] (0.000719s) SELECT * FROM (SELECT id, subject, question from demo_questions_001 where created_at > '1970-01-01 08:00:00') AS `t1` LIMIT 10000 OFFSET 0
[2018-05-07T20:45:00,293][INFO ][logstash.inputs.jdbc ] (0.001596s) SELECT version()
[2018-05-07T20:45:00,307][INFO ][logstash.inputs.jdbc ] (0.003129s) SELECT version()
[2018-05-07T20:45:00,330][INFO ][logstash.inputs.jdbc ] (0.007516s) SELECT count(*) AS `count` FROM (SELECT id, subject, question from demo_questions_001 where created_at > '2018-05-07 20:44:01') AS `t1` LIMIT 1
[2018-05-07T20:46:00,295][INFO ][logstash.inputs.jdbc ] (0.000376s) SELECT version()
[2018-05-07T20:46:00,305][INFO ][logstash.inputs.jdbc ] (0.000409s) SELECT version()
[2018-05-07T20:46:00,313][INFO ][logstash.inputs.jdbc ] (0.002396s) SELECT count(*) AS `count` FROM (SELECT id, subject, question from demo_questions_001 where created_at > '2018-05-07 20:45:00') AS `t1` LIMIT 1
[2018-05-07T20:46:00,332][INFO ][logstash.inputs.jdbc ] (0.002503s) SELECT * FROM (SELECT id, subject, question from demo_questions_001 where created_at > '2018-05-07 20:45:00') AS `t1` LIMIT 10000 OFFSET 0
[2018-05-07T20:47:00,328][INFO ][logstash.inputs.jdbc ] (0.000395s) SELECT version()
[2018-05-07T20:47:00,352][INFO ][logstash.inputs.jdbc ] (0.000892s) SELECT version()
[2018-05-07T20:47:00,364][INFO ][logstash.inputs.jdbc ] (0.002741s) SELECT count(*) AS `count` FROM (SELECT id, subject, question from demo_questions_001 where created_at > '2018-05-07 20:46:00') AS `t1` LIMIT 1
As the logs show, each run no longer reads the whole table from the database; it only reads the rows that have not been read yet, which is clearly the right way to do it. 😘
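To see the incremental pickup end to end, one quick check (a hypothetical follow-up, assuming the defaults used above and the index name from demo.conf) is to insert another row, wait for the next scheduled run, and then search the index:
# insert a second row; it should be picked up within a minute
mysql -uroot demo -e "INSERT INTO demo_questions_001(subject, question) VALUES('physics', 'F=ma');"
# after the next run, both documents should show up in Elasticsearch
curl -s 'localhost:9200/question_index_from_mysql_2/_search?pretty'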