最近要搭建一个日志分析平台,整体架构思路是通过flume采集日志到kafka中,从kafka之后分两条路走,一条是实时,一条是离线。实时会用spark或者storm(还没开始做),离线用hadoop+hive,将kafka中的数据消费到hdfs上,通过mr处理进hive,进行统计分析。

采用flume的理由:
flume是java开发的,利于维护和二次开发
flume可扩展,也有较好的容错性,性能也不错
flume支持对现有应用无缝接入,对较老的应用程序侵入性较低

本篇主要简单介绍下flume,因为flume是初次使用所以简单记录下。

Flume版本演变

Flume 是 cloudera 开发的实时日志收集系统,其可以实时的将分布在不同节点、机器上的日志收集到一个存储设备中。目前Flume存在两个大版本,其一为初始发行版本目前被统称为 Flume OG(original generation),属于 cloudera。其二为重构后的版本统称为Flume NG(next generation)。Flume NG对OG的核心组件、核心配置以及代码架构进行了重构。

Flume OG简介

OG的架构为比较流行的主从架构,分为master和node,但角色分为3种,分别为agent、collector和master,agent和collector部署在node上,node的角色根据配置的不同又分为 logical node(逻辑节点)、physical node(物理节点)。

数据流是由agent收集日志数据,集中到collector,再由collector汇入存储终端,master在此过程中的作用是管理agent和collector的活动。

agent和collector内部组件相同,都是由source读取数据到channel,然后sink消费channel中的数据到存储终端或下一个source。

Flume NG简介

Flume NG只有一个agent角色节点,较之OG,将collector和master节点删除,并去掉了node概念,但agent的内部组件依然是source、channel和sink。

Flume NG主要有以下几个核心概念:

  • Event:一个数据单元,带有一个可选的消息头
  • Flow:Event从源点到达目的节点的迁移的抽象
  • Client:操作位于源点处的Event,将其发送到Flume Agent (ExecSource是一个client,将本地文件作为flume的输入)
  • Agent:一个独立的jvm进程,包含组件Source、Channel、Sink
  • Source:用来消费传递到该组件的Event
  • Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
  • Sink:从Channel中读取,将Event传递到存储设备中或Flow Pipeline中的下一个Agent(如果有的话),然后移除Event

Flume采集数据的大致流程是client将event传送到flume agent,在agent中通过source将event放入channel,然后sink去channel中取数据存储到存储设备中或者flow pipeline。

NG核心组件

source

source是主要作用是从外部client处接收数据并将这些数据存储在配置好的channel中。source支持的外部client包括avro,log4j,syslog 和 http post(body为json格式)。Flume还支持无缝接入现有程序,使其直接读取程序的原始日志文件,有两种方式可以实现:

  • ExecSource: 以运行 Linux 命令的方式,持续的输出最新的数据,如 tail -f 文件名 指令,在这种方式下,取的文件名必须是指定的。 ExecSource 可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
  • SpoolSource: 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:拷贝到 spool 目录下的文件不可以再打开编辑;spool 目录下不可包含相应的子目录

channel

channel是agent存储events的队列,source向其中加入events,sink则从其中消费events。
常用的channel有两个,分别是Memory Channel 和File Channel

  • Memory Channel将events存在内存队列中(队列的大小可以在配置文件中指定)。Memory Channel主要用需要高吞吐量的,并允许存在丢失数据风险的情况下。
  • File Channel将events写入文件中,持久化所有事件。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。

sink

sink将events从channel中取出,导向下一个agent或者文件系统(可以是hdfs、kafka),可以针对不同的存储设备开发自定义的sink

flume+kafka使用

最近要搭建一个日志分析平台,整体架构思路是通过flume采集日志到kafka中,从kafka之后分两条路走,一条是实时,一条是离线。实时会用spark或者storm(还没开始做),离线用hadoop+hive,将kafka中的数据消费到hdfs上,通过mr处理进hive,进行统计分析。架构图如下:
日志分析系统架构图

编译flume

由于需要对kafka sink进行稍微改动添加一个将log内容按需拼接为json然后sink到kafka的功能,所以这里下载flume源码包,对其进行修改并编译。步骤如下:

  • 修改代码

更改KafkaSink.java代码,为了扩展性,新建一个类SzwKafkaSink.java,对event实体进行修改,xxSink都继承AbstractSink,需要实现process方法,具体的处理逻辑也在此方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public Status process() throws EventDeliveryException {
...
try {
long processedEvents = 0;

transaction = channel.getTransaction();
transaction.begin();

messageList.clear();
for (; processedEvents < batchSize; processedEvents += 1) {
event = channel.take();

if (event == null) {
// no events available in channel
break;
}
// log内容的byte数组
byte[] eventBody = event.getBody();
Map<String, String> headers = event.getHeaders();

if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
eventTopic = topic;
}

eventKey = headers.get(KEY_HDR);
...
// string to json for by szw
// 添加一个方法,将string 根据需求拼接为json
eventBody = SzwKafkaSink.stringToJson(eventBody);
if (eventBody != null) {
// create a message and add to buffer
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
(eventTopic, eventKey, eventBody);
messageList.add(data);
}

}

// publish batch and commit.
if (processedEvents > 0) {
long startTime = System.nanoTime();
// 调用kafka api 发送events
producer.send(messageList);
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
}

transaction.commit();

} catch (Exception ex) {
...
} finally {
...
}

