ELK kafka filebeat kibana

Posted on Posted in linux

ELK kafka filebeat

可以参考https://liuhonghe.me/elkb.html

流程

filebeat 收集日志 -> kafka -> logstash -> es -> kibana web显示

从后往前安装, 从es开始

软件准备

  • elasticsearch-6.5.4
  • kafka_2.11-2.1.0.tgz
  • filebeat-6.5.4
  • kibana-6.5.4
  • logstash-6.5.4

防火墙

# es 端口
iptables -I INPUT -m state --state NEW -m tcp -p tcp --dport 9200 -j ACCEPT
iptables -I INPUT -m state --state NEW -m tcp -p tcp --dport 9300 -j ACCEPT
# kafka
iptables -I INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
# kibana
iptables -I INPUT -m state --state NEW -m tcp -p tcp --dport 5601 -j ACCEPT

es 集群安装

  • 10.10.8.10
  • 10.10.8.11
  • 10.10.8.13
# 有关 java 参数
config/jvm.options

[10.10.8.10] # cat config/elasticsearch.yml | grep -v "#"
cluster.name: es-cluster # 集群名字
node.name: master-node # 节点名称
node.master: true # 设置主节点
node.data: false # 非数据节点
path.data: /home/data/services/elasticsearch/data # 要新建目录
path.logs: /home/data/services/elasticsearch/logs # 要新建目录
network.host: 10.10.8.10
http.port: 9200
discovery.zen.ping.unicast.hosts: ["10.10.8.10", "10.10.8.11", "10.10.8.13"] # 集群通信列表
discovery.zen.minimum_master_nodes: 1

[10.10.8.11] # cat config/elasticsearch.yml | grep -v "#"
cluster.name: es-cluster
node.name: data-node-1 # 节点名称
node.master: false # 非主节点
node.data: true # 数据节点
path.data: /home/data/services/elasticsearch/data
path.logs: /home/data/services/elasticsearch/logs
network.host: 10.10.8.11
http.port: 9200
discovery.zen.ping.unicast.hosts: ["10.10.8.10", "10.10.8.11", "10.10.8.13"]
discovery.zen.minimum_master_nodes: 2

[10.10.8.13] # cat config/elasticsearch.yml | grep -v "#"
cluster.name: es-cluster
node.name: data-node-2
node.master: false
node.data: true
path.data: /home/data/services/elasticsearch/data
path.logs: /home/data/services/elasticsearch/logs
network.host: 10.10.8.13
http.port: 9200
discovery.zen.ping.unicast.hosts: ["10.10.8.10", "10.10.8.11", "10.10.8.13"]
discovery.zen.minimum_master_nodes: 2

有关配置文件

