使用Logstash-input-jdbc同步mysql中的数据到es中
- 准备好es kibana logstash 保证同一版本并可以正常启动
- logstash 5.x.x版本以上可以直接使用logstash-input-jdbc插件, 不用安装ruby等环境
- 同步方式可选全量或增量方式, 全量方式每次全部同步, 不太妥当, 我们选择使用增量方式, 增量方式第一次启动时也可以进行全部同步, 后面会说具体配置
Logstash目录结构如下所示, 我们创建一个文件夹mysql用来存放mysql同步所需要的配置等文件

logstash使用jdbc也需要引入jdbc的jar包, jar包下载地址
链接:https://pan.baidu.com/s/1qNkuH2T-Ba-bM1wTNvKydQ
提取码:8a4h
把jar包放入上面创建的mysql文件夹中
在mysql文件夹中创建mysql.conf文件, 这个文件就是logstash启动时运行的配置文件, 所有的参数信息需要放在这里面, 复制时把#后面的注释删除
input {
jdbc {
#数据库连接参数
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/bd_mgmt"
# mysql用户名
jdbc_user => "root"
# mysql密码
jdbc_password => "root"
# 刚才放入文件夹的jar包
jdbc_driver_library => "/opt/soft/logstash-7.9.1/mysql/mysql-connector-java-5.1.49.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 开启分页
jdbc_paging_enabled => "true"
# 最大页码
jdbc_page_size => "50000"
# 用于同步的查询sql
statement_filepath => "/opt/soft/logstash-7.9.1/mysql/bdmgmt_baseflight.sql"
#如果要使用其它字段追踪,而不是用时间开启这个配置
use_column_value => true
#设置要追踪的字段
tracking_column => "updated_dt"
# 是否记录sql_last_value
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/soft/logstash-7.9.1/mysql/bdmgmt_baseflight.metadata"
# cron表达式, 全是*表示每秒都判断是否有更新
schedule => "* * * * *"
# 加上jdbc时区, 要不然logstash的时间会不准确
jdbc_default_timezone => "Asia/Shanghai"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
# elasticsearch url
hosts => ["localhost:9200"]
# 下面两个参数可以开启更新模式
#action => "update"
#doc_as_upsert => true
# 索引名
index => "bdmgmt_flight"
# 文档id 设置成数据库的sid
document_id => "%{sid}"
# 设置文档类型
document_type => "flight"
}
stdout {
codec => json_lines
}
}
输入属性解析:
jdbc_driver_library:驱动路径
jdbc_driver_class :驱动类名称
jdbc_connection_string :数据库连接地址
jdbc_user : 数据库用户名
jdbc_password : 数据库密码
schedule: 定时任务配置,秒,分,时,天
statement : sql查询语句
lowercase_column_names:列名是否转为小写
use_column_value : 使用递增列的值
tracking_column_type : 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型, 只能选择这两个值中的一种, 默认numeric, 如果递增字段用时间类型, 选择timestamp 其他都选择numeric即可
tracking_column : 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path : 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
jdbc_paging_enabled : 是否开启分页查询
jdbc_page_size : 每页查询条数
更多输入属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
输出属性解析:
action : 动作,update标识更新,index标识新增(默认)
doc_as_upsert : update时如果不存在是否新增。
hosts : es集群地址,http请求地址。
index : 导入到 es 中的 index 名
document_type :导入到 es 中的 type名
document_id : 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{userId} 表示引用 mysql 表中 userId字段的值。
更多输出属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
创建上面文件中配置的bdmgmt_baseflight.sql , 编写sql文件, 其中:sql_last_value目前的时间, 根据时间进行搜索可以把新增和更新的数据全部查询出来, 每个表里不太相同, 但是想做增量更新, 表中必须要有一个时间字段. 作为更新了该条数据的标志
SELECT
f.sid,
f.DATE,
f. CODE,
f.PLANE_CODE,
f.TAKEOFF_PLAN_TIME,
f.TAKEOFF_ACTUAL_TIME,
f.ARRIVAL_PLAN_TIME,
f.ARRIVAL_ACTUAL_TIME,
f.DEP,
c2.AIRPORT_NAME DEP_NAME,
f.ARR,
c1.AIRPORT_NAME ARR_NAME
FROM
BASE_FLIGHT f
LEFT JOIN AIRPORT_CITY_CODE c1 on f.ARR = c1.THREE_CODE
LEFT JOIN AIRPORT_CITY_CODE c2 on f.DEP = c2.THREE_CODE
where (f.created_dt > :sql_last_value && f.updated_dt is null) || f.updated_dt > :sql_last_value
order by sid
cd到logstash的bin目录, 启动logstash(后台启动) 并且指定日志文件路径
nohup ./logstash -f ../dxjmysql/dxjmysql.conf >> /var/log/logstash/logstash_dxj.log 2>&1&
tail -f /var/log/logstash/logstash_dxj.log 可查看启动日志
关于一次性导入多个表, 查看下边的配置
注意事项,数据库里面的字段不要用type,如果有,as成其他的名字,不然的话,这里判断会有异常
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/crm?zeroDateTimeBehavior=convertToNull"
jdbc_user => "root"
jdbc_password => ""
jdbc_driver_library => "D:/siyang/elasticsearch-5.2.2/logstash-5.2.2/mysql-connector-java-5.1.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "filename.sql"
record_last_run => true
last_run_metadata_path => "/opt/soft/logstash-7.9.1/mysql/xxx.metadata"
schedule => "* * * * *"
type => "jdbc_office"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/crm?zeroDateTimeBehavior=convertToNull"
jdbc_user => "root"
jdbc_password => ""
jdbc_driver_library => "D:/siyang/elasticsearch-5.2.2/logstash-5.2.2/mysql-connector-java-5.1.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
record_last_run => true
# 多个jdbc数据源的last_run_metadata_path 不能相同
last_run_metadata_path => "/opt/soft/logstash-7.9.1/mysql/xxx.metadata"
statement => "SELECT * from sys_user"
schedule => "* * * * *"
type => "jdbc_user"
}
}
output {
stdout {
codec => json_lines
}
if[type] == "jdbc_office"{
elasticsearch {
hosts => "localhost:9200"
index => "contacts4"
document_type => "office1"
document_id => "%{id}"
}
}
if[type] == "jdbc_user"{
elasticsearch {
hosts => "localhost:9200"
index => "contacts4"
document_type => "user1"
document_id => "%{id}"
}
}
}
如果数据库中是tinyint(1)类型, 在es中会自动判断成boolean类型, 如果自动判断类型出错, 可以先在es中创建好索引的mapping, 然后再导入数据. tinyint(1)在es中创建short类型的索引就匹配了