return result;
}

public static byte[] stringToJson(byte[] bytes) throws UnsupportedEncodingException {
String str = new String(bytes, "UTF-8");
Map<String, Object> map = new HashMap<String, Object>();
String[] array = {};
int index = -1;
if ((index = str.indexOf("[SESSIONID-")) >= 0){
str = str.substring(index + 1);
array = str.split("\\] \\[");
int len = array.length;
array[len-1] = array[len-1].substring(0, array[len-1].length()-1);
for (int i=0; i<len; i++){
String[] kvArr = array[i].split("-", 2);
try {
map.put(kvArr[0], new JSONObject(kvArr[1]));
} catch (JSONException e) {
map.put(kvArr[0], kvArr[1]);
}
}
}
if (map.size() > 0) {
JSONObject json = new JSONObject(map);
return json.toString().getBytes();
}else {
return null;
}
}

由于将string拼接为json需要依赖json的jar包,则将所依赖的jar包添加到maven的pom文件中,否则随后的mvn过程中会发生错误。pom文件位于flume-ng-kafka-sink目录下,添加内容如下,version可以去maven网站上查找别的版本。

1
2
3
4
5
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
  • 编译flume

在命令行输入mvn clean install -DskipTests,如果报oom的错误,则输入export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=512m",然后再次执行mvn命令。

部署flume

解压编译好的flume,在其home目录下的conf文件夹中的flume-env.sh文件中设置下$JAVA_HOME

依然在conf文件夹中新建一个conf文件,在其内设置source、channel和sink相关配置,此处是要将log文件sink到kafka中,则创建_flume-kafka.conf_文件,文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# For each one of the sources, the type is defined
# execSource 读取某个log文本
agent1.sources.r1.type = exec
agent1.sources.r1.command=tail -f /home/hadoop/flume-test-log/flumetest.log
# 如果command中有管道符 例如 tail -f /home/hadoop/flume-test-log/flumetest.log | grep xx
# 则加上下面的属性
# agent1.sources.r1.shell = /bin/bash -c

# The channel can be defined as follows.
agent1.sources.r1.channels = c1

# Each sink's type must be defined
agent1.sinks.k1.type = org.apache.flume.sink.kafka.SzwKafkaSink
#agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# kafka topic
agent1.sinks.k1.topic = test
# kafka 地址
agent1.sinks.k1.brokerList = 127.0.0.1:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 50

#Specify the channel the sink should use
agent1.sinks.k1.channel = c1

# Each channel's type is defined.
agent1.channels.c1.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

运行flume agent

运行Flume agent可以使用supervise,命令启动,这样当agent挂掉之后会自动重启agent。

在运行命令之前最好现在kafka集群上创建好topic,创建命令bin/kafka-topics.sh --create --zookeeper hadoop02:2181/kafka --replication-factor 1 --partitions 1 --topic szwtest,否则会自动创建一个topic,replication-factorpartitions会进行默认设置,都是1。
命令行运行

1
bin/flume-ng agent --conf conf/ -f conf/flume-kafka.conf -n agent1 -Dflume.root.logger=DEBUG,console

后台运行

1
nohup bin/flume-ng agent --conf conf/ -f conf/flume-kafka.conf -n agent1 >out.log &

Flume简单性能测试

  • 写个shell脚本循环往log里插入文本1百万条(大概不到3分钟插完)
  • 在插入的过程中启动agent收集日志(先有log文本再启动agent,大概3分钟多点收集结束)

收集log使用的linux命令tail -f xx.log,如果先启动agent,log文件不存在,使用tail命令就无法读取内容,agent也就捕获不到标准输出,不过可以使用tail --follow=name --retry xx.log,这样当log不存在时就会进行重试。

测试结果如下:
cpu/mem结果图1
cpu/mem结果图2
cpu/mem结果图3
机器配置为虚拟机8核、8G内存。

此次测试也就是3mins左右的时间写入1000000条log,agent进行实时收集时使用的cpu和mem情况。
agent的堆内存的设置是export JAVA_OPTS="-Xms256m -Xmx512m"
在此过程中看出cpu比较平缓,维持在5%以下,使用的堆大小维持在100M以下,但是在agent启动初期cpu和mem会有一个比较大的波动,可能是因为启动初期会有大量的log进入内存,随后随着sink的消费cpu消耗处于平缓状态,mem会有所波动但会维持在100M以下。

cpu和mem的信息是用jvisualvm.exe监控的,在_flume-env.sh_中加入
export JAVA_OPTS="-Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"

远程debug调式flume agent

  • 将flume源码导入ide中(我导入的是intellij idea)
  • 在flume部署目录的bin下找到flume-ng文件,在run_flume()中添加如下代码:
    FLUME_JAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"
  • 在ide中进行远程debug

更多Flume内容查看官网用户手册


agent1.sources.avro-source1.command = /usr/local/bin/tail -n +$(tail -n1 /home/storm/tmp/n) –max-unchanged-stats=600 -F /home/storm/tmp/id.txt | awk ‘ARNGIND==1{i=$0;next}{i++; if($0~/文件已截断/)i=0; print i >> “/home/storm/tmp/n”;print $1”—“i}’ /home/storm/tmp/n -