YARN采用了事件驱动机制,其核心服务实际上都是一个异步调度器,包括Resourcemanager、Nodemanager、MRAppMaster等。本篇以MRAppMaster为例,其内部包含一个异步调度器AsyncDispatcher,AsyncDispatcher在yarn中的主要作用是对发生的一系列事件找到各个事件对应的handle进行处理,从其功能上可以看出其内部应该有一个队列,队列主要用来存放等待调度的事件,还应该有一个事件与handle的映射表,用来处理各个事件。
通过查看代码首先可以发现AsyncDispatcher是一个服务,继承了AbstractService,其次是通过阻塞队列 存放事件,然后单独起一个线程从阻塞队列中消费事件,通过事先定义好的事件和处理器的映射表找到各自的处理器进行处理。
下面看下代码内容,首先看下属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private final BlockingQueue<Event> eventQueue;private volatile boolean stopped = false ;private volatile boolean drainEventsOnStop = false ;private volatile boolean drained = true ;private Object waitForDrained = new Object();private volatile boolean blockNewEvents = false ;private EventHandler handlerInstance = null ;private Thread eventHandlingThread;protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;private boolean exitOnDispatchException;
类在使用之前需要进行实例化,一般都是通过构造函数进行实例化,下面就看下该类的构造函数:
1 2 3 4 5 6 7 8 9 public AsyncDispatcher () { this (new LinkedBlockingQueue<Event>()); } public AsyncDispatcher (BlockingQueue<Event> eventQueue) { super ("Dispatcher" ); this .eventQueue = eventQueue; this .eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); }
该类有两个构造函数,一个是默认的构造函数,一个是带一个BlockingQueue参数的构造函数,而默认的构造函数在其内部又调用了带BlockingQueue参数的构造函数。
构造函数中对其关键属性eventQueue
和eventDispatchers
进行了赋值,其中如果eventQueue不指定的话就实例化一个LinkedBlockingQueue对象。
AsyncDispatcher是在Resourcemanager中被当做一个服务而启动的,看下代码:
1 2 3 4 5 6 protected Dispatcher createDispatcher () { return new AsyncDispatcher(); } rmDispatcher = setupDispatcher(); addIfService(rmDispatcher);
将AsyncDispatcher实例赋值给rmDispatcher,然后将rmDispatcher作为一个服务启动。服务启动时都会先进行init然后start,init和start代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected void serviceInit (Configuration conf) throws Exception { this .exitOnDispatchException = conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); super .serviceInit(conf); } protected void serviceStart () throws Exception { super .serviceStart(); eventHandlingThread = new Thread(createThread()); eventHandlingThread.setName("AsyncDispatcher event handler" ); eventHandlingThread.start(); }
服务启动时,在serviceStart方法中起一个AsyncDispatcher event handler 线程,从eventQueue 中取出event进行调度。 看下eventHandlingThread线程的run方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 Runnable createThread () { return new Runnable() { @Override public void run () { while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); if (blockNewEvents) { synchronized (waitForDrained) { if (drained) { waitForDrained.notify(); } } } Event event; try { event = eventQueue.take(); } catch (InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted" , ie); } return ; } if (event != null ) { dispatch(event); } } } }; }
在run中,首先判断该服务是否处于正在stop过程中,如果正在stop并且drainEventsOnStop
为true,则进入if (blockNewEvents)
语句中(blockNewEvents在serviceStop中,当drainEventsOnStop为true时,blockNewEvents被赋值为true ),通过drained
判断eventQueue中是否还有剩余的event,如果没有剩余event则通知在serviceStop
中阻塞的线程继续进行stop操作。如果有剩余则和正常逻辑(正常逻辑指在没有进行stop操作时的逻辑)一样,从eventQueue中取出一个event,通过dispatch
进行调度。dispatch代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 protected void dispatch (Event event) { ... Class<? extends Enum> type = event.getType().getDeclaringClass(); try { EventHandler handler = eventDispatchers.get(type); if (handler != null ) { handler.handle(event); } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { LOG.fatal("Error in dispatcher thread" , t); if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false ) { LOG.info("Exiting, bbye.." ); System.exit(-1 ); } } }
这里涉及到一个属性eventDispatchers
,这个map中存放的是事件类型和对应的事件处理器,通过key事件类型得到事件处理器之后,用该事件处理器的handle
方法进行处理。
这里只是从eventDispatchers
中取出key对应的value,那么eventDispatchers
中的key和value是怎么put进去的呢?在ResourceManager.java
中不难发现eventDispatchers中的值是通过register
方法注册进去的,下面看下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void register (Class<? extends Enum> eventType, EventHandler handler) { EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType); LOG.info("Registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null ) { eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){ MultiListenerHandler multiHandler = new MultiListenerHandler(); multiHandler.addHandler(registeredHandler); multiHandler.addHandler(handler); eventDispatchers.put(eventType, multiHandler); } else { MultiListenerHandler multiHandler = (MultiListenerHandler) registeredHandler; multiHandler.addHandler(handler); } }
从register方法中可以得出可以对同一个事件类型注册多个事件处理器,多个事件处理器用MultiListenerHandler
存储。
对某一个事件注册事件处理器时,先判断eventDispatchers是否存在该事件的处理器,没有则直接注册,如果存在并且不是MultiListenerHandler
类型,则构建一个MultiListenerHandler
的实例,将已经存在的处理器和新添加的处理器都添加到MultiListenerHandler
中,如果存在并且是MultiListenerHandler
类型,则直接向其追加处理器handler。下面看下MultiListenerHandler
是个什么鬼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static class MultiListenerHandler implements EventHandler <Event > { List<EventHandler<Event>> listofHandlers; public MultiListenerHandler () { listofHandlers = new ArrayList<EventHandler<Event>>(); } @Override public void handle (Event event) { for (EventHandler<Event> handler: listofHandlers) { handler.handle(event); } } void addHandler (EventHandler<Event> handler) { listofHandlers.add(handler); } }
MultiListenerHandler是AsyncDispatcher的一个内部静态类,有一个list属性listofHandlers
来存放多个事件处理器。该类也实现了EventHandler
接口,通过调用handle
方法,将事件依次通过列表中的事件处理器handler来处理。
细心的朋友可能已经在想eventQueue中的event是在哪put进去的呢?
在AsyncDispatcher中查看eventQueue的put方法都在哪里调用了。其实在AsyncDispatcher中只有一处调用了eventQueue.put
方法,那就是在GenericEventHandler
类的handle中。GenericEventHandler是AsyncDispatcher的一个内部类,在AsyncDispatcher.getEventHandler()
方法中实例化,代码如下:
1 2 3 4 5 6 public EventHandler getEventHandler () { if (handlerInstance == null ) { handlerInstance = new GenericEventHandler(); } return handlerInstance; }
可以看出一个调度器里实例化一个GenericEventHandler对象,下次获取就可以直接返回handlerInstance了。
向eventQueue中put数据时,先通过AsyncDispatcher.getEventHandler()
方法,得到GenericEventHandler实例handlerInstance,然后调用GenericEventHandler.handle(Event)
方法put进去的,下面看下GenericEventHandler.handle方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void handle (Event event) { if (blockNewEvents) { return ; } drained = false ; int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0 ) { LOG.info("Size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000 ) { LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event); } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted" , e); } throw new YarnRuntimeException(e); } }
向eventQueue中put数据时,先统计下当前eventQueue是的使用情况,如果剩余空间小于1000,则打印一条warn日志。eventQueue是LinkedBlockingQueue类型的对象,调用LinkedBlockingQueue默认构造函数在实例化时会将Integer.MAX_VALUE作为空间大小。
至此AsyncDispatcher类从start、put、take都已介绍完,最后介绍下AsyncDispatcher的stop。看下serviceStop
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected void serviceStop () throws Exception { if (drainEventsOnStop) { blockNewEvents = true ; LOG.info("AsyncDispatcher is draining to stop, igonring any new events." ); synchronized (waitForDrained) { while (!drained && eventHandlingThread.isAlive()) { waitForDrained.wait(1000 ); LOG.info("Waiting for AsyncDispatcher to drain." ); } } } stopped = true ; if (eventHandlingThread != null ) { eventHandlingThread.interrupt(); try { eventHandlingThread.join(); } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping" , ie); } } super .serviceStop(); }
调度器stop时,主要是校验下drainEventsOnStop的值,为false则直接stop掉调度器,为true则阻塞新eventput进eventQueue,等待eventQueue中的event被处理完再stop。
事件驱动设计思想的引入,使得YARN具有低耦合,高内聚的特点,各个模块只需完成各自的功能,而模块之间则采用事件联系起来,系统设计简单且维护方便。