Flume简介及初次使用
最近要搭建一个日志分析平台,整体架构思路是通过flume采集日志到kafka中,从kafka之后分两条路走,一条是实时,一条是离线。实时会用spark或者storm(还没开始做),离线用hadoop+hive,将kafka中的数据消费到hdfs上,通过mr处理进hive,进行统计分析。
采用flume的理由:
flume是java开发的,利于维护和二次开发
flume可扩展,也有较好的容错性,性能也不错
flume支持对现有应用无缝接入,对较老的应用程序侵入性较低
本篇主要简单介绍下flume,因为flume是初次使用所以简单记录下。
Flume版本演变
Flume 是 cloudera 开发的实时日志收集系统,其可以实时的将分布在不同节点、机器上的日志收集到一个存储设备中。目前Flume存在两个大版本,其一为初始发行版本目前被统称为 Flume OG(original generation),属于 cloudera。其二为重构后的版本统称为Flume NG(next generation)。Flume NG对OG的核心组件、核心配置以及代码架构进行了重构。
Flume OG简介
OG的架构为比较流行的主从架构,分为master和node,但角色分为3种,分别为agent、collector和master,agent和collector部署在node上,node的角色根据配置的不同又分为 logical node(逻辑节点)、physical node(物理节点)。
数据流是由agent收集日志数据,集中到collector,再由collector汇入存储终端,master在此过程中的作用是管理agent和collector的活动。
agent和collector内部组件相同,都是由source读取数据到channel,然后sink消费channel中的数据到存储终端或下一个source。
Flume NG简介
Flume NG只有一个agent角色节点,较之OG,将collector和master节点删除,并去掉了node概念,但agent的内部组件依然是source、channel和sink。
Flume NG主要有以下几个核心概念:
- Event:一个数据单元,带有一个可选的消息头
- Flow:Event从源点到达目的节点的迁移的抽象
- Client:操作位于源点处的Event,将其发送到Flume Agent (ExecSource是一个client,将本地文件作为flume的输入)
- Agent:一个独立的jvm进程,包含组件Source、Channel、Sink
- Source:用来消费传递到该组件的Event
- Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
- Sink:从Channel中读取,将Event传递到存储设备中或Flow Pipeline中的下一个Agent(如果有的话),然后移除Event
Flume采集数据的大致流程是client将event传送到flume agent,在agent中通过source将event放入channel,然后sink去channel中取数据存储到存储设备中或者flow pipeline。
NG核心组件
source
source是主要作用是从外部client处接收数据并将这些数据存储在配置好的channel中。source支持的外部client包括avro,log4j,syslog 和 http post(body为json格式)。Flume还支持无缝接入现有程序,使其直接读取程序的原始日志文件,有两种方式可以实现:
- ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数据,如 tail -f 文件名 指令,在这种方式下,取的文件名必须是指定的。 ExecSource 可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
- SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到 spool 目录下的文件不可以再打开编辑;spool 目录下不可包含相应的子目录。
channel
channel是agent存储events的队列,source向其中加入events,sink则从其中消费events。
常用的channel有两个,分别是Memory Channel 和File Channel
- Memory Channel将events存在内存队列中(队列的大小可以在配置文件中指定)。Memory Channel主要用需要高吞吐量的,并允许存在丢失数据风险的情况下。
- File Channel将events写入文件中,持久化所有事件。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。
sink
sink将events从channel中取出,导向下一个agent或者文件系统(可以是hdfs、kafka),可以针对不同的存储设备开发自定义的sink
flume+kafka使用
最近要搭建一个日志分析平台,整体架构思路是通过flume采集日志到kafka中,从kafka之后分两条路走,一条是实时,一条是离线。实时会用spark或者storm(还没开始做),离线用hadoop+hive,将kafka中的数据消费到hdfs上,通过mr处理进hive,进行统计分析。架构图如下:
编译flume
由于需要对kafka sink进行稍微改动添加一个将log内容按需拼接为json然后sink到kafka的功能,所以这里下载flume源码包,对其进行修改并编译。步骤如下:
- 修改代码
更改KafkaSink.java代码,为了扩展性,新建一个类SzwKafkaSink.java,对event实体进行修改,xxSink都继承AbstractSink,需要实现process方法,具体的处理逻辑也在此方法中。
1 | public Status process() throws EventDeliveryException { |
由于将string拼接为json需要依赖json的jar包,则将所依赖的jar包添加到maven的pom文件中,否则随后的mvn过程中会发生错误。pom文件位于flume-ng-kafka-sink目录下,添加内容如下,version可以去maven网站上查找别的版本。
1 | <dependency> |
- 编译flume
在命令行输入mvn clean install -DskipTests
,如果报oom的错误,则输入export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=512m"
,然后再次执行mvn命令。
部署flume
解压编译好的flume,在其home目录下的conf文件夹中的flume-env.sh文件中设置下$JAVA_HOME
。
依然在conf文件夹中新建一个conf文件,在其内设置source、channel和sink相关配置,此处是要将log文件sink到kafka中,则创建_flume-kafka.conf_文件,文件内容如下:
1 | agent1.sources = r1 |
运行flume agent
运行Flume agent可以使用
supervise
,命令启动,这样当agent挂掉之后会自动重启agent。
在运行命令之前最好现在kafka集群上创建好topic,创建命令bin/kafka-topics.sh --create --zookeeper hadoop02:2181/kafka --replication-factor 1 --partitions 1 --topic szwtest
,否则会自动创建一个topic,replication-factor和partitions会进行默认设置,都是1。
命令行运行
1 | bin/flume-ng agent --conf conf/ -f conf/flume-kafka.conf -n agent1 -Dflume.root.logger=DEBUG,console |
后台运行
1 | nohup bin/flume-ng agent --conf conf/ -f conf/flume-kafka.conf -n agent1 >out.log & |
Flume简单性能测试
- 写个shell脚本循环往log里插入文本1百万条(大概不到3分钟插完)
- 在插入的过程中启动agent收集日志(先有log文本再启动agent,大概3分钟多点收集结束)
收集log使用的linux命令
tail -f xx.log
,如果先启动agent,log文件不存在,使用tail命令就无法读取内容,agent也就捕获不到标准输出,不过可以使用tail --follow=name --retry xx.log
,这样当log不存在时就会进行重试。
测试结果如下:
机器配置为虚拟机8核、8G内存。
此次测试也就是3mins左右的时间写入1000000条log,agent进行实时收集时使用的cpu和mem情况。
agent的堆内存的设置是export JAVA_OPTS="-Xms256m -Xmx512m"
在此过程中看出cpu比较平缓,维持在5%以下,使用的堆大小维持在100M以下,但是在agent启动初期cpu和mem会有一个比较大的波动,可能是因为启动初期会有大量的log进入内存,随后随着sink的消费cpu消耗处于平缓状态,mem会有所波动但会维持在100M以下。
cpu和mem的信息是用jvisualvm.exe监控的,在_flume-env.sh_中加入export JAVA_OPTS="-Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"
。
远程debug调式flume agent
- 将flume源码导入ide中(我导入的是intellij idea)
- 在flume部署目录的bin下找到flume-ng文件,在
run_flume()
中添加如下代码:FLUME_JAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"
- 在ide中进行远程debug
更多Flume内容查看官网用户手册
agent1.sources.avro-source1.command = /usr/local/bin/tail -n +$(tail -n1 /home/storm/tmp/n) –max-unchanged-stats=600 -F /home/storm/tmp/id.txt | awk ‘ARNGIND==1{i=$0;next}{i++; if($0~/文件已截断/)i=0; print i >> “/home/storm/tmp/n”;print $1”—“i}’ /home/storm/tmp/n -