Quickly build a log analysis platform based on ELK + Kafka + Filebeat
For demonstration purposes, we simulate a requirement to collect and analyze nginx logs. Two servers are needed: one running ELK + Kafka, the other running nginx + Filebeat.
The servers are as follows:
| ID | IP | Purpose | Pre-installed software |
|---|---|---|---|
| A | 172.18.107.120 | Runs ELK and Kafka | git, docker, docker-compose |
| B | 172.18.107.121 | Runs Nginx and Filebeat | nginx, docker |
Install ELK
On server A, perform the following steps:
Pull the deployment scripts
```sh
git clone https://github.com/deviantony/docker-elk.git
```
Start the Elasticsearch, Logstash, and Kibana services
```sh
cd docker-elk
docker-compose up -d
```
At this point, opening http://172.18.107.120:5601 shows the Kibana UI. The default username is elastic and the default password is changeme.
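As an optional sanity check, you can also confirm that Elasticsearch itself is responding before moving on. A minimal sketch, assuming port 9200 is published to the host (the docker-elk default) and the default elastic/changeme credentials are still in place:

```sh
# Query the Elasticsearch root endpoint; a JSON response with the
# cluster name and version number means the stack is up.
curl -u elastic:changeme http://172.18.107.120:9200
```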
Install Kafka
On server A, perform the following steps:
Pull the deployment scripts
```sh
git clone https://github.com/wurstmeister/kafka-docker.git
```
Modify docker-compose.yml, changing KAFKA_ADVERTISED_HOST_NAME to this machine's IP
```yaml
KAFKA_ADVERTISED_HOST_NAME: 172.18.107.120
```
Start Kafka
```sh
cd kafka-docker
docker-compose up -d
```
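To verify that the broker is reachable, you can list its topics from inside the container. A rough sketch, assuming the Kafka container name contains kafka_1 and that the Kafka CLI scripts are on the PATH inside the wurstmeister image; depending on the Kafka version bundled in the image you may need --zookeeper instead of --bootstrap-server:

```sh
# List topics on the broker; the nginx_prod topic will appear
# once Filebeat starts shipping events.
docker exec -it $(docker ps -qf "name=kafka_1") \
  kafka-topics.sh --list --bootstrap-server 172.18.107.120:9092
```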
Install Filebeat
On server B, perform the following steps:
Create the file /etc/filebeat/filebeat.yml, adjusting the content below to match your actual setup
```yaml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log   # nginx log path
  fields:
    env: prod
    log_type: nginx
  exclude_files: ['.gz$']

#==================== Elasticsearch template setting ==========================
setup.template.settings:
  index.number_of_shards: 1

#----------------------------- Kafka output --------------------------------
output.kafka:
  hosts: ["172.18.107.120:9092"]   # kafka address
  # message topic selection + partitioning
  topic: '%{[fields][log_type]}_%{[fields][env]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

#================================ Processors =====================================
# Configure processors to enhance or manipulate events generated by the beat.
processors:
  - drop_fields:
      fields: ["@timestamp", "beat.version"]

#================================ Logging =====================================
# Sets log level. The default log level is info.
logging.level: info
# logging.to_files: true
# logging.to_syslog: false
# logging.files:
#   path: /var/filebeat/logs
#   name: filebeat.log
#   keepfiles: 7
```
Start the Filebeat service
```sh
docker run -d \
  --name=filebeat \
  --hostname nginx_server \
  --user=root \
  --volume="/etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro" \
  --volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
  --volume="/var/log/nginx:/var/log/nginx:ro" \
  docker.elastic.co/beats/filebeat:7.1.1 filebeat \
  -e -strict.perms=false
```
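Before (or after) starting the container, it can be worth validating the configuration file syntax. A minimal sketch that reuses the same image and bind mount; the test config subcommand only checks that the YAML parses and the settings are valid, it does not ship any events:

```sh
# Validate /etc/filebeat/filebeat.yml without starting the shipper
docker run --rm --user=root \
  --volume="/etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro" \
  docker.elastic.co/beats/filebeat:7.1.1 \
  filebeat test config -strict.perms=false
```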
Configure Logstash
Modify logstash/pipeline/logstash.conf so that Logstash consumes from the nginx_prod topic (the name Filebeat builds from '%{[fields][log_type]}_%{[fields][env]}') and writes the parsed events to Elasticsearch:
```conf
input {
  kafka {
    bootstrap_servers => ["172.18.107.120:9092"]
    topics => ["nginx_prod"]
    session_timeout_ms => "36000"
    auto_offset_reset => "latest"
    consumer_threads => 2
    decorate_events => true
    type => "nginx-prod"
    group_id => "logstash"
    client_id => "client1"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}

filter {
  mutate {
    rename => { "type" => "log_type" }
  }
  json {
    source => "message"
  }
  grok {
    match => {
      'message' => '%{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\"(| )%{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" \"%{DATA:http_x_forward_for}\" \"%{DATA:upstream_addr}\" (%{NUMBER:upstream_response_time}|-) (%{NUMBER:request_time}|-)'
    }
    remove_field => "message"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[log_type]}-%{+YYYY.MM.dd}"
    template_overwrite => true
    user => elastic
    password => changeme
  }
}
```
Restart Logstash
```sh
docker-compose restart logstash
```
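If events don't show up later, tailing the Logstash container logs is usually the quickest way to spot Kafka-connection or pipeline errors. A simple check, run from the docker-elk directory:

```sh
# Follow the Logstash logs; look for the Kafka consumer joining the
# "logstash" group and for any configuration or pipeline errors.
docker-compose logs -f logstash
```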
Configure Kibana
If the steps above went well, opening Index Management in Kibana will show that the nginx logs are being written to ES.
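You can also confirm this from the command line. A quick sketch, assuming the default credentials and that nginx on server B has already received at least one request (hit it once with curl to be sure):

```sh
# Generate one access-log entry on server B
curl -s http://172.18.107.121/ > /dev/null

# List the indices created by the Logstash output (index => "%{[log_type]}-%{+YYYY.MM.dd}")
curl -u elastic:changeme 'http://172.18.107.120:9200/_cat/indices/nginx*?v'
```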

Create a Kibana index pattern

At this point, the log collection setup is complete; open Discover to browse the log entries.
