
ELK 综合日志归档分析系统(2)-Logstash安装配置


本文是继上篇《ELK 综合日志归档分析系统(1)-Elasticsearch安装配置》之后的第二篇,将详细介绍ELK之Logstash安装配置,关于基础环境配置,请参考第一篇文章。

1.Logstash介绍

Logstash 项目诞生于2009年,它是一款非常优秀的日志收集处理框架,主要用于收集、过滤、分析服务器的日志。

2.logstash安装

Ubuntu(APT)在线安装

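# Import the Elastic package-signing key so apt can verify the Logstash packages
$ wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
# Keep the repository in its own file under sources.list.d: appending to
# /etc/apt/sources.list with "tee -a" adds a duplicate entry every time the
# command is re-run.
$ echo "deb https://packages.elastic.co/logstash/2.3/debian stable main" | sudo tee /etc/apt/sources.list.d/logstash-2.3.list
$ sudo apt-get update && sudo apt-get install logstash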

RedHat/CentOS(YUM)在线安装

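# Import the Elastic signing key so yum can enforce gpgcheck=1 below
$ rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
# yum only loads files in /etc/yum.repos.d whose name ends in ".repo";
# a file named "logstash-23.repos" is silently ignored and the install fails.
$ vim /etc/yum.repos.d/logstash-23.repo
[logstash-2.3]
name=Logstash repository for 2.3.x packages
baseurl=https://packages.elastic.co/logstash/2.3/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

$ yum install logstash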

3. logstash-Indexer配置

通过Logstash建立中心日志收集服务,并启动监听TCP端口,用于接收服务器发送过来的日志,并将日志暂存到Redis中。
logstash安装完毕后,默认的配置文件目录在/etc/logstash/conf.d,由于前端日志采集的服务器包括了Windows、Linux。我们建议在Linux系统下使用filebeat,Windows系统使用nxlog来作为Shipper。在日志传输的过程中,采用SSL对数据流量进行加密处理。
首先创建用于Filebeat接收日志的配置文件,这里需要使用beats插件,并启用SSL传输加密。

$ vim 01-filebeat-input.conf 
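input {
    beats {
        # Listening port for the Filebeat shippers
        port => 5044
        # Enable TLS on the listener. Together with the CA list,
        # ssl_verify_mode => "force_peer" makes this mutual TLS: a shipper
        # that does not present a certificate signed by ca.crt is rejected,
        # so only enrolled hosts can feed events into this pipeline.
        ssl => true
        ssl_certificate_authorities => ["/etc/logstash/pki/ca.crt"]
        ssl_certificate => "/etc/logstash/pki/elk.wanglijie.cn.crt"
        # NOTE(review): the beats input expects the private key in PKCS#8
        # format — confirm when generating the key.
        ssl_key => "/etc/logstash/pki/elk.wanglijie.cn.key"
        ssl_verify_mode => "force_peer"
    }
}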

配置Logstash TCP Input,用于收集Nxlog发送过来的日志

$ vim 02-nxlog-input.conf 
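input {
    tcp {
        port => 5002
        # nxlog ships each event as one JSON document
        codec => "json"
        ssl_enable => true
        ssl_extra_chain_certs => ["/etc/logstash/pki/ca.crt"]
        # Certificate path corrected: the original "elk.wanglijie.cn.cn.crt"
        # (doubled ".cn") does not match the certificate used by the beats
        # input and would fail at startup.
        ssl_cert => "/etc/logstash/pki/elk.wanglijie.cn.crt"
        ssl_key => "/etc/logstash/pki/elk.wanglijie.cn.key"
        # Only the transport is encrypted here: with ssl_verify => false the
        # client is never asked for a certificate, so any host that can reach
        # port 5002 can submit JSON events, including a spoofed "type" field.
        # Restrict the port at the firewall, or set ssl_verify => true and give
        # the nxlog shippers client certificates signed by ca.crt (as the
        # beats input already requires).
        ssl_verify => false
    }
}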

将采集的日志数据发送到Redis中进行暂存。

$ vim 99-output.conf
output {
    # Stage every received event in the local Redis list "logstash";
    # the Logstash Center instance drains this list.
    # The password value is shown redacted; keep the real one out of
    # shared copies of this file.
    redis {
        host => "127.0.0.1"
        data_type => "list"
        key => "logstash"
        password => "GZcY*****Vm"
    }
    # Debug output, disabled in normal operation
    # stdout { codec => rubydebug }
}

4.Logstash Center配置

在Logstash indexer中,仅用来接收日志文件,并提交到Redis中,然后通过Logstash Center从Redis中获取接收的日志,并对日志进行过滤后,存入Elasticsearch集群.
Logstash安装请参考上文。这里主要对Center日志处理,分词等配置进行解析。

从Redis中取出日志

这里使用了spiped将外网的Redis映射到本地的7480端口。

$ vim 01-input-cloud-redis.conf 
input {
    # Drain the list the indexer's redis output fills. Port 7480 is the local
    # spiped endpoint that tunnels to the remote Redis, not Redis itself.
    redis {
        host => "127.0.0.1"
        port => 7480
        data_type => "list"
        key => "logstash"
        password => "GZcY*****Vm"
        # Events were stored as JSON documents by the indexer
        codec => json
    }
}

使用GeoIP分析IP地理位置

$ vim 02-geoip.conf 
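filter {
    # Enrich any event carrying an "ip" field with its GeoIP location;
    # events without that field are left untouched.
    geoip {
        source => "ip"
        target => "geoip"
        database => "/usr/share/GeoIP/GeoLiteCity.dat"
        # Declaring add_field twice for the same key builds the array
        # [longitude, latitude] — the form used in the geoip filter docs
        # for a geo point.
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    # Same conversion the apache/tomcat filters apply: without it the
    # coordinates stay strings and cannot be indexed as a geo point.
    # A no-op when the field is absent.
    mutate {
        convert => [ "[geoip][coordinates]", "float"]
    }
}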

Linux syslog日志分析

$ vim 10-syslog.conf 
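filter {
  # Plain syslog lines: split into timestamp/host/program/pid/message and
  # take @timestamp from the line.
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      #match => { "message" => "%{SYSLOGLINE}" }
      #add_field => [ "received_at", "%{@timestamp}" ]
      #add_field => [ "received_from", "%{host}" ]
    }
    # The pattern above captures no PRI (<nn>) value, so syslog_pri
    # presumably falls back to its default facility/severity.
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }

  # %{CRONLOG} and %{SYSLOGPAMSESSION} both build on %{SYSLOGBASE}, which
  # names the timestamp field "timestamp", not "syslog_timestamp"; the date
  # filters below used to match a field that never exists, so @timestamp
  # stayed at ingest time. Both patterns also capture an inner "message"
  # field, which turns [message] into an array unless it is overwritten
  # (the apache_error filter does the same).
  if [type] == "syslog_cron" {
    grok {
      match => { "message" => "%{CRONLOG}" }
      overwrite => ["message"]
    }
    syslog_pri { }
    date {
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }

  if [type] == "syslog_pamsession" {
    grok {
      match => { "message" => "%{SYSLOGPAMSESSION}" }
      overwrite => ["message"]
    }
    date {
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}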

Apache日志文件处理

$ vim 11-apache-log.conf 
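filter {
    # Access log in combined format: parse, stamp with the request time,
    # and enrich the client IP with GeoIP.
    if [type] == "apache_access" {
        grok {
            match => { "message" => "%{COMBINEDAPACHELOG}" }
        }
        # COMBINEDAPACHELOG captures the request time as "timestamp"; without
        # this date filter (which the tomcat_access filter already has)
        # @timestamp is the ingest time, not the time of the request.
        date {
            match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
        geoip {
            source => "clientip"
            target => "geoip"
            database => "/usr/share/GeoIP/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        # Coordinates must be floats to be mapped as a geo point
        mutate {
            convert => [ "[geoip][coordinates]", "float"]
        }
    }

    if [type] == "apache_error" {
        grok {
            # patterns_dir is only needed for the custom APACHEERRORLOG kept
            # below; HTTPD_ERRORLOG presumably comes from the core patterns —
            # confirm for the installed patterns version.
            patterns_dir => ["/etc/logstash/patterns.d/"]
            #match => { "message" => "%{APACHEERRORLOG}" }
            match => { "message" => "%{HTTPD_ERRORLOG}"}
            # HTTPD_ERRORLOG captures its own "message"; overwrite keeps the
            # field a string instead of turning it into an array.
            overwrite => ["message"]
        }
    }
}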

Tomcat日志处理

$ vim 12-tomcat-log.conf 
filter {
    # Catalina log: discard events whose message is empty, then parse the
    # stack-trace fragments that the multiline codec has tagged.
    if [type] == "tomcat_catalina" and [message] !~ /(.+)/ {
        drop { }
    }
    if [type] == "tomcat_catalina" and "multiline" in [tags] {
        grok {
            match => { "message" => "%{JAVASTACKTRACEPART}" }
        }
    }

    # Access log in common log format: parse, take the request time from
    # the line, and enrich the client IP with GeoIP.
    if [type] == "tomcat_access" {
        grok {
            match => { "message" => "%{COMMONAPACHELOG}" }
        }
        date {
            match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
        }
        # Use GeoIP to locate the client IP geographically
        geoip {
            source => "clientip"
            target => "geoip"
            database => "/usr/share/GeoIP/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        # Coordinates must be floats to be mapped as a geo point
        mutate {
            convert => [ "[geoip][coordinates]", "float"]
        }
    }
}

Windows 日志分析与处理

$ vim 20-windows-event-log-filter.conf
# Normalise Windows event-log records shipped by nxlog: lower-case and
# rename the nxlog field names to eventlog_* names, derive AccountName
# and source_host, and drop the nxlog bookkeeping fields.
filter {
 
    if [type] == "WindowsEventLog" {
        # lowercase / rename / gsub are kept in separate mutate blocks on
        # purpose: mutate applies its operations in a fixed internal order
        # rather than the order written, so folding them into one block
        # would change the result (e.g. rename before lowercase).
        mutate {
            lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
        }
        mutate {
            rename => [ "Hostname", "source_host" ]
        }
        # Strip the domain suffix from the host name; "example.com" is a
        # placeholder to be replaced with the site's own domain.
        mutate {
            gsub => ["source_host","\.example\.com",""]
        }
        # NOTE(review): "+0800" is matched as literal text while
        # timezone => "UTC" tells the filter the wall-clock value is UTC.
        # If nxlog's EventTime is local (+08:00) time, @timestamp ends up
        # eight hours off; parsing the offset with "YYYY-MM-dd HH:mm:ss Z"
        # and dropping timezone would avoid the ambiguity — confirm against
        # a real event.
        date {
            match => [ "EventTime", "YYYY-MM-dd HH:mm:ss +0800" ]
	    timezone => "UTC"
        }
        # nxlog field names -> eventlog_* names used in the index
        mutate {
            rename => [ "Severity", "eventlog_severity" ]
            rename => [ "SeverityValue", "eventlog_severity_code" ]
            rename => [ "Channel", "eventlog_channel" ]
            rename => [ "SourceName", "eventlog_program" ]
            rename => [ "SourceModuleName", "nxlog_input" ]
            rename => [ "Category", "eventlog_category" ]
            rename => [ "EventID", "eventlog_id" ]
            rename => [ "RecordNumber", "eventlog_record_number" ]
            rename => [ "ProcessID", "eventlog_pid" ]
            
        }
 
        # AccountName: the subject user, overridden by the target user when
        # both are present (=~ "." matches any non-empty value).
        if [SubjectUserName] =~ "." {
            mutate {
                replace => [ "AccountName", "%{SubjectUserName}" ]
            }
        }
        if [TargetUserName] =~ "." {
            mutate {
                replace => [ "AccountName", "%{TargetUserName}" ]
            }
        }
        # File-based inputs report the file name instead of a channel
        if [FileName] =~ "." {
            mutate {
                replace => [ "eventlog_channel", "%{FileName}" ]
            }
        }
 
        mutate {
            lowercase => [ "AccountName", "eventlog_channel" ]
        }
 
        # Drop nxlog bookkeeping fields (EventType was lower-cased above
        # but is discarded here)
        mutate {
            remove_field => [ "SourceModuleType", "EventTimeWritten", "EventReceivedTime", "EventType" ]
        }
    }
}

IIS日志访问日志处理

这里需要配置Nxlog自定义日志收集时的字段。

$ vim 21-filter-iis.conf 
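filter {
    if [SourceName] == "IIS" {

        # IIS log files begin with "#Software/#Fields/..." header lines
        if [message] =~ "^#" {
            drop {}
        }

        # Parse the user agent; the "UA" tag is only added when parsing
        # succeeded, so the rename below is guarded by it.
        useragent {
            add_tag => [ "UA" ]
            source => "csUser-Agent"
        }
        if "UA" in [tags] {
            mutate {
                rename => [ "name", "browser_name" ]
            }
        }
        # Map the W3C field names sent by nxlog to the names the apache/
        # tomcat access-log events use (rename is a no-op for missing fields)
        mutate {
            rename => [ "s-ip","serverip"]
            rename => [ "cs-method","method" ]
            rename => [ "cs-uri-stem","request"]
            rename => [ "cs-uri-query","uri_query" ]
            rename => [ "s-port","port"]
            rename => [ "cs-username","username" ]
            rename => [ "c-ip","clientip"]
            rename => [ "cs-Referer","referer"]
            rename => [ "sc-status","response" ]
            rename => [ "sc-substatus","substatus"]
            rename => [ "sc-win32-status","win32-status"]
            rename => [ "timetaken","time_request" ]
        }
        geoip {
            source => "clientip"
            target => "geoip"
            database => "/usr/share/GeoIP/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        # Same conversion the apache/tomcat filters apply; without it the
        # coordinates stay strings and cannot be mapped as a geo point.
        mutate {
            convert => [ "[geoip][coordinates]", "float"]
        }

        # NOTE(review): nothing here sets @timestamp from the IIS date/time
        # columns; confirm nxlog supplies the event time or add a date filter.
        # The three cs-* names below were already renamed above, so in
        # practice only SourceModuleType, csUser-Agent and EventReceivedTime
        # are dropped.
        mutate {
            remove_field => [
                "SourceModuleType",
                "cs-Referer",
                "cs-uri-query",
                "cs-username",
                "csUser-Agent",
                "EventReceivedTime"
            ]
        }
    }
}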

将日志输出并存入ElasticSearch集群

$ vim 30-lumberjack-output.conf 
output {
    # Despite the file name, this is the elasticsearch output: every event
    # is indexed into the cluster, requests being spread over the listed
    # nodes. Nothing in this block configures TLS or credentials for the
    # :9200 endpoints, so they must be reachable only from this host/network.
    elasticsearch {
        hosts => ["10.112.49.38:9200","10.112.49.99:9200","10.112.49.169:9200"]
        codec => "json"
        # Node discovery left disabled
        #sniffing => true
    }
    # Debug output, disabled in normal operation
    #stdout {
    #    codec => rubydebug
    #}
}

配置完毕后,重启Logstash,并检查日志文件是否有错误。

