利用flume进行日志采集已在业界广泛使用,如果需要采集的log比较多,并且分布在多个服务器上,那么对flume agent的管理就是一个头疼的事情。
本篇主要介绍下利用zookeeper对flume进程进行监控,当flume异常退出时进行报警,以至于我们能够及时的发现故障解决故障。
思路很简单,其实开发起来也很简单,嘿嘿。。
主要是利用了zookeeper临时节点的功能,当flume agent启动时与zk建立连接创建一个临时节点,并周期的与zk进行心跳,当flume发生异常退出程序时,与zk的心跳失败,导致临时节点被zk删除,监听到这个事件之后就可以进行报警,达到监控的目的。
是不是很简单,但是如果只就这样去开发上线是有个bug的,当你正常退出flume进程时,zk上的临时节点也会被删除,同样会监听到此事件,也会报警。其实我们应该给每个flume agent在zk上创建一个临时节点的同时也应该创建一个永久性节点,这个永久性节点用来标识此agent的信息,当agent异常退出时,临时性节点会被删除但永久性节点不会被删除,这样当watcher监听到临时节点被删除,并且检测到永久性节点并没有被删除,那就可以断定agent是异常退出,需要报警,当agent正常退出时,会将永久性节点删除,同样临时节点也会被删除,这样watcher监听到临时节点被删除去检测永久性节点时,发现也被删除了,就可以断定此agent是正常退出,不需要报警。
此思路借鉴了HDFS HA机制,之所以利用zookeeper来进行监控是因为我们是将flume采集的log写入kafka,这样zookeeper已是我们的组件之一,重复利用现有的组件可以减少我们的维护量和开发量,并且zookeeper避免单点故障,也不用我们自己开发心跳模块和监听模块,开发量很小。
扯了那么多来点干货吧。
我没有直接在agent的启动流程中添加与zk的连接,感觉这样对flume的侵入性太大了,而是引入了watchdog,在watchdog中添加相关代码。
watchdog 是老版flume中的组件,用来监控agent进程,当agent异常退出之后进行重启。watchdog的具体代码可以参考github,我们在此基础上进行了修改。
在watchdog的startup
中与zk建立连接,zk client用的是Curator 。
代码如下:
1 2 3 4 5 6 7 8 9 10
| void startup() throws Exception { ... CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3)); client.start(); client.create().withMode(CreateMode.PERSISTENT).forPath(PATH_P + "/test", new byte[0]); client.create().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/test", new byte[0]); }
|
agent端的代码就算结束了,我们需要再起一个服务来监听zk上节点的信息。
监听服务我使用的是Curator的PathCache
来监听一个目录的所有子节点的delete事件。此服务在与zk建立连接时注册一个watcher,代码是在PathCache的example的基础上进行了修改,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public static void start() throws Exception {
CuratorFramework client = null; PathChildrenCache cache = null; try { client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3)); client.start(); cache = new PathChildrenCache(client, PATH, true); cache.start(); addListener(cache); } finally { CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(client); } }
private static void addListener(PathChildrenCache cache) { PathChildrenCacheListener listener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch ( event.getType() ) { case CHILD_ADDED: { System.out.println("Node added: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; }
case CHILD_UPDATED: { System.out.println("Node changed: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; }
case CHILD_REMOVED: { System.out.println("判断永久节点是否存在,决定是否报警"); System.out.println("Node removed: " + ZKPaths.getNodeFromPath(event.getData().getPath())); break; } } } }; cache.getListenable().addListener(listener); }
|
这个服务比较简单,就一个类,打算随后将其集成到自动化部署平台中,上面只是伪代码,详细代码以后有时间再不全。