HDFS是Hadoop的基石,对其的使用情况进行监控显的尤为重要。那么怎么来衡量一个用户对hdfs的使用情况呢,用什么来衡量呢?众所周知HDFS是由NameNode和DataNode组成,而NameNode又是HDFS的核心,NameNode中最重要的就是记录整个集群状态的元数据。了解集群的使用情况也就是目前的状态,这些状态记录在元数据中,而元数据中是通过记录各个用户对HDFS的操作来描绘集群的状态,那么是不是可以通过统计各个用户在某个时间段对HDFS的操作数来衡量他们使用HDFS的资源。答案是肯定是,而且此功能已经merge到2.7版本中,下面就来了解下这个patch

架构

nntop的架构图如下:
nntop架构图
nntop是在NN中的audit loggers中进行拦截,audit log从NN中接收所有user的操作。在audit log中进行拦截能够很好的扩展到其余的NN中。
HDFS默认的审计日志是DefaultAuditLogger,nntop是在HDFS中添加了一个新的审计日志类TopAuditLogger,由TopAuditLogger解析审计事件并传送给TopMetrics。TopMetrics中记录每个操作的top user list,TopMetrics也向Hadoop metrics system注册此metrics,实现了getMetrics方法,以便得到当前时间段的top user list。
TopMetrics能得到多个时间区间的top users,默认是1m,5m和25m,这是为了弥补没有画图接口。

细节设计

此功能的主要就是统计每个user的操作数,但是怎么统计呢?这是个问题。

提交patch的这位大神采用的滑动窗口的思想,并把当前窗口分为n个bucket(默认是10),分别在每个bucket中进行计数,然后对其窗口内的bucket进行求和得到总和。每个bucket有一个时间标识updatetime,当updatetime与now的差值超过了窗口的长度,则将此bucket置0,更新updatetime,窗口也就右移了一个bucket。

这样设计很巧妙没毛病,但是HDFS中的操作类型和用户都很多,统计应该怎样逐级统计呢?又为了弥补此功能没有友好的画图接口,此patch支持对多个时间间隔进行统计,则大神将统计的流程设计为树形结构,统计的入口是TopMetrics,然后TopMetrics会根据设置的需要统计的时间区间个数初始化对应的RollingWindowManager(一个区间一个RollingWindowManager),RollingWindowManager中又有一个metricsMap,用来记录metrics与RollingWindowMap的映射,RollingWindowMap中又是user和RollingWindow的映射,RollingWindow就是所谓的滑动窗口,里面由n个bucket进行统计。树形图如下:
统计流程

代码跟读

从上图的nntop架构图中可以看出是在写入auditlog之前进行了拦截,也就是在FSNamesystem.logAuditEvent()中进行判断的,logAuditEvent是将该操作在注册的各个auditsLoggers中都记录一边,auditsLoggers是通过initAuditLoggers向nn注册的,看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
List<AuditLogger> auditLoggers = Lists.newArrayList();
...
// Make sure there is at least one logger installed.
if (auditLoggers.isEmpty()) {
auditLoggers.add(new DefaultAuditLogger());
}
// 开启nntop功能时,向auditsLoggers里注册TopAuditLogger
// 开启的开关由dfs.namenode.top.enabled控制
// Add audit logger to calculate top users
if (topConf.isEnabled) {
topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
auditLoggers.add(new TopAuditLogger(topMetrics));
}

return Collections.unmodifiableList(auditLoggers);
}

由代码可以看出DefaultAuditLoggerTopAuditLogger都向auditsLoggers中注册,则这两个auditLogger的功能应该类似,DefaultAuditLogger只是将操作相关的信息记录在audit-log中,而TopAuditLogger是在记录之前进行了一次metrics的report,此时的metrics是TopMetrics的对象,在构造TopAuditLogger对象时被传入。

