
Logstash: fetching remote content through a RESTful API #21

@Shellbye


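Earlier posts covered installing Logstash ( #7 #14 ), importing text files ( #11 ), importing JSON files ( #10 ), and importing data from MySQL ( #8 #16 ). In all of those, the data moved in a single pass from one place to another. The sources differed, but the overall logic was simple. This post records a more involved flow: after fetching data from MySQL (or another source), send each record to a remote RESTful API (thanks to JSONPlaceholder), process the response, and write the result to the output.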

Preparing the MySQL data

Create the database and switch to it, then create the table the demo needs and insert 2 rows:

mysql> CREATE DATABASE demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
mysql> use demo;
mysql> CREATE TABLE user_id_tb (`id` int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY, `user_id` int(11) ) DEFAULT CHARSET=utf8;
mysql> INSERT INTO user_id_tb(user_id) VALUES (2);
mysql> INSERT INTO user_id_tb(user_id) VALUES (1);
mysql> select * from user_id_tb;
+----+---------+
| id | user_id |
+----+---------+
|  1 |       2 |
|  2 |       1 |
+----+---------+
2 rows in set (0.00 sec)

Configuring Logstash

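Unlike the previous posts, this configuration needs a third-party filter plugin, logstash-filter-rest. This plugin is the key to this post: it sends HTTP requests and handles their responses. Install it as follows: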

$LS_HOME/bin/logstash-plugin install logstash-filter-rest

Next, the configuration file:

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/Users/shellbye/Downloads/mysql-connector-java-5.1.32/mysql-connector-java-5.1.32-bin.jar"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/demo"
    jdbc_user => "root"
    jdbc_password => ""
    schedule => "* * * * *"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    last_run_metadata_path => "/tmp/logstash_last_run_demo.file"
    statement => "SELECT id, user_id from user_id_tb"
  }
}
# logstash-6.2.4/bin/logstash-plugin install logstash-filter-rest
# https://github.com/lucashenning/logstash-filter-rest
# https://github.com/elastic/logstash/issues/3489
filter {
  rest {
    request => {
      url => "https://jsonplaceholder.typicode.com/users/%{user_id}"
      method => "GET"
    }
    target => "user_info"
  }
  mutate {
        add_field => { "city" => "%{[user_info][address][city]}" }
  }
  mutate {
        remove_field => [ "user_info"]
  }
}

output {
    elasticsearch {
        index => "user_index"
        hosts => ["localhost:9200"]
        document_id => "%{user_id}"
    }
}
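To make the filter chain easier to follow, here is a rough Python sketch of what happens to a single event as it passes through rest, then the first mutate, then the second mutate. It is an emulation under stated assumptions, not the plugin's code: get_field, sprintf, process and the injected fetch callable are hypothetical names, and only simple %{field} and %{[a][b][c]} references are handled. As far as I know, Logstash leaves a %{...} reference as literal text when the field is missing, and the sketch copies that behavior.

```python
import json
import re
from typing import Any, Callable, Dict, Optional

# Matches Logstash-style references such as %{user_id} or %{[user_info][address][city]}.
_REF = re.compile(r"%\{([^}]+)\}")


def get_field(event: Dict[str, Any], ref: str) -> Optional[Any]:
    """Resolve 'user_id' or '[user_info][address][city]' against a nested dict.

    Returns None when any step of the path is missing (hypothetical helper).
    """
    parts = re.findall(r"\[([^\]]+)\]", ref) if ref.startswith("[") else [ref]
    value: Any = event
    for part in parts:
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def sprintf(template: str, event: Dict[str, Any]) -> str:
    """Substitute each %{...} reference; unresolved references are left unchanged."""
    def repl(match: "re.Match[str]") -> str:
        value = get_field(event, match.group(1))
        return match.group(0) if value is None else str(value)

    return _REF.sub(repl, template)


def process(event: Dict[str, Any], fetch: Callable[[str], str]) -> Dict[str, Any]:
    """Emulate rest -> mutate(add_field) -> mutate(remove_field) for one event."""
    url = sprintf("https://jsonplaceholder.typicode.com/users/%{user_id}", event)
    event["user_info"] = json.loads(fetch(url))                      # rest { target => "user_info" }
    event["city"] = sprintf("%{[user_info][address][city]}", event)  # mutate { add_field }
    del event["user_info"]                                           # mutate { remove_field }
    return event
```

With a stub in place of the HTTP call, process({"id": 2, "user_id": 1}, lambda url: json.dumps({"address": {"city": "Gwenborough"}})) returns {'id': 2, 'user_id': 1, 'city': 'Gwenborough'}, which matches the second document in the result below. For a real request, fetch could be something like lambda url: urllib.request.urlopen(url).read().decode("utf-8") (requires import urllib.request). Injecting fetch keeps the transformation logic testable without network access.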

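Note the remove_field above: the response contains a lot of information, so I removed the full response and kept only the field I needed (city). The add_field line also shows how Logstash reads nested JSON, through the reference %{[user_info][address][city]}. One more thing to watch: when you use the POST method, the parameters are sent in the request body by default, following the RESTful convention, not as form fields.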
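To make the body-versus-form distinction concrete, here is a stdlib-only Python sketch that builds, but does not send, the two kinds of POST request. It illustrates HTTP encoding in general and is not the logstash-filter-rest API; the endpoint URL, the payload and the function names are placeholders chosen for illustration.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict

URL = "https://jsonplaceholder.typicode.com/posts"  # placeholder endpoint


def json_body_request(params: Dict[str, Any]) -> urllib.request.Request:
    """POST with the parameters serialized as a JSON request body."""
    return urllib.request.Request(
        URL,
        data=json.dumps(params).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def form_request(params: Dict[str, Any]) -> urllib.request.Request:
    """POST with the parameters form-encoded (application/x-www-form-urlencoded)."""
    return urllib.request.Request(
        URL,
        data=urllib.parse.urlencode(params).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```

For the same parameters {"userId": 1, "title": "demo"}, the first request carries the bytes of {"userId": 1, "title": "demo"} while the second carries userId=1&title=demo. If a server only parses one of these formats, sending the other is a common reason for "missing parameter" errors.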

The result:

curl 'localhost:9200/user_index/_search?pretty'
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "user_index",
        "_type" : "doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "city" : "Wisokyburgh",
          "@timestamp" : "2018-05-11T02:01:02.231Z",
          "user_id" : 2,
          "@version" : "1"
        }
      },
      {
        "_index" : "user_index",
        "_type" : "doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "city" : "Gwenborough",
          "@timestamp" : "2018-05-11T02:01:02.261Z",
          "user_id" : 1,
          "@version" : "1"
        }
      }
    ]
  }
}
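If you want to check this result in a script rather than by eye, a small Python sketch like the following can parse the _search response body. The function names are hypothetical, and the sketch relies only on the hits.hits[]._id and _source fields that appear in the output above; it checks that each _id equals its user_id, which is what document_id => "%{user_id}" in the output section should produce.

```python
import json
from typing import Dict


def city_by_user_id(search_response: str) -> Dict[int, str]:
    """Map user_id -> city from an Elasticsearch _search response body."""
    body = json.loads(search_response)
    return {
        hit["_source"]["user_id"]: hit["_source"]["city"]
        for hit in body["hits"]["hits"]
    }


def ids_match_user_id(search_response: str) -> bool:
    """True if every document's _id is its user_id (document_id => "%{user_id}")."""
    body = json.loads(search_response)
    return all(
        hit["_id"] == str(hit["_source"]["user_id"])
        for hit in body["hits"]["hits"]
    )
```

The response body itself can be fetched with the curl command above or, from Python, with urllib.request.urlopen("http://localhost:9200/user_index/_search").read().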