cluster.name:elasticsearch
配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
node.name:”FranzKafka”
节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。
node.master:true
指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。
node.data:true
指定该节点是否存储索引数据,默认为true。
index.number_of_shards:5
设置默认索引分片个数,默认为5片。
index.number_of_replicas:1
设置默认索引副本个数,默认为1个副本。
path.conf:/path/to/conf
设置配置文件的存储路径,默认是es根目录下的config文件夹。
path.data:/path/to/data
设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
path.data:/path/to/data1,/path/to/data2
path.work:/path/to/work
设置临时文件的存储路径,默认是es根目录下的work文件夹。
path.logs:/path/to/logs
设置日志文件的存储路径,默认是es根目录下的logs文件夹
path.plugins:/path/to/plugins
设置插件的存放路径,默认是es根目录下的plugins文件夹
bootstrap.mlockall:true
设置为true来锁住内存。因为当jvm开始swapping时es的效率会降低,所以要保证它不swap,可以把ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。同时也要允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit-lunlimited`命令。
network.bind_host:192.168.0.1
设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。network.publish_host:192.168.0.1
设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。
network.host:192.168.0.1
这个参数是用来同时设置bind_host和publish_host上面两个参数。
transport.tcp.port:9300
设置节点间交互的tcp端口,默认是9300。
transport.tcp.compress:true
设置是否压缩tcp传输时的数据,默认为false,不压缩。
http.port:9200
设置对外服务的http端口,默认为9200。
http.max_content_length:100mb
设置内容的最大容量,默认100mb
http.enabled:false
是否使用http协议对外提供服务,默认为true,开启。
gateway.type:local
gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。
gateway.recover_after_nodes:1
设置集群中N个节点启动时进行数据恢复,默认为1。
gateway.recover_after_time:5m
设置初始化数据恢复进程的超时时间,默认是5分钟。
gateway.expected_nodes:2
设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。
cluster.routing.allocation.node_initial_primaries_recoveries:4
初始化数据恢复时,并发恢复线程的个数,默认为4。
cluster.routing.allocation.node_concurrent_recoveries:2
添加删除节点或负载均衡时并发恢复线程的个数,默认为4。
indices.recovery.max_size_per_sec:0
设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。
indices.recovery.concurrent_streams:5
设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。
discovery.zen.minimum_master_nodes:1
设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)
discovery.zen.ping.timeout:3s
设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。
discovery.zen.ping.multicast.enabled:false
设置是否打开多播发现节点,默认是true。
discovery.zen.ping.unicast.hosts:[“host1″,”host2:port”,”host3[portX-portY]”]
设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。

启动

groupadd es
useradd -g es es
chown -R es:es elasticsearch
su es -c "./bin/elasticsearch -d" # -d 后台运行
# 主节点启动, 然后全部启动, 可以看到节点加入的log
[2019-01-25T00:28:13,027][INFO ][o.e.c.s.MasterService    ] [master-node] zen-disco-node-join[{data-node-1}{DF16ppt_Tfaj5efj07v6nA}{voPzaAgTShmN2q4AWAdKrA}{10.10.8.11}{10.10.8.11:9300}{ml.machine_memory=8151105536, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true}], reason: added {{data-node-1}{DF16ppt_Tfaj5efj07v6nA}{voPzaAgTShmN2q4AWAdKrA}{10.10.8.11}{10.10.8.11:9300}{ml.machine_memory=8151105536, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true},}
[2019-01-25T00:28:25,106][INFO ][o.e.c.s.MasterService    ] [master-node] zen-disco-node-join[{data-node-2}{ZSR1OOwhR9e75Ac1bJ4qyA}{imjiaBECQ0WqyqO6udu6mw}{10.10.8.13}{10.10.8.13:9300}{ml.machine_memory=8106225664, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true}], reason: added {{data-node-2}{ZSR1OOwhR9e75Ac1bJ4qyA}{imjiaBECQ0WqyqO6udu6mw}{10.10.8.13}{10.10.8.13:9300}{ml.machine_memory=8106225664, ml.max_open_jobs=20, xpack.installed=true, ml.enabled=true},}

访问主节点查看状态

  • http://10.10.8.10:9200/_cluster/health?pretty
  • http://10.10.8.10:9200/_cat/nodes?v

logstash

用 grok 语法可以过滤多余字段

# 测试一下
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}' 

配置

cat etc/logstash_kafka.conf 
input{
    kafka{
        bootstrap_servers => ["10.10.8.10:9092,10.10.8.11:9092,10.10.8.13:9092"] # kafka
        group_id => "logstash-hdfs2"
        auto_offset_reset => "earliest"
        consumer_threads => 5
        decorate_events => false
 
        topics => ["test"]
        type => "filebeat"
      }
}
 
output {
    elasticsearch {
        hosts => ["10.10.8.10:9200","10.10.8.11:9200","10.10.8.13:9200"] # es
        index => "tomcat_logs_index_%{+YYYY.MM.dd}"
        sniffing => true
        template_overwrite => true
    }
}

启动

bin/logstash -f etc/logstash_kafka.conf --path.data=/home/data/services/logstash/logs/data

zookeeper 安装

参考 https://liuhonghe.me/marathon-mesos-zookeeper-docker.html

kafka 集群安装

  • 10.10.8.10
  • 10.10.8.11
  • 10.10.8.13
[10.10.8.10] # cat config/server.properties | grep -v "#" | grep -v ^$
broker.id=1 # 一个kafka 一个值
listeners=PLAINTEXT://10.10.8.10:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/data/services/kafka/logs # 新建目录
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.8.11:2181,10.10.8.12:2181,10.10.8.13:2181 # zookeeper 地址
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

[10.10.8.11] # cat config/server.properties | grep -v "#" | grep -v ^$
broker.id=2
listeners=PLAINTEXT://10.10.8.11:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/data/services/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.8.11:2181,10.10.8.12:2181,10.10.8.13:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

[10.10.8.13] # cat cat config/server.properties | grep -v "#" | grep -v ^$
broker.id=3
listeners=PLAINTEXT://10.10.8.13:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/data/services/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.8.11:2181,10.10.8.12:2181,10.10.8.13:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

相关配置

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

启动

bin/kafka-server-start.sh -daemon config/server.properties

kafka 工具

filebeat

# es 相关配置要注释掉
#-------------------------- Elasticsearch output ------------------------------
# output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

cat filebeat.yml | grep -v "#" | grep -v ^$
filebeat.inputs:
- type: log
  enabled: true # 开启
  paths:
    - /tmp/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
tags: ["8-10-tmp"]
setup.kibana:
output.kafka:
  enabled: true
  hosts: ["10.10.8.10:9092", "10.10.8.11:9092", "10.10.8.13:9092"] # kafka 节点
  topic: test
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - drop_fields:
      fields: ["tags", "prospector", "beat", "host", "input"] # 过滤掉的模块

启动

./filebeat -e -c filebeat.yml

kibana

cat config/kibana.yml  | grep -v "#" | grep -v ^$
server.port: 5601
server.host: "10.10.8.10"
elasticsearch.url: "http://10.10.8.10:9200" # 主 es
logging.dest: stdout
# logging.dest: /home/data/services/kibana/logs/kibana.log

安装插件

https://github.com/sivasamyk/logtrail

启动

bin/kibana

使用

建立一个tomcat_logs_index*的索引, 就能看到日志

» 转载请注明来源:呢喃 » ELK kafka filebeat kibana

Leave a Reply

Your email address will not be published. Required fields are marked *

16 − 2 =