利用flume进行日志采集已在业界广泛使用,如果需要采集的log比较多,并且分布在多个服务器上,那么对flume agent的管理就是一个头疼的事情。
本篇主要介绍下利用zookeeper对flume进程进行监控,当flume异常退出时进行报警,以至于我们能够及时的发现故障解决故障。
思路很简单,其实开发起来也很简单,嘿嘿。。
主要是利用了zookeeper临时节点的功能,当flume agent启动时与zk建立连接创建一个临时节点,并周期的与zk进行心跳,当flume发生异常退出程序时,与zk的心跳失败,导致临时节点被zk删除,监听到这个事件之后就可以进行报警,达到监控的目的。
是不是很简单,但是如果只就这样去开发上线是有个bug的,当你正常退出flume进程时,zk上的临时节点也会被删除,同样会监听到此事件,也会报警。其实我们应该给每个flume agent在zk上创建一个临时节点的同时也应该创建一个永久性节点,这个永久性节点用来标识此agent的信息,当agent异常退出时,临时性节点会被删除但永久性节点不会被删除,这样当watcher监听到临时节点被删除,并且检测到永久性节点并没有被删除,那就可以断定agent是异常退出,需要报警,当agent正常退出时,会将永久性节点删除,同样临时节点也会被删除,这样watcher监听到临时节点被删除去检测永久性节点时,发现也被删除了,就可以断定此agent是正常退出,不需要报警。
此思路借鉴了HDFS HA机制,之所以利用zookeeper来进行监控是因为我们是将flume采集的log写入kafka,这样zookeeper已是我们的组件之一,重复利用现有的组件可以减少我们的维护量和开发量,并且zookeeper避免单点故障,也不用我们自己开发心跳模块和监听模块,开发量很小。
扯了那么多来点干货吧。
我没有直接在agent的启动流程中添加与zk的连接,感觉这样对flume的侵入性太大了,而是引入了watchdog,在watchdog中添加相关代码。
watchdog 是老版flume中的组件,用来监控agent进程,当agent异常退出之后进行重启。watchdog的具体代码可以参考github,我们在此基础上进行了修改。
在watchdog的startup中与zk建立连接,zk client用的是Curator 。
代码如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | void startup() throws Exception {...
 
 CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
 client.start();
 
 client.create().withMode(CreateMode.PERSISTENT).forPath(PATH_P + "/test", new byte[0]);
 
 client.create().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/test", new byte[0]);
 }
 
 | 
agent端的代码就算结束了,我们需要再起一个服务来监听zk上节点的信息。
监听服务我使用的是Curator的PathCache来监听一个目录的所有子节点的delete事件。此服务在与zk建立连接时注册一个watcher,代码是在PathCache的example的基础上进行了修改,代码如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 
 | public static void start() throws Exception{
 
 CuratorFramework client = null;
 PathChildrenCache cache = null;
 try
 {
 client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
 client.start();
 
 
 cache = new PathChildrenCache(client, PATH, true);
 cache.start();
 
 addListener(cache);
 }
 finally
 {
 CloseableUtils.closeQuietly(cache);
 CloseableUtils.closeQuietly(client);
 }
 }
 
 private static void addListener(PathChildrenCache cache)
 {
 
 PathChildrenCacheListener listener = new PathChildrenCacheListener()
 {
 @Override
 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
 {
 switch ( event.getType() )
 {
 case CHILD_ADDED:
 {
 System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
 break;
 }
 
 case CHILD_UPDATED:
 {
 System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
 break;
 }
 
 case CHILD_REMOVED:
 {
 System.out.println("判断永久节点是否存在,决定是否报警");
 System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
 break;
 }
 }
 }
 };
 cache.getListenable().addListener(listener);
 }
 
 | 
这个服务比较简单,就一个类,打算随后将其集成到自动化部署平台中,上面只是伪代码,详细代码以后有时间再不全。