TopAuditLogger初始化并向auditsLoggers中注册之后,当有hdfs操作时,会调用FSNamesystem.logAuditEvent()方法对操作进行记录,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void logAuditEvent(boolean succeeded,
UserGroupInformation ugi, InetAddress addr, String cmd, String src,
String dst, HdfsFileStatus stat) {
FileStatus status = null;
...
for (AuditLogger logger : auditLoggers) {
// DefaultAuditLogger extends HdfsAuditLogger
// HdfsAuditLogger implements AuditLogger
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
status, ugi, dtSecretManager);
} else {
// TopAuditLogger implements AuditLogger
// 则此处调用TopAuditLogger的logAuditEvent方法
logger.logAuditEvent(succeeded, ugi.toString(), addr,
cmd, src, dst, status);
}
}
}

FSNamesystem.logAuditEvent()中会对auditsLoggers进行遍历,将操作记录在所有的auditsLogger中,通过调用auditsLogger的logAuditEvent方法,看下TopAuditLogger.logAuditEvent方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst, FileStatus status) {
// 核心代码
try {
topMetrics.report(succeeded, userName, addr, cmd, src, dst, status);
} catch (Throwable t) {
LOG.error("An error occurred while reflecting the event in top service, "
+ "event: (cmd={},userName={})", cmd, userName);
}
// 与DefaultAuditLogger中的logAuditEvent类似
if (LOG.isDebugEnabled()) {
final StringBuilder sb = new StringBuilder();
sb.append("allowed=").append(succeeded).append("\t");
sb.append("ugi=").append(userName).append("\t");
sb.append("ip=").append(addr).append("\t");
sb.append("cmd=").append(cmd).append("\t");
sb.append("src=").append(src).append("\t");
sb.append("dst=").append(dst).append("\t");
if (null == status) {
sb.append("perm=null");
} else {
sb.append("perm=");
sb.append(status.getOwner()).append(":");
sb.append(status.getGroup()).append(":");
sb.append(status.getPermission());
}
LOG.debug("------------------- logged event for top service: " + sb);
}
}

logAuditEvent在写入audit-log之前先向metrics系统report,report代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void report(long currTime, String userName, String cmd) {
LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
userName = UserGroupInformation.trimLoginMethod(userName);
// 一个时间区间一个rollingWindowManager
// 遍历所有的manager,让所有区间中的metrics加1
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
.values()) {
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
rollingWindowManager.recordMetric(currTime,
TopConf.ALL_CMDS, userName, 1);
}
}

其中有一个重要的属性需要说一下rollingWindowManagers,rollingWindowManagers是一个key为Integer,value为RollingWindowManager的map,map的长度由dfs.namenode.top.windows.minutes属性的元素个数控制,key的值就是该属性的元素,该属性的值是一个数组,默认是{"1","5","25"}。此属性代表这我要统计三个时间区间的值,三个区间分别为1分钟、5分钟和25分钟。这样做是为了观察一个趋势。

每个时间区间对应一个RollingWindowManager,而rollingWindowManagers是存放这些manager的map,各个区间具体的metrics统计由各自的manager控制。manager通过调用recordMetric对metrics进行更新,代码如下:

1
2
3
4
public void recordMetric(long time, String command, String user, long delta) {
RollingWindow window = getRollingWindow(command, user);
window.incAt(time, delta);
}

RollingWindow是通过getRollingWindow得到的,从上面统计流程的分析中得知RollingWindow位于统计中的最底层,由RollingWindow进行滑动窗口进行统计,下面看下getRollingWindow代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private RollingWindow getRollingWindow(String metric, String user) {
// ConcurrentHashMap<String, RollingWindowMap> metricMap
RollingWindowMap rwMap = metricMap.get(metric);
if (rwMap == null) {
rwMap = new RollingWindowMap();
RollingWindowMap prevRwMap = metricMap.putIfAbsent(metric, rwMap);
if (prevRwMap != null) {
rwMap = prevRwMap;
}
}
RollingWindow window = rwMap.get(user);
if (window != null) {
return window;
}
window = new RollingWindow(windowLenMs, bucketsPerWindow);
RollingWindow prevWindow = rwMap.putIfAbsent(user, window);
if (prevWindow != null) {
window = prevWindow;
}
return window;
}

