本文是继上篇《ELK 综合日志归档分析系统(1)-Elasticsearch安装配置》之后的第二篇,将详细介绍ELK之Logstash安装配置,关于基础环境配置,请参考第一篇文章。
1.Logstash介绍
Logstash 项目诞生于2009,她是一款非常优秀的日志收集处理框架,主要用于收集、过滤、分析服务器的日志。
2.logstash安装
Ubuntu(APT)在线安装
$ wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add - $ echo "deb https://packages.elastic.co/logstash/2.3/debian stable main" | sudo tee -a /etc/apt/sources.list $ sudo apt-get update && sudo apt-get install logstash
RedHat/CentOS(YUM)在线安装
$ rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch $ vim /etc/yum.repos.d/logstash-23.repos [logstash-2.3] name=Logstash repository for 2.3.x packages baseurl=https://packages.elastic.co/logstash/2.3/centos gpgcheck=1 gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch enabled=1 $ yum install logstash
3. logstash-Indexer配置
通过Logstash建立中心日志收集服务,并启动监听TCP端口,用于接收服务器发送过来的日志,并将日志暂存到Redis中.
logstash安装完毕后,默认的配置文件目录在/etc/logstash/conf.d,由于前端日志采集的服务器包括了Windows、Linux。我们建议在Linux系统下使用filebeat,Windows系统使用nxlog来作为Shipper。在日志传输的过程中,采用SSL对数据流量进行加密处理。
首先创建用于Filebeat接收日志的配置文件,这里需要使用beats插件,并启用SSL传输加密。
$ vim 01-filebeat-input.conf input { beats { #监听端口 port => 5044 #启用ssl ssl => true ssl_certificate_authorities => ["/etc/logstash/pki/ca.crt"] ssl_certificate => "/etc/logstash/pki/elk.wanglijie.cn.crt" ssl_key => "/etc/logstash/pki/elk.wanglijie.cn.key" ssl_verify_mode => "force_peer" } }
配置Logstash TCP Input,用于收集Nxlog发送过来的日志
$ vim 02-nxlog-input.conf input { tcp { port => 5002 codec => "json" ssl_extra_chain_certs => ["/etc/logstash/pki/ca.crt"] ssl_cert => "/etc/logstash/pki/elk.wanglijie.cn.cn.crt" ssl_key => "/etc/logstash/pki/elk.wanglijie.cn.key" ssl_enable => true ssl_verify => false } }
将采集的日志数据,发送的Redis中进行暂存.
$ vim 99-output.conf output { redis { host => "127.0.0.1" data_type => "list" key => "logstash" password=>"GZcY*****Vm" } # stdout { codec => rubydebug } }
4.Logstash Center配置
在Logstash indexer中,仅用来接收日志文件,并提交到Redis中,然后通过Logstash Center从Redis中获取接收的日志,并对日志进行过滤后,存入Elasticsearch集群.
Logstash安装请参考上文。这里主要对Center日志处理,分词等配置进行解析。
从Redis中取出日志
这里使用了spiped将处理外网的Redis映射到如本地的7480端口.
$ vim 01-input-cloud-redis.conf input { redis { host=> "127.0.0.1" data_type => "list" key => "logstash" codec => json port => 7480 password=>"GZcY*****Vm" } }
使用GeoIP分析IP地理位置
$ vim 02-geoip.conf filter{ geoip { source => "ip" target => "geoip" database => "/usr/share/GeoIP/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } }
Linux syslog日志分析
$ vim 10-syslog.conf filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } #match => { "message" => "%{SYSLOGLINE}" } #add_field => [ "received_at", "%{@timestamp}" ] #add_field => [ "received_from", "%{host}" ] } syslog_pri { } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } if [type] == "syslog_cron" { grok { match => { "message" => "%{CRONLOG}" } } syslog_pri { } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } if [type] == "syslog_pamsession" { grok { match => { "message" => "%{SYSLOGPAMSESSION}" } } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } }
Apache日志文件处理
$ vim 11-apache-log.conf filter { if [type] == "apache_access" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } geoip { source => "clientip" target => "geoip" database => "/usr/share/GeoIP/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } } if [type] == "apache_error" { grok { patterns_dir => ["/etc/logstash/patterns.d/"] #match => { "message" => "%{APACHEERRORLOG}" } match => { "message" => "%{HTTPD_ERRORLOG}"} overwrite => ["message"] } } }
Tomcat日志处理
$ vim 12-tomcat-log.conf filter { if [type] == "tomcat_catalina" and [message] !~ /(.+)/ { drop { } } if [type] == "tomcat_catalina" and "multiline" in [tags] { grok { match => [ "message", "%{JAVASTACKTRACEPART}" ] } } if [type] == "tomcat_access" { grok { match => { "message" => "%{COMMONAPACHELOG}" } } #Use GeoIP Locate the IP geographical location geoip { source => "clientip" target => "geoip" database => "/usr/share/GeoIP/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]", "float"] } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } }
Windows 日志分析与处理
$ vim 20-windows-event-log-filter.conf filter { if [type] == "WindowsEventLog" { mutate { lowercase => [ "EventType", "FileName", "Hostname", "Severity" ] } mutate { rename => [ "Hostname", "source_host" ] } mutate { gsub => ["source_host","\.example\.com",""] } date { match => [ "EventTime", "YYYY-MM-dd HH:mm:ss +0800" ] timezone => "UTC" } mutate { rename => [ "Severity", "eventlog_severity" ] rename => [ "SeverityValue", "eventlog_severity_code" ] rename => [ "Channel", "eventlog_channel" ] rename => [ "SourceName", "eventlog_program" ] rename => [ "SourceModuleName", "nxlog_input" ] rename => [ "Category", "eventlog_category" ] rename => [ "EventID", "eventlog_id" ] rename => [ "RecordNumber", "eventlog_record_number" ] rename => [ "ProcessID", "eventlog_pid" ] } if [SubjectUserName] =~ "." { mutate { replace => [ "AccountName", "%{SubjectUserName}" ] } } if [TargetUserName] =~ "." { mutate { replace => [ "AccountName", "%{TargetUserName}" ] } } if [FileName] =~ "." { mutate { replace => [ "eventlog_channel", "%{FileName}" ] } } mutate { lowercase => [ "AccountName", "eventlog_channel" ] } mutate { remove_field => [ "SourceModuleType", "EventTimeWritten", "EventReceivedTime", "EventType" ] } } }
IIS日志访问日志处理
这里需要配置Nxlog自定义日志收集时的字段。
$ vim 21-filter-iis.conf filter { if [SourceName] == "IIS" { if [message] =~ "^#" { drop {} } useragent { add_tag => [ "UA" ] source => "csUser-Agent" } if "UA" in [tags] { mutate { rename => [ "name", "browser_name" ] } } mutate { rename => [ "s-ip","serverip"] rename => [ "cs-method","method" ] rename => [ "cs-uri-stem","request"] rename => [ "cs-uri-query","uri_query" ] rename => [ "s-port","port"] rename => [ "cs-username","username" ] rename => [ "c-ip","clientip"] rename => [ "cs-Referer","referer"] rename => [ "sc-status","response" ] rename => [ "sc-substatus","substatus"] rename => [ "sc-win32-status","win32-status"] rename => [ "timetaken","time_request" ] } geoip { source => "clientip" target => "geoip" database => "/usr/share/GeoIP/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] } mutate { remove_field => [ "SourceModuleType", "cs-Referer", "cs-uri-query", "cs-username", "csUser-Agent", "EventReceivedTime" ] } } }
将日志输出并存入ElasticSearch集群
$ vim 30-lumberjack-output.conf output { elasticsearch { hosts => ["10.112.49.38:9200","10.112.49.99:9200","10.112.49.169:9200"] codec => "json" #sniffing => true } #stdout { # codec => rubydebug #} }
配置完毕后,重启Logstash,并检查日志文件是否有错误。
转载请注明:自动化运维 » ELK 综合日志归档分析系统(2)-Logstash安装配置