使用Logstash-input-jdbc同步mysql中的数据到es中

  1. 准备好es kibana logstash 保证同一版本并可以正常启动
  2. logstash 5.x.x版本以上可以直接使用logstash-input-jdbc插件, 不用安装ruby等环境
  3. 同步方式可选全量或增量方式, 全量方式每次全部同步, 不太妥当, 我们选择使用增量方式, 增量方式第一次启动时也可以进行全部同步, 后面会说具体配置

Logstash目录结构如下所示, 我们创建一个文件夹mysql用来存放mysql同步所需要的配置等文件
在这里插入图片描述
logstash使用jdbc也需要引入jdbc的jar包, jar包下载地址
链接:https://pan.baidu.com/s/1qNkuH2T-Ba-bM1wTNvKydQ
提取码:8a4h
把jar包放入上面创建的mysql文件夹中
在mysql文件夹中创建mysql.conf文件, 这个文件就是logstash启动时运行的配置文件, 所有的参数信息需要放在这里面, 复制时把#后面的注释删除

input {
    jdbc {
    #数据库连接参数
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/bd_mgmt"
      # mysql用户名
      jdbc_user => "root"
      # mysql密码
      jdbc_password => "root"
      # 刚才放入文件夹的jar包
      jdbc_driver_library => "/opt/soft/logstash-7.9.1/mysql/mysql-connector-java-5.1.49.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      # 开启分页
      jdbc_paging_enabled => "true"
      # 最大页码
      jdbc_page_size => "50000"
      # 用于同步的查询sql
      statement_filepath => "/opt/soft/logstash-7.9.1/mysql/bdmgmt_baseflight.sql"
      #如果要使用其它字段追踪,而不是用时间开启这个配置
      use_column_value => true
      #设置要追踪的字段
      tracking_column => "updated_dt"
      # 是否记录sql_last_value
      record_last_run => true
      #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/opt/soft/logstash-7.9.1/mysql/bdmgmt_baseflight.metadata"
      # cron表达式, 全是*表示每秒都判断是否有更新
      schedule => "* * * * *"
      # 加上jdbc时区, 要不然logstash的时间会不准确
      jdbc_default_timezone => "Asia/Shanghai"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
    # elasticsearch url
        hosts => ["localhost:9200"]
        # 下面两个参数可以开启更新模式
        #action => "update"
        #doc_as_upsert => true
        # 索引名
        index => "bdmgmt_flight"
        # 文档id 设置成数据库的sid
        document_id => "%{sid}"
        # 设置文档类型
        document_type => "flight"
    }
    stdout {
        codec => json_lines
    }
}

输入属性解析:
jdbc_driver_library:驱动路径
jdbc_driver_class :驱动类名称
jdbc_connection_string :数据库连接地址
jdbc_user : 数据库用户名
jdbc_password : 数据库密码
schedule: 定时任务配置,秒,分,时,天
statement : sql查询语句
lowercase_column_names:列名是否转为小写
use_column_value : 使用递增列的值
tracking_column_type : 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型, 只能选择这两个值中的一种, 默认numeric, 如果递增字段用时间类型, 选择timestamp 其他都选择numeric即可

tracking_column : 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path : 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
jdbc_paging_enabled : 是否开启分页查询
jdbc_page_size : 每页查询条数
更多输入属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

输出属性解析:
action : 动作,update标识更新,index标识新增(默认)
doc_as_upsert : update时如果不存在是否新增。
hosts : es集群地址,http请求地址。
index : 导入到 es 中的 index 名
document_type :导入到 es 中的 type名
document_id : 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{userId} 表示引用 mysql 表中 userId字段的值。
更多输出属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

创建上面文件中配置的bdmgmt_baseflight.sql , 编写sql文件, 其中:sql_last_value目前的时间, 根据时间进行搜索可以把新增和更新的数据全部查询出来, 每个表里不太相同, 但是想做增量更新, 表中必须要有一个时间字段. 作为更新了该条数据的标志

SELECT
        f.sid,
        f.DATE,
        f. CODE,
        f.PLANE_CODE,
        f.TAKEOFF_PLAN_TIME,
        f.TAKEOFF_ACTUAL_TIME,
        f.ARRIVAL_PLAN_TIME,
        f.ARRIVAL_ACTUAL_TIME,
        f.DEP,
        c2.AIRPORT_NAME DEP_NAME,
        f.ARR,
        c1.AIRPORT_NAME ARR_NAME
FROM
        BASE_FLIGHT f
        LEFT JOIN AIRPORT_CITY_CODE c1 on f.ARR = c1.THREE_CODE
        LEFT JOIN AIRPORT_CITY_CODE c2 on f.DEP = c2.THREE_CODE
where (f.created_dt > :sql_last_value && f.updated_dt is null) || f.updated_dt > :sql_last_value
order by sid

cd到logstash的bin目录, 启动logstash(后台启动) 并且指定日志文件路径
nohup ./logstash -f ../dxjmysql/dxjmysql.conf >> /var/log/logstash/logstash_dxj.log 2>&1&
tail -f /var/log/logstash/logstash_dxj.log 可查看启动日志

关于一次性导入多个表, 查看下边的配置
注意事项,数据库里面的字段不要用type,如果有,as成其他的名字,不然的话,这里判断会有异常

input {
    jdbc {
      jdbc_connection_string => "jdbc:mysql://localhost:3306/crm?zeroDateTimeBehavior=convertToNull"
      jdbc_user => "root"
      jdbc_password => ""
      jdbc_driver_library => "D:/siyang/elasticsearch-5.2.2/logstash-5.2.2/mysql-connector-java-5.1.30.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "filename.sql"
      record_last_run => true
      last_run_metadata_path => "/opt/soft/logstash-7.9.1/mysql/xxx.metadata"
      schedule => "* * * * *"
      type => "jdbc_office"
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://localhost:3306/crm?zeroDateTimeBehavior=convertToNull"
      jdbc_user => "root"
      jdbc_password => "" 
      jdbc_driver_library => "D:/siyang/elasticsearch-5.2.2/logstash-5.2.2/mysql-connector-java-5.1.30.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      record_last_run => true
      # 多个jdbc数据源的last_run_metadata_path 不能相同
      last_run_metadata_path => "/opt/soft/logstash-7.9.1/mysql/xxx.metadata"
      statement => "SELECT * from sys_user"
      schedule => "* * * * *"
      type => "jdbc_user"
    }
}
output {
 stdout {
        codec => json_lines
    }
    if[type] == "jdbc_office"{
        elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts4"
        document_type => "office1"
        document_id => "%{id}"
        }
    }
    if[type] == "jdbc_user"{
        elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts4"
        document_type => "user1"
        document_id => "%{id}"
        }
    }
}

如果数据库中是tinyint(1)类型, 在es中会自动判断成boolean类型, 如果自动判断类型出错, 可以先在es中创建好索引的mapping, 然后再导入数据. tinyint(1)在es中创建short类型的索引就匹配了