由此代码可以看出RollingWindow、RollingWindowMap、metricsMap和RollingWindowManager的关系,metricsMap是RollingWindowManager的一个属性,metricsMap是ConcurrentHashMap(线程安全)类型,key是操作名,value是RollingWindowMap,而RollingWindowMap也是ConcurrentHashMap类型的map,key是user name,value是RollingWindow。

recordMetric时通过command和user定位到RollingWindow,然后调用RollingWindow.incAt进行累加,代码如下:

1
2
3
4
5
6
7
8
9
10
public void incAt(long time, long delta) {
int bi = computeBucketIndex(time);
Bucket bucket = buckets[bi];
// If the last time the bucket was updated is out of the scope of the
// rolling window, reset the bucket.
if (bucket.isStaleNow(time)) {
bucket.safeReset(time);
}
bucket.inc(delta);
}

RollingWindow进行metrics累加时,通过当前time定位到某个bucket中,让bucket进行累加。bucket进行累加时先判断当前bucket是否过时,也就是说当前bucket是否还在以当前time为结尾的时间窗口中,不在则进行重置。

定位bucket的代码是在computeBucketIndex中,

1
2
3
4
5
6
private int computeBucketIndex(long time) {
// time%windwoLenMs 得到time所在window中的偏移量
int positionOnWindow = (int) (time % windowLenMs);
int bucketIndex = positionOnWindow * buckets.length / windowLenMs;
return bucketIndex;
}

计算time所在的bucket时,先计算time所在window的偏移量positionOnWindow*(time%windowLenMs),然后计算每个bucket的时间长度time_length(windowLenMs/buckets.length)*,然后让positionOnWindow/time_length就得到了bucketIndex,也就是上面的代码bucketIndex = positionOnWindow * buckets.length / windowLenMs

然后调用bucket.isStaleNow判断是否过时,过时则重置。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean isStaleNow(long time) {
long utime = updateTime.get();
return time - utime >= windowLenMs;
}

void safeReset(long time) {
// At any point in time, only one thread is allowed to reset the
// bucket
synchronized (this) {
if (isStaleNow(time)) {
// reset the value before setting the time, it allows other
// threads to safely assume that the value is updated if the
// time is not stale
value.set(0);
updateTime.set(time);
}
// else a concurrent thread has already reset it: do nothing
}
}

Bucket是RollingWindow的一个内部类,有两个属性value和updateTime,都是AtomicLong类型的,这里需要注意的是updateTime并不是在value每次递增的时候改变,updateTime只改变一次,其实相当于bucket开始计数的startTime,当次startTime与当前时间time的差值大于windowLenMs时,就认为bucket过时了,需要重置。

get统计数据

通过jmx获得数据的入口函数是getTopUserOpCounts,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public String getTopUserOpCounts() {
...
Date now = new Date();
// get 统计结果
// private TopMetrics topMetrics;
final List<RollingWindowManager.TopWindow> topWindows =
topMetrics.getTopWindows();
Map<String, Object> topMap = new TreeMap<String, Object>();
topMap.put("windows", topWindows);
topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(topMap);
} catch (IOException e) {
LOG.warn("Failed to fetch TopUser metrics", e);
}
return null;
}

通过TopMetrics.getTopWindows得到

1
2
3
4
5
6
7
8
9
10
11
public List<TopWindow> getTopWindows() {
long monoTime = Time.monotonicNow();
List<TopWindow> windows = Lists.newArrayListWithCapacity
(rollingWindowManagers.size());
for (Entry<Integer, RollingWindowManager> entry : rollingWindowManagers
.entrySet()) {
TopWindow window = entry.getValue().snapshot(monoTime);
windows.add(window);
}
return windows;
}

getTopWindows只是遍历rollingWindowManagers得到多个时间段的统计结果,而真正取得topuser的是各个时间区间对应的RollingWindowManager.snapshot。

在跟读snapshot代码之前,先跟读下RollingWindowManager类,此类中有一些内部类,

首先看下TopWindowOpUser,这三者的关系是TopWindow中有个属性top,类型List<Op>Op有一个属性topUsers,类型是List<User>。TopWindow是当前window中各个metrics的一个快照,各个metrics以Op为对象存放在top的list中,list中一个Op就是一个metrics,Op中存放的是操作此metrics的所有user及其操作总次数,Op中的各个user信息是存放在User对象中,User对象中记录着当前user对Op的操作次数。

