怎么将nginx日志导入elasticsearch

本篇内容主要讲解“怎么将nginx日志导入elasticsearch”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么将nginx日志导入elasticsearch”吧!

成都创新互联10多年成都定制网站服务;为您提供网站建设,网站制作,网页设计及高端网站定制服务,成都定制网站及推广,对成都三轮搅拌车等多个方面拥有丰富的网站推广经验的网站建设公司。

将nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分 ,以及将日志写入elasticsearch后的索引的创建。

1、配置nginx日志格式

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' 
            '$status $body_bytes_sent $http_referer ' 
            '"$http_user_agent" '
            '"$connection" '
            '"$http_cookie" '
            '$request_time '
            '$upstream_response_time';

2、安装配置filebeat,启用nginx module

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local
cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat

启用nginx模块

./filebeat modules enable nginx

查看模块

./filebeat modules list

创建配置文件

vim /usr/local/filebeat/blog_module_logstash.yml
filebeat.modules:
- module: nginx
 access:
  enabled: true
  var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
 #error:
 # enabled: true
 # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]


output.logstash:
 hosts: ["192.168.15.91:5044"]

启动filebeat

./filebeat -c blog_module_logstash.yml -e

3、配置logstash

tar -zxvf logstash-6.2.4.tar.gz /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash
创建一个nginx日志的pipline文件
cd /usr/local/logstash

logstash内置的模板目录

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

编辑 grok-patterns 添加一个支持多ip的正则

forword (?:%{ipv4}[,]?[ ]?)+|%{word}

官方grok

#

创建logstash pipline配置文件

#input {
# stdin {}
#}
# 从filebeat接受数据
input {
 beats {
 port => 5044
 host => "0.0.0.0"
 }
}

filter {
 # 添加一个调试的开关
 mutate{add_field => {"[@metadata][debug]"=>true}}
 grok {
 # 过滤nginx日志
 #match => { "message" => "%{nginxaccess_test2}" }
 #match => { "message" => '%{iporhost:clientip} # (?[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)' }
 #match => { "message" => '(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' }
    match => { "message" => '(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' }
 }
 # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
 ruby { 
 #code => "event.set('@read_timestamp',event.get('@timestamp'))"
 #将时区改为东8区
 code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
 }
 # 将nginx的日志记录时间格式化
 # 格式化时间 20/may/2015:21:05:56 +0000
 date {
 locale => "en"
 match => ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"]
 }
 # 将bytes字段由字符串转换为数字
 mutate {
 convert => {"bytes" => "integer"}
 }
 # 将cookie字段解析成一个json
 #mutate {
 # gsub => ["cookies",'\;',',']
 #} 
 # 如果有使用到CDN加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
 if[http_x_forwarded_for] =~ ", "{
     ruby {
         code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
        }
    }
 # 解析ip,获得ip的地理位置
 geoip {
 source => "http_x_forwarded_for"
 # # 只获取ip的经纬度、国家、城市、时区
 fields => ["location","country_name","city_name","region_name"] 
 }
 # 将agent字段解析,获得浏览器、系统版本等具体信息
 useragent {
 source => "agent"
 target => "useragent"
 }
 #指定要删除的数据
 #mutate{remove_field=>["message"]}
 # 根据日志名设置索引名的前缀
 ruby {
 code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'
 } 
 # 将@timestamp 格式化为2019.04.23
 ruby {
 code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))'
 }
 # 设置输出的默认索引名
 mutate {
 add_field => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
 }
 }
 # 将cookies字段解析成json
# mutate {
# gsub => [
#  "cookies", ";", ",",
#  "cookies", "=", ":"
# ]
# #split => {"cookies" => ","}
# }
# json_encode {
# source => "cookies"
# target => "cookies_json"
# }
# mutate {
# gsub => [
#  "cookies_json", ',', '","',
#  "cookies_json", ':', '":"'
# ]
# }
# json {
# source => "cookies_json"
# target => "cookies2"
# }
 # 如果grok解析存在错误,将错误独立写入一个索引
 if "_grokparsefailure" in [tags] {
 #if "_dateparsefailure" in [tags] {
 mutate {
  replace => {
  #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}"
  "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
  }
 }
 # 如果不存在错误就删除message
 }else{
 mutate{remove_field=>["message"]}
 }
}

output {
 if [@metadata][debug]{
 # 输出到rubydebuyg并输出metadata
 stdout{codec => rubydebug{metadata => true}}
 }else{
 # 将输出内容转换成 "."
 stdout{codec => dots} 
 # 将输出到指定的es
 elasticsearch {
  hosts => ["192.168.15.160:9200"]
  index => "%{[@metadata][index]}"
  document_type => "doc"
 } 
 }
}

启动logstash

nohup bin/logstash -f test_pipline2.conf &

到此,相信大家对“怎么将nginx日志导入elasticsearch”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


新闻名称:怎么将nginx日志导入elasticsearch
本文路径:http://scyanting.com/article/jogeps.html