Quickly Build a Log Analysis Platform Based on ELK + Kafka + Filebeat
For demonstration purposes, we will simulate the task of collecting and analyzing Nginx logs. Two servers are needed: one runs ELK and Kafka, the other runs Nginx and Filebeat.
The servers are set up as follows:
ID | IP | Role | Pre-installed software |
---|---|---|---|
A | 172.18.107.120 | Runs ELK and Kafka | git, docker, docker-compose |
B | 172.18.107.121 | Runs Nginx and Filebeat | nginx, docker |
Install ELK
On machine A, perform the following steps.
Pull the deployment scripts:
```bash
git clone https://github.com/deviantony/docker-elk.git
```
Start the Elasticsearch, Logstash, and Kibana services:
```bash
cd docker-elk/
docker-compose up -d
```
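Before moving on, it is worth checking that Elasticsearch is answering. A minimal check, assuming the default docker-elk port mapping (9200) and the default credentials noted just below:

```bash
# Should print a small JSON document with the cluster name and version
curl -u elastic:changeme http://172.18.107.120:9200
```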
Once the containers are up, open http://172.18.107.120:5601 to reach the Kibana UI. The default username is elastic and the default password is changeme.
Install Kafka
Still on machine A, perform the following steps.
Pull the deployment scripts:
```bash
git clone https://github.com/wurstmeister/kafka-docker.git
```
Edit docker-compose.yml and change KAFKA_ADVERTISED_HOST_NAME to this machine's IP:

```yaml
KAFKA_ADVERTISED_HOST_NAME: 172.18.107.120
```
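If you prefer to script the change, something like the following should work (a sketch assuming GNU sed and that the file still contains the upstream KAFKA_ADVERTISED_HOST_NAME line):

```bash
# Rewrite the advertised host name in place (back the file up first if unsure)
sed -i 's/KAFKA_ADVERTISED_HOST_NAME:.*/KAFKA_ADVERTISED_HOST_NAME: 172.18.107.120/' kafka-docker/docker-compose.yml
```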
Start Kafka:

```bash
cd kafka-docker
docker-compose up -d
```
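Optionally, confirm the broker is up before wiring Filebeat to it. The checks below assume the compose file publishes the broker on host port 9092, which the Filebeat configuration later in this guide expects, and that nc is available:

```bash
# The kafka and zookeeper containers should both be listed as Up
docker ps --filter "name=kafka"

# The broker port should be reachable from other machines
nc -vz 172.18.107.120 9092
```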
Install Filebeat
On machine B, perform the following steps.
Create the file /etc/filebeat/filebeat.yml and adjust the content below to match your environment:
```yaml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    # Nginx log path
    - /var/log/nginx/access.log
  fields:
    env: prod
    log_type: nginx
  exclude_files: ['.gz$']

#==================== Elasticsearch template setting ==========================
setup.template.settings:
  index.number_of_shards: 1

#----------------------------- Kafka output --------------------------------
output.kafka:
  hosts: ["172.18.107.120:9092"]   # Kafka broker address
  # Message topic selection + partitioning
  topic: '%{[fields][log_type]}_%{[fields][env]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

#================================ Processors =====================================
# Configure processors to enhance or manipulate events generated by the beat.
processors:
  - drop_fields:
      fields: ["@timestamp", "beat.version"]

#================================ Logging =====================================
# Sets log level. The default log level is info.
logging.level: info
# logging.to_files: true
# logging.to_syslog: false
# logging.files:
#   path: /var/filebeat/logs
#   name: filebeat.log
#   keepfiles: 7
```
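Before starting the service, you can dry-run the configuration with Filebeat's built-in config check. This is a sketch run on machine B with the same image tag used in the next step; --user=root and -strict.perms=false are there only so the mounted, host-owned file passes Filebeat's ownership check:

```bash
# Validate the mounted configuration file without starting Filebeat
docker run --rm --user=root \
  --volume="/etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro" \
  docker.elastic.co/beats/filebeat:7.1.1 \
  filebeat test config -strict.perms=false
```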
Start the Filebeat service:
```bash
docker run -d --name=filebeat \
  --hostname nginx_server \
  --user=root \
  --volume="/etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro" \
  --volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
  --volume="/var/log/nginx:/var/log/nginx:ro" \
  docker.elastic.co/beats/filebeat:7.1.1 filebeat \
  -e -strict.perms=false
```
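To confirm events are flowing, tail the Filebeat container's logs on machine B and, if you like, read a few messages straight off the topic on machine A. The Kafka container name below is an assumption; check docker ps for the real one:

```bash
# On machine B: Filebeat's own log output
docker logs -f filebeat

# On machine A: peek at the topic (container name may differ in your setup)
docker exec -it kafka-docker_kafka_1 \
  kafka-console-consumer.sh --bootstrap-server 172.18.107.120:9092 \
  --topic nginx_prod --from-beginning --max-messages 5
```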
Configure Logstash
Back on machine A, edit logstash/pipeline/logstash.conf in the docker-elk directory:
```conf
input {
  kafka {
    bootstrap_servers => ["172.18.107.120:9092"]
    topics => ["nginx_prod"]
    session_timeout_ms => "36000"
    auto_offset_reset => "latest"
    consumer_threads => 2
    decorate_events => true
    type => "nginx-prod"
    group_id => "logstash"
    client_id => "client1"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}

filter {
  mutate {
    rename => { "type" => "log_type" }
  }
  json {
    source => "message"
  }
  grok {
    match => { 'message' => '%{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\"(| )%{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" \"%{DATA:http_x_forward_for}\" \"%{DATA:upstream_addr}\" (%{NUMBER:upstream_response_time}|-) (%{NUMBER:request_time}|-)' }
    remove_field => "message"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[log_type]}-%{+YYYY.MM.dd}"
    template_overwrite => true
    user => "elastic"
    password => "changeme"
  }
}
```
Restart Logstash:

```bash
docker-compose restart logstash
```
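After Logstash reloads and a few requests have hit Nginx on machine B, a daily index named after the log_type should appear in Elasticsearch. A quick check, assuming the default credentials:

```bash
# Generate a couple of access-log entries by hitting Nginx on machine B
curl -s -o /dev/null http://172.18.107.121/

# List indices on machine A; look for one named like nginx-prod-YYYY.MM.dd
curl -u elastic:changeme 'http://172.18.107.120:9200/_cat/indices?v'
```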
Configure Kibana
If the steps above went smoothly, open Index Management in Kibana and you will see the Nginx logs being written to Elasticsearch.
Create a Kibana index pattern that matches the new index (for example, nginx-prod-*).
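If you prefer the command line, the index pattern can also be created through Kibana's saved objects API; a sketch for Kibana 7.x, with the pattern title matching the index naming used above:

```bash
curl -u elastic:changeme -X POST \
  'http://172.18.107.120:5601/api/saved_objects/index-pattern' \
  -H 'kbn-xsrf: true' -H 'Content-Type: application/json' \
  -d '{"attributes": {"title": "nginx-prod-*", "timeFieldName": "@timestamp"}}'
```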
At this point, log collection is complete; open Discover to browse the log entries.