5. Logstash Collection API Integration
1 Overview
The logstash log collection system processes host log data collected by MonitorAgent and reports it to the gateway monitoring cluster, where the data can then be queried via monitoring metrics.
2 Reported Metric Information
2.1 URI
POST api/objs/monitor/gateway/receive
2.2 Parameters
Request parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | yes | metric name |
| fields | map[string]interface{} | yes | metric data points (map) |
| tags | map[string]string | yes | metric tags (map) |
| type | string | yes | metric type (GAUGE/COUNTER) |
| timestamp | int64 | yes | Unix timestamp in milliseconds (13 digits); timestamps must strictly increase across repeated reports, otherwise the report fails |
| dStore | uint8 | yes | forwarding backends (storage, alerting); default 0 forwards to all backends. 0: storage + alerting (legacy behavior); 1: forward to none; 2: forward to both; 4: storage only; 8: alerting only; 16: Aplugin only; 20: storage + Aplugin |
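Two details in the table above are easy to get wrong: the dStore values act as an enumeration of forwarding targets (20 is storage 4 + Aplugin 16), and the timestamp must strictly increase between reports. A minimal Python sketch of both rules (the constant and helper names are hypothetical, introduced here for illustration):

```python
import time

# Forwarding targets for the dStore field, as listed in the table above.
DSTORE_LEGACY_ALL      = 0   # storage + alerting (legacy behavior)
DSTORE_NONE            = 1
DSTORE_BOTH            = 2   # storage + alerting
DSTORE_STORAGE         = 4
DSTORE_ALERTING        = 8
DSTORE_APLUGIN         = 16
DSTORE_STORAGE_APLUGIN = 20  # storage + Aplugin

_last_ts = 0

def next_timestamp_ms():
    """Return a strictly increasing 13-digit millisecond timestamp.

    The gateway rejects reports whose timestamp does not increase,
    so repeated calls within the same millisecond are bumped by 1.
    """
    global _last_ts
    ts = int(time.time() * 1000)
    if ts <= _last_ts:
        ts = _last_ts + 1
    _last_ts = ts
    return ts
```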
tags parameters

| Tag | Example Value | Description |
| --- | --- | --- |
| ip | 100.73.18.12 | host address |
| tenant | nerv | tenant |
| region | cn_hb_1 | region |
| project_id | 12 | project ID |
Response parameters

| Name | Type | Description |
| --- | --- | --- |
| err | error | error (null on success) |
| total | int | number of metrics reported |
| invalid | int | number of invalid metrics |
| latency | int64 | reporting latency |
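Putting the parameter tables together, a report is a JSON array of metric objects POSTed to the URI above. A sketch using only the Python standard library (`build_metric` and `report` are hypothetical helper names; the gateway address shown is the one used in the configuration sample below and is an assumption about your deployment):

```python
import json
import urllib.request

def build_metric(name, fields, tags, timestamp_ms, metric_type="GAUGE", dstore=0):
    """Assemble one metric object matching the request-parameter table."""
    return {
        "name": name,
        "fields": fields,
        "tags": tags,
        "type": metric_type,
        "timestamp": timestamp_ms,
        "dStore": dstore,
    }

def report(metrics, url="http://100.73.141.101:3345/api/objs/monitor/gateway/receive"):
    """POST a JSON array of metrics to the gateway (json_batch format)."""
    req = urllib.request.Request(
        url,
        data=json.dumps(metrics).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Note that `report` always sends a list, even for a single metric, matching the `json_batch` format produced by the logstash http output.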
2.3 Request Body

```json
[
  {
    "tags": {
      "tenant": "nerv",
      "region": "cn_hb_1",
      "ip": "100.73.18.12",
      "project_id": "0"
    },
    "name": "test_logstash",
    "type": "GAUGE",
    "fields": {
      "level": "info",
      "message": "time=\"2019-08-12T15:05:05+08:00\" level=info msg=\"where expr: (((ip = 100.73.18.12) AND (tenant = 'nerv')) AND (region = 'cn_hb_1')), trange: {2019-08-12 13:59:17 +0800 CST 2019-08-12 15:05:04 +0800 CST}\" file=\"ast/compile.go:111\"",
      "msg": "where expr: (((ip = 100.73.18.12) AND (tenant = 'nerv')) AND (region = 'cn_hb_1')), trange: {2019-08-12 13:59:17 +0800 CST 2019-08-12 15:05:04 +0800 CST}",
      "file": "ast/compile.go:111"
    }
  }
]
```
2.4 Response Body

```json
{
  "err": null,
  "total": 4,
  "invalid": 0,
  "latency": 0
}
```
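A caller should check both `err` and the `invalid` count, since a batch can succeed overall while individual data points are rejected. A small sketch (`check_report_response` is a hypothetical helper; field names follow the response body above):

```python
import json

def check_report_response(body):
    """Parse the gateway response and surface errors.

    Accepts a JSON string or an already-decoded dict.
    Returns (total, invalid); raises if the gateway reported an error.
    """
    resp = json.loads(body) if isinstance(body, str) else body
    if resp.get("err") is not None:
        raise RuntimeError(f"gateway error: {resp['err']}")
    return resp["total"], resp["invalid"]
```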
3 Logstash Integration Example
3.1 Raw Log Line

```
time="2019-08-14T17:40:05+08:00" level=info msg="where expr: (((ip = 100.73.18.12) AND (tenant = 'nerv')) AND (region = 'cn_hb_1')), trange: {2019-08-12 13:59:17 +0800 CST 2019-08-12 15:05:04 +0800 CST}" file="ast/compile.go:111"
```
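The grok pattern in the configuration below extracts `log_timestamp`, `level`, `msg`, and `file` from this line. An equivalent Python regex is handy for checking the pattern offline before deploying it (a sketch: grok's `NOTSPACE` and `GREEDYDATA` map roughly to `\S+` and `.*`):

```python
import re

# Mirrors the grok pattern:
#   time="%{NOTSPACE:log_timestamp}" level=%{NOTSPACE:level}
#   msg="%{GREEDYDATA:msg}" file="%{NOTSPACE:file}"
LOG_PATTERN = re.compile(
    r'time="(?P<log_timestamp>\S+)" '
    r'level=(?P<level>\S+) '
    r'msg="(?P<msg>.*)" '
    r'file="(?P<file>\S+)"'
)

line = ('time="2019-08-14T17:40:05+08:00" level=info '
        'msg="where expr: (((ip = 100.73.18.12) AND (tenant = \'nerv\')) '
        'AND (region = \'cn_hb_1\')), trange: {2019-08-12 13:59:17 +0800 CST '
        '2019-08-12 15:05:04 +0800 CST}" file="ast/compile.go:111"')

fields = LOG_PATTERN.search(line).groupdict()
```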
3.2 Logstash Configuration Sample

```
input {
  file {
    # Absolute paths only; list multiple files as an array.
    path => ["/root/nerv-app/nerv-monitor-agent/log/app.log"]
    # Read the file from the beginning, then switch to tail -f behavior once
    # the last line is reached. Without this line, reading starts at the end.
    #start_position => "beginning"
  }
}

filter {
  grok {
    # Custom grok pattern: the log timestamp is stored in log_timestamp,
    # the remaining captures in their named fields.
    match => { "message" => "time=\"%{NOTSPACE:log_timestamp}\" level=%{NOTSPACE:[level]} msg=\"%{GREEDYDATA:[msg]}\" file=\"%{NOTSPACE:[file]}\"" }
  }
}

output {
  http {
    http_method => "post"   # put or post; there is no default
    url => "http://100.73.141.101:3345/api/objs/monitor/gateway/receive"  # receiving endpoint
    format => "json_batch"  # output format; json_batch emits a JSON array
    mapping => {
      "tags" => {                 # metric tags (ip, tenant, region, project_id)
        "ip" => "100.73.18.12"    # host address
        "tenant" => "rrxcloud"    # tenant
        "project_id" => "5"       # project ID
        "region" => "cn_hb_1"     # region
      }
      "name" => "test_logstash"   # metric name
      "type" => "GAUGE"           # metric type (GAUGE/COUNTER)
      "fields" => {               # reported data points; must match the grok captures above
        "level" => "%{level}"
        "msg" => "%{msg}"
        "file" => "%{file}"
        "message" => "%{message}"
      }
    }
  }
  #stdout {              # standard output, for debugging in a terminal
  #  codec => json {     # emit a JSON string; replace json with rubydebug for Ruby-style output
  #  }
  #}
}
```
3.3 Multi-Output Configuration Sample

The output section can also fan out to multiple backends at once, for example two Elasticsearch clusters plus the monitoring gateway:

```
output {
  elasticsearch {
    hosts => ["100.76.37.64:9200","100.76.37.65:9200","100.76.37.66:9200"]
    manage_template => true
    index => "logstash_network-%{+YYYY-MM-dd}"
  }
  elasticsearch {
    hosts => ["100.115.3.1:9201","100.115.3.2:9201","100.115.3.3:9201"]
    manage_template => true
    index => "logstash_network-%{+YYYY-MM-dd}"
  }
  http {
    http_method => "post"
    url => "http://100.73.141.101:3345/api/objs/monitor/gateway/receive"
    format => "json_batch"
    mapping => {
      "tags" => {
        "ip" => "100.73.18.12"
        "tenant" => "rrxcloud"
        "project_id" => "5"
        "region" => "cn_hb_1"
      }
      "name" => "test_logstash"
      "type" => "GAUGE"
      "fields" => {
        "level" => "%{level}"
        "msg" => "%{msg}"
        "file" => "%{file}"
        "message" => "%{message}"
      }
    }
  }
}
```