还有两个内部类TopNNameValuePair,TopN是一个优先级队列(PriorityQueue,关于优先级队列会在随后的blog中进行分析),存放的元素是NameValuePair对象,NameValuePair实现了Comparable接口,重写compareTo方法,实现NameValuePair对象以value的大小作为比较的依据*(NameValuePair中的name是user,value是user对Op的操作次数)*。NameValuePair实现了自己定义的compareTo方法之后在TopN的优先级队列中就可以依靠value的值进行排序,则TopN的对象就是操作过Op的所有user的topN排序的结果。

RollingWindowManager中的内部类的关系屡清楚之后就来看下snapshot代码,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public TopWindow snapshot(long time) {
TopWindow window = new TopWindow(windowLenMs);
...
// metricMap 存放的是 metrics -> RollingWindowMap
for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
String metricName = entry.getKey();
// RollingWindowMap 存放的是 username -> RollingWindow
RollingWindowMap rollingWindows = entry.getValue();
// 通过metricName和time得到当前window中user的topN
// 顺序是从小到大,升序
TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
final int size = topN.size();
if (size == 0) {
continue;
}
Op op = new Op(metricName, topN.getTotal());
window.addOp(op);
// Reverse the users from the TopUsers using a stack,
// since we'd like them sorted in descending rather than ascending order
// 栈,利用栈先进后出的特点,将topN中的升序变为降序
Stack<NameValuePair> reverse = new Stack<NameValuePair>();
for (int i = 0; i < size; i++) {
reverse.push(topN.poll());
}
for (int i = 0; i < size; i++) {
NameValuePair userEntry = reverse.pop();
// topN中的NameValuePair以User的形态存放到Op的topUsers list中
User user = new User(userEntry.name, Long.valueOf(userEntry.value));
op.addUser(user);
}
}
return window;
}

snapshot将当前状态存放在TopWindow中返回,每个Op的topUsers是通过方法getTopUsersForMetric得到的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private TopN getTopUsersForMetric(long time, String metricName, 
RollingWindowMap rollingWindows) {
TopN topN = new TopN(topUsersCnt);
Iterator<Map.Entry<String, RollingWindow>> iterator =
rollingWindows.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, RollingWindow> entry = iterator.next();
String userName = entry.getKey();
RollingWindow aWindow = entry.getValue();
// getSum 得到当前window中各个bucket计数的总和
long windowSum = aWindow.getSum(time);
// do the gc here
if (windowSum == 0) {
LOG.debug("gc window of metric: {} userName: {}",
metricName, userName);
iterator.remove();
continue;
}
LOG.debug("offer window of metric: {} userName: {} sum: {}",
metricName, userName, windowSum);
// 将NameValuePair放入TopN中,重写offer,对NameValuePair进行排序放入堆中
topN.offer(new NameValuePair(userName, windowSum));
}
LOG.info("topN size for command {} is: {}", metricName, topN.size());
return topN;
}

附加

  • window中bucket的个数关系到统计的精度,window中bucket的个数越多,精度越高。
    解析:window中bucket中的统计次数是以bucket的updateTime为开始时间,则如果window的长度是60s,bucket的个数是6个,则每个bucket的长度是10s。如果从时间0开始统计,则bucket0的updateTime是第0s,bucket1的updateTime是第10s,bucket9的updateTime是第50s,在65时,用户调用getTopUserOpCounts方法统计第65s时的topN user,则当前window应该是5s到65s,可是bucket0统计的时间区间是0-10s,此时bucket0的updateTime已经超过了统计的阈值,bucket0已经过时,则此时统计的区间只能从bucket1,也就是从10s处开始统计,从10s-65s处,也就是少统计了5s,即5s-10s区间中的值,误差是5s,如果从bucket的长度为5s,则统计的结果值就是正确的,不会出现误差,则得出结论,bucket个数越多,则精度越高。但是bucket也不能太多,会损耗性能。

  • 为什么采用滑动窗口

参考

Patch地址
HDFS-6982设计文档
HDFS nnTop统计功能