配置logstash
进入logstash安装目录下config配置文件夹
# 配置数据源 配置应用日志存储路径 或者 导入应用日志到此路径下
input {file{path => ['/usr/local/elk/*.log']type => 'user_log'start_position => "beginning"# 处理日志以时间戳开头 合并异常报错等换行日志codec => multiline {pattern => "^%{TIMESTAMP_ISO8601}"negate => truewhat => "previous"}}
}filter {# 第一步:提取原始 JSON 字符串 按日志文件格式过滤grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp} %{DATA:thread} %{LOGLEVEL:log_level} %{DATA:logger} - %{GREEDYDATA:json_message}"}remove_field => ["message"] # 清理冗余字段}# 第二步:备份原始 timestamp 到新字段(非日期类型)mutate {copy => { "[timestamp]" => "[original_timestamp]" } # 直接复制字符串值}# 第三步:时间戳转换(仅处理 @timestamp)date {match => ["timestamp", "YYYY-MM-dd HH:mm:ss.SSS"]target => "@timestamp"# 切换日志时区 否则日志会慢八小时timezone => "Asia/Shanghai"}# 第四步:解析 JSON 并提取 takeTime 到顶层字段json {source => "json_message" # 从预提取字段反序列化target => "parsed_data" # 存储解析结果至新字段skip_on_invalid_json => true # 忽略无效 JSON 格式}mutate {# 将 takeTime 提升到顶层字段 提取接口耗时字段方便分析rename => { "[parsed_data][takeTime]" => "[extracted_takeTime]" }# 确保数值类型convert => { "[extracted_takeTime]" => "integer" }}}output {elasticsearch {hosts => ["http://192.168.98.136:9200"]index => "temp_log-%{+YYYY.MM.dd}" # 输出到es的索引文件名user => "elastic"password => "changeme"}stdout { codec => rubydebug }
}
进入Kibana
- 菜单Management -> Stack Management -> Data/数据 -> Index Management/索引管理
即可看到logstash输入到es的日志 日志名为logstash配置里output定义的 - 菜单Stack Management -> Kibana -> Index pattern/数据视图
即可创建视图 - 菜单Discover即可查询日志
es分析 可直接postman调用
// 获取es所有索引列表
http://ip:xxxx/_cat/indices?v// 获取es索引字段类型
http://ip:xxxx/索引名_temp_log7-2025.04.20/_mapping
// 获取某时间段内 serviceName值出现最多的前十条
{"query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": "2025-04-20T20:00:00.000","lte": "2025-04-20T21:00:00.000"}// 两种格式// "@timestamp": {// "gte": "2025-04-20T21:20:55.000+08:00", // 本地时间 8 点(上海时区)// "lte": "2025-04-20T21:25:00.000+08:00", // 本地时间 9 点// "time_zone": "+08:00" // 显式声明时区:ml-citation// }}}// ,// {// "term": {// "sourceSystem": {// "value": "xxx"// }// }// }]}},"from": 0, // 分页开始的位置,默认为0"size": 10, // 每页文档数量,默认10"sort": [{ "extracted_takeTime": { "order": "asc" }} // 可根据相应字段排序
],"aggs": {"service_name_freq": {"terms": {"script": {"source": "def matcher = /serviceName=([^,]+)/.matcher(params._source.message);matcher.find() ? matcher.group(1) : 'unknown'"},"size": 10 // 返回前10高频值}}}// "aggs": {
// "service_count": {
// "terms": {
// "field": "parsed_data.methodStringName.keyword",
// "size": 10, // 仅返回最高频结果
// "include": "getxxxInfo" // 显式指定目标值
// }
// }
// }
}