
Importing data from MySQL into Elasticsearch with Logstash #8


Importing data from MySQL into Elasticsearch requires three pieces: MySQL (installation: see #9), Elasticsearch (see #5), and Logstash as the middleware between them (see #7).

Preparing the MySQL data

First, create the database used by the demo

mysql> CREATE DATABASE demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

Switch to it

mysql> use demo;

Create the question table the demo needs

mysql> CREATE TABLE `demo_questions` (`id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY, `subject` varchar(31) DEFAULT NULL, `question` varchar(300) DEFAULT NULL) DEFAULT CHARSET=utf8mb4;

Insert a few rows into the table

mysql> INSERT INTO demo_questions(subject, question) VALUES('math', 'x+y=z+1');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO demo_questions(subject, question) VALUES('math', 'a+b=c+d');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO demo_questions(subject, question) VALUES('math', '1+2+3+4=');
Query OK, 1 row affected (0.00 sec)

OK, at this point the data is ready, as shown below:

mysql> select * from demo_questions;
+----+---------+----------+
| id | subject | question |
+----+---------+----------+
|  1 | math    | x+y=z+1  |
|  2 | math    | a+b=c+d  |
|  3 | math    | 1+2+3+4= |
+----+---------+----------+
3 rows in set (0.00 sec)

Configuring Logstash

Logstash has a dedicated input plugin for MySQL (the jdbc input); the official documentation is here. The only thing to watch out for is that you need to download the mysql-connector jar and point to it with an absolute path in the config file. The jar can be downloaded here. The full demo.conf looks like this:

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/Users/shellbye/Downloads/mysql-connector-java-5.1.32/mysql-connector-java-5.1.32-bin.jar"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/demo"
    jdbc_user => "root"
    jdbc_password => ""
    schedule => "* * * * *"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    statement => "SELECT id, subject, question from demo_questions"
    columns_charset => { "question" => "UTF-8" }
  }
}
output {
    elasticsearch {
        index => "question_index_from_mysql"
        hosts => ["localhost:9200"]
        document_id => "%{id}"
    }
}
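The `schedule` field uses cron syntax, so `"* * * * *"` re-runs the query once a minute. With `jdbc_paging_enabled`, Logstash wraps `statement` in a `SELECT * FROM (…) AS t1 LIMIT <page_size> OFFSET <n>` subquery so that large tables are fetched in pages of `jdbc_page_size` rows (you can see exactly this shape in the SQL that appears in the startup log below). A minimal Python sketch of that paging logic, using the three demo rows in place of the real table:

```python
# Sketch of how jdbc_paging_enabled splits the statement into pages.
# The demo rows below stand in for the MySQL table.
rows = [
    {"id": 1, "subject": "math", "question": "x+y=z+1"},
    {"id": 2, "subject": "math", "question": "a+b=c+d"},
    {"id": 3, "subject": "math", "question": "1+2+3+4="},
]

def fetch_pages(table, page_size):
    """Yield pages of rows, mimicking the plugin's wrapping query:
    SELECT * FROM (<statement>) AS t1 LIMIT <page_size> OFFSET <n>."""
    offset = 0
    while True:
        page = table[offset:offset + page_size]
        if not page:  # an empty page means the table is exhausted
            break
        yield page
        offset += page_size

# With jdbc_page_size => 10000, all three demo rows fit in one page.
pages = list(fetch_pages(rows, 10000))
print(len(pages), len(pages[0]))  # prints: 1 3

# A page size of 2 would split the same data across two queries.
print([len(p) for p in fetch_pages(rows, 2)])  # prints: [2, 1]
```

With only 3 rows the paging never kicks in here, but on a production table it keeps each query bounded instead of pulling everything in one statement.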

Starting the data transfer

shellbye@localhost:/etc/logstash/conf.d$ /usr/share/logstash/bin/logstash -f demo.conf
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
[INFO ] 2018-04-26 16:43:26.406 [main] scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[INFO ] 2018-04-26 16:43:26.435 [main] scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[WARN ] 2018-04-26 16:43:27.145 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
[INFO ] 2018-04-26 16:43:27.484 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.2.4"}
[INFO ] 2018-04-26 16:43:27.716 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2018-04-26 16:43:28.707 [Ruby-0-Thread-1: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:22] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[INFO ] 2018-04-26 16:43:29.214 [[main]-pipeline-manager] elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[INFO ] 2018-04-26 16:43:29.223 [[main]-pipeline-manager] elasticsearch - Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[WARN ] 2018-04-26 16:43:29.392 [[main]-pipeline-manager] elasticsearch - Restored connection to ES instance {:url=>"http://localhost:9200/"}
[INFO ] 2018-04-26 16:43:29.813 [[main]-pipeline-manager] elasticsearch - ES Output version determined {:es_version=>6}
[WARN ] 2018-04-26 16:43:29.815 [[main]-pipeline-manager] elasticsearch - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[INFO ] 2018-04-26 16:43:29.824 [[main]-pipeline-manager] elasticsearch - Using mapping template from {:path=>nil}
[INFO ] 2018-04-26 16:43:29.837 [[main]-pipeline-manager] elasticsearch - Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[INFO ] 2018-04-26 16:43:29.880 [[main]-pipeline-manager] elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[INFO ] 2018-04-26 16:43:30.193 [Ruby-0-Thread-1: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:22] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x21336b3c@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 run>"}
[INFO ] 2018-04-26 16:43:30.242 [Ruby-0-Thread-1: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:22] agent - Pipelines running {:count=>1, :pipelines=>["main"]}
[INFO ] 2018-04-26 16:44:01.600 [Ruby-0-Thread-13: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:284] jdbc - (0.015967s) SELECT version()
[INFO ] 2018-04-26 16:44:01.660 [Ruby-0-Thread-13: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:284] jdbc - (0.032411s) SELECT count(*) AS `count` FROM (SELECT id, subject, question from demo_questions) AS `t1` LIMIT 1
[INFO ] 2018-04-26 16:44:01.673 [Ruby-0-Thread-13: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:284] jdbc - (0.004498s) SELECT * FROM (SELECT id, subject, question from demo_questions) AS `t1` LIMIT 10000 OFFSET 0

Because there is very little data here, the sync is essentially finished by the time the first SELECT log line appears.
At that point you can see that the corresponding index in ES contains 3 documents:

shellbye@localhost:/var/log/logstash$ curl localhost:9200/_cat/indices?v
health status index                     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   question_index_from_mysql lcZyx7rvQvmVoeFdwjqn7Q   5   1          3            0     15.4kb         15.4kb
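Note that since `schedule` re-runs the query every minute, the same three rows are fetched over and over, yet `docs.count` stays at 3. That is the job of `document_id => "%{id}"`: each row is always indexed under its MySQL id, so repeated runs overwrite existing documents instead of creating duplicates. A small Python sketch of that behavior, with a plain dict standing in for the ES index:

```python
rows = [
    {"id": 1, "subject": "math", "question": "x+y=z+1"},
    {"id": 2, "subject": "math", "question": "a+b=c+d"},
    {"id": 3, "subject": "math", "question": "1+2+3+4="},
]

index = {}  # stands in for question_index_from_mysql

def sync(rows):
    # document_id => "%{id}": each row is indexed under its MySQL id,
    # so re-syncing the same row overwrites rather than duplicates it.
    for row in rows:
        index[str(row["id"])] = row

sync(rows)  # first scheduled run
sync(rows)  # one minute later: same rows, same _ids
print(len(index))  # prints: 3  (not 6)
```

Without an explicit `document_id`, ES would auto-generate an `_id` per event and every scheduled run would add three more duplicate documents.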
