YARN采用了事件驱动机制,其核心服务实际上都是一个异步调度器,包括Resourcemanager、Nodemanager、MRAppMaster等。本篇以MRAppMaster为例,其内部包含一个异步调度器AsyncDispatcher,AsyncDispatcher在yarn中的主要作用是对发生的一系列事件找到各个事件对应的handle进行处理,从其功能上可以看出其内部应该有一个队列,队列主要用来存放等待调度的事件,还应该有一个事件与handle的映射表,用来处理各个事件。
通过查看代码首先可以发现AsyncDispatcher是一个服务,继承了AbstractService,其次是通过阻塞队列 存放事件,然后单独起一个线程从阻塞队列中消费事件,通过事先定义好的事件和处理器的映射表找到各自的处理器进行处理。
下面看下代码内容,首先看下属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private  final  BlockingQueue<Event> eventQueue;private  volatile  boolean  stopped = false ;private  volatile  boolean  drainEventsOnStop = false ;private  volatile  boolean  drained = true ;private  Object waitForDrained = new  Object();private  volatile  boolean  blockNewEvents = false ;private  EventHandler handlerInstance = null ;private  Thread eventHandlingThread;protected  final  Map<Class<? extends Enum>, EventHandler> eventDispatchers;private  boolean  exitOnDispatchException;
类在使用之前需要进行实例化,一般都是通过构造函数进行实例化,下面就看下该类的构造函数:
1 2 3 4 5 6 7 8 9 public  AsyncDispatcher ()    this (new  LinkedBlockingQueue<Event>()); } public  AsyncDispatcher (BlockingQueue<Event> eventQueue)    super ("Dispatcher" );   this .eventQueue = eventQueue;   this .eventDispatchers = new  HashMap<Class<? extends Enum>, EventHandler>(); } 
该类有两个构造函数,一个是默认的构造函数,一个是带一个BlockingQueue参数的构造函数,而默认的构造函数在其内部又调用了带BlockingQueue参数的构造函数。
构造函数中对其关键属性eventQueue和eventDispatchers进行了赋值,其中如果eventQueue不指定的话就实例化一个LinkedBlockingQueue对象。
AsyncDispatcher是在Resourcemanager中被当做一个服务而启动的,看下代码:
1 2 3 4 5 6 protected  Dispatcher createDispatcher ()    return  new  AsyncDispatcher(); } rmDispatcher = setupDispatcher(); addIfService(rmDispatcher); 
将AsyncDispatcher实例赋值给rmDispatcher,然后将rmDispatcher作为一个服务启动。服务启动时都会先进行init然后start,init和start代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected  void  serviceInit (Configuration conf)  throws  Exception   this .exitOnDispatchException =       conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,         Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);   super .serviceInit(conf); } protected  void  serviceStart ()  throws  Exception      super .serviceStart();   eventHandlingThread = new  Thread(createThread());   eventHandlingThread.setName("AsyncDispatcher event handler" );   eventHandlingThread.start(); } 
服务启动时,在serviceStart方法中起一个AsyncDispatcher event handler 线程,从eventQueue 中取出event进行调度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 Runnable createThread ()   {  return  new  Runnable() {     @Override      public  void  run ()         while  (!stopped && !Thread.currentThread().isInterrupted()) {       	         drained = eventQueue.isEmpty();                                    if  (blockNewEvents) {           synchronized  (waitForDrained) {             if  (drained) {               waitForDrained.notify();             }           }         }         Event event;         try  {                      event = eventQueue.take();         } catch (InterruptedException ie) {           if  (!stopped) {             LOG.warn("AsyncDispatcher thread interrupted" , ie);           }           return ;         }                  if  (event != null ) {           dispatch(event);         }       }     }   }; } 
在run中,首先判断该服务是否处于正在stop过程中,如果正在stop并且drainEventsOnStop为true,则进入if (blockNewEvents)语句中(blockNewEvents在serviceStop中,当drainEventsOnStop为true时,blockNewEvents被赋值为true ),通过drained判断eventQueue中是否还有剩余的event,如果没有剩余event则通知在serviceStop中阻塞的线程继续进行stop操作。如果有剩余则和正常逻辑(正常逻辑指在没有进行stop操作时的逻辑)一样,从eventQueue中取出一个event,通过dispatch进行调度。dispatch代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 protected  void  dispatch (Event event)       ...      Class<? extends Enum> type = event.getType().getDeclaringClass();   try {   	     EventHandler handler = eventDispatchers.get(type);     if (handler != null ) {              handler.handle(event);     } else  {       throw  new  Exception("No handler for registered for "  + type);     }   } catch  (Throwable t) {          LOG.fatal("Error in dispatcher thread" , t);          if  (exitOnDispatchException         && (ShutdownHookManager.get().isShutdownInProgress()) == false          && stopped == false ) {       LOG.info("Exiting, bbye.." );       System.exit(-1 );     }   } } 
这里涉及到一个属性eventDispatchers,这个map中存放的是事件类型和对应的事件处理器,通过key事件类型得到事件处理器之后,用该事件处理器的handle方法进行处理。
这里只是从eventDispatchers中取出key对应的value,那么eventDispatchers中的key和value是怎么put进去的呢?在ResourceManager.java中不难发现eventDispatchers中的值是通过register方法注册进去的,下面看下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public  void  register (Class<? extends Enum> eventType,     EventHandler handler)          EventHandler<Event> registeredHandler = (EventHandler<Event>)   	eventDispatchers.get(eventType);   LOG.info("Registering "  + eventType + " for "  + handler.getClass());      if  (registeredHandler == null ) {     eventDispatchers.put(eventType, handler);   } else  if  (!(registeredHandler instanceof  MultiListenerHandler)){                         MultiListenerHandler multiHandler = new  MultiListenerHandler();     multiHandler.addHandler(registeredHandler);     multiHandler.addHandler(handler);     eventDispatchers.put(eventType, multiHandler);   } else  {                    MultiListenerHandler multiHandler     = (MultiListenerHandler) registeredHandler;     multiHandler.addHandler(handler);   } } 
从register方法中可以得出可以对同一个事件类型注册多个事件处理器,多个事件处理器用MultiListenerHandler存储。
对某一个事件注册事件处理器时,先判断eventDispatchers是否存在该事件的处理器,没有则直接注册,如果存在并且不是MultiListenerHandler类型,则构建一个MultiListenerHandler的实例,将已经存在的处理器和新添加的处理器都添加到MultiListenerHandler中,如果存在并且是MultiListenerHandler类型,则直接向其追加处理器handler。下面看下MultiListenerHandler是个什么鬼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static  class  MultiListenerHandler  implements  EventHandler <Event >   List<EventHandler<Event>> listofHandlers;   public  MultiListenerHandler ()       listofHandlers = new  ArrayList<EventHandler<Event>>();   }   @Override    public  void  handle (Event event)       for  (EventHandler<Event> handler: listofHandlers) {       handler.handle(event);     }   }   void  addHandler (EventHandler<Event> handler)       listofHandlers.add(handler);   } } 
MultiListenerHandler是AsyncDispatcher的一个内部静态类,有一个list属性listofHandlers来存放多个事件处理器。该类也实现了EventHandler接口,通过调用handle方法,将事件依次通过列表中的事件处理器handler来处理。
细心的朋友可能已经在想eventQueue中的event是在哪put进去的呢?
在AsyncDispatcher中查看eventQueue的put方法都在哪里调用了。其实在AsyncDispatcher中只有一处调用了eventQueue.put方法,那就是在GenericEventHandler类的handle中。GenericEventHandler是AsyncDispatcher的一个内部类,在AsyncDispatcher.getEventHandler()方法中实例化,代码如下:
1 2 3 4 5 6 public  EventHandler getEventHandler ()    if  (handlerInstance == null ) {     handlerInstance = new  GenericEventHandler();   }   return  handlerInstance; } 
可以看出一个调度器里实例化一个GenericEventHandler对象,下次获取就可以直接返回handlerInstance了。
向eventQueue中put数据时,先通过AsyncDispatcher.getEventHandler()方法,得到GenericEventHandler实例handlerInstance,然后调用GenericEventHandler.handle(Event)方法put进去的,下面看下GenericEventHandler.handle方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public  void  handle (Event event)          if  (blockNewEvents) {     return ;   }   drained = false ;      int  qSize = eventQueue.size();      if  (qSize !=0  && qSize %1000  == 0 ) {     LOG.info("Size of event-queue is "  + qSize);   }      int  remCapacity = eventQueue.remainingCapacity();      if  (remCapacity < 1000 ) {     LOG.warn("Very low remaining capacity in the event-queue: "          + remCapacity);   }   try  {   	     eventQueue.put(event);   } catch  (InterruptedException e) {     if  (!stopped) {       LOG.warn("AsyncDispatcher thread interrupted" , e);     }     throw  new  YarnRuntimeException(e);   } } 
向eventQueue中put数据时,先统计下当前eventQueue是的使用情况,如果剩余空间小于1000,则打印一条warn日志。eventQueue是LinkedBlockingQueue类型的对象,调用LinkedBlockingQueue默认构造函数在实例化时会将Integer.MAX_VALUE作为空间大小。 
至此AsyncDispatcher类从start、put、take都已介绍完,最后介绍下AsyncDispatcher的stop。看下serviceStop方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected  void  serviceStop ()  throws  Exception         if  (drainEventsOnStop) {   	     blockNewEvents = true ;     LOG.info("AsyncDispatcher is draining to stop, igonring any new events." );          synchronized  (waitForDrained) {       while  (!drained && eventHandlingThread.isAlive()) {         waitForDrained.wait(1000 );         LOG.info("Waiting for AsyncDispatcher to drain." );       }     }   }   stopped = true ;   if  (eventHandlingThread != null ) {     eventHandlingThread.interrupt();     try  {       eventHandlingThread.join();     } catch  (InterruptedException ie) {       LOG.warn("Interrupted Exception while stopping" , ie);     }   }      super .serviceStop(); } 
调度器stop时,主要是校验下drainEventsOnStop的值,为false则直接stop掉调度器,为true则阻塞新eventput进eventQueue,等待eventQueue中的event被处理完再stop。
事件驱动设计思想的引入,使得YARN具有低耦合,高内聚的特点,各个模块只需完成各自的功能,而模块之间则采用事件联系起来,系统设计简单且维护方便。