利用flume进行日志采集已在业界广泛使用,如果需要采集的log比较多,并且分布在多个服务器上,那么对flume agent的管理就是一个头疼的事情。
本篇主要介绍下利用zookeeper对flume进程进行监控,当flume异常退出时进行报警,以至于我们能够及时的发现故障解决故障。

思路很简单,其实开发起来也很简单,嘿嘿。。
主要是利用了zookeeper临时节点的功能,当flume agent启动时与zk建立连接创建一个临时节点,并周期的与zk进行心跳,当flume发生异常退出程序时,与zk的心跳失败,导致临时节点被zk删除,监听到这个事件之后就可以进行报警,达到监控的目的。

是不是很简单,但是如果只就这样去开发上线是有个bug的,当你正常退出flume进程时,zk上的临时节点也会被删除,同样会监听到此事件,也会报警。其实我们应该给每个flume agent在zk上创建一个临时节点的同时也应该创建一个永久性节点,这个永久性节点用来标识此agent的信息,当agent异常退出时,临时性节点会被删除但永久性节点不会被删除,这样当watcher监听到临时节点被删除,并且检测到永久性节点并没有被删除,那就可以断定agent是异常退出,需要报警,当agent正常退出时,会将永久性节点删除,同样临时节点也会被删除,这样watcher监听到临时节点被删除去检测永久性节点时,发现也被删除了,就可以断定此agent是正常退出,不需要报警。

此思路借鉴了HDFS HA机制,之所以利用zookeeper来进行监控是因为我们是将flume采集的log写入kafka,这样zookeeper已是我们的组件之一,重复利用现有的组件可以减少我们的维护量和开发量,并且zookeeper避免单点故障,也不用我们自己开发心跳模块和监听模块,开发量很小。

扯了那么多来点干货吧。

我没有直接在agent的启动流程中添加与zk的连接,感觉这样对flume的侵入性太大了,而是引入了watchdog,在watchdog中添加相关代码。

watchdog 是老版flume中的组件,用来监控agent进程,当agent异常退出之后进行重启。watchdog的具体代码可以参考github,我们在此基础上进行了修改。

在watchdog的startup中与zk建立连接,zk client用的是Curator
代码如下:

1
2
3
4
5
6
7
8
9
10
void startup() throws Exception {
...
// 与zk建立连接
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
client.start();
// 创建永久节点
client.create().withMode(CreateMode.PERSISTENT).forPath(PATH_P + "/test", new byte[0]);
// 创建临时节点
client.create().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/test", new byte[0]);
}

agent端的代码就算结束了,我们需要再起一个服务来监听zk上节点的信息。

监听服务我使用的是Curator的PathCache来监听一个目录的所有子节点的delete事件。此服务在与zk建立连接时注册一个watcher,代码是在PathCache的example的基础上进行了修改,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public static void start() throws Exception
{

CuratorFramework client = null;
PathChildrenCache cache = null;
try
{
client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
client.start();
// path cache 类似 zk中的watcher
// in this example we will cache data. Notice that this is optional.
cache = new PathChildrenCache(client, PATH, true);
cache.start();
// 给cache注册监听器,对相应事件进行处理
addListener(cache);
}
finally
{
CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(client);
}
}

private static void addListener(PathChildrenCache cache)
{
// a PathChildrenCacheListener is optional. Here, it's used just to log changes
PathChildrenCacheListener listener = new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
switch ( event.getType() )
{
case CHILD_ADDED:
{
System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}

case CHILD_UPDATED:
{
System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}

case CHILD_REMOVED:
{
System.out.println("判断永久节点是否存在,决定是否报警");
System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
break;
}
}
}
};
cache.getListenable().addListener(listener);
}

这个服务比较简单,就一个类,打算随后将其集成到自动化部署平台中,上面只是伪代码,详细代码以后有时间再不全。