在flume中事务是一个重要的概念,事务保证了数据的可用性。这里的事务有别于数据库中的事务,比事务在回滚时,可能会造成数据重复,所以flume保证的是每条数据最少发送一次,以此来保证数据不丢失

此篇从具体的数据流中分析事务,配置的数据流是taildir+kafkachannel,然后kafkachannel+hdfsSink

kafkachannel中维护了两个事务,分别是put事务和take事务。

put事务

kafkachannel的put事务是由taildir触发的,我们从代码中跟下put事务的流程。

taildir的入口是TaildirSource.process,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Status process() {
Status status = Status.READY;
try {
existingInodes.clear();
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
// 判断是否需要tail
// 判断规则,修改时间是否大于上次记录的tial时间,记录的postition是否大于该文件的length
if (tf.needTail()) {
tailFileProcess(tf, true);
}
}
closeTailFiles();
...
} catch (Throwable t) {
...
}
return status;
}

当file的修改时间大于记录的上次tail时间或者记录的postition大于file的length时(从0处tail),需要tail该file。
文件的tail逻辑在tailFileProcess代码中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
throws IOException, InterruptedException {
while (true) {
reader.setCurrentFile(tf);
// 从文件中读取batchSize条数据
List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
if (events.isEmpty()) {
break;
}
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();
try {
// 事务的实现
getChannelProcessor().processEventBatch(events);
reader.commit();
} catch (ChannelException ex) {
logger.warn("The channel is full or unexpected failure. " +
"The source will try again after " + retryInterval + " ms");
TimeUnit.MILLISECONDS.sleep(retryInterval);
retryInterval = retryInterval << 1;
retryInterval = Math.min(retryInterval, maxRetryInterval);
continue;
}
retryInterval = 1000;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();
// 追上写入的速度之后才会退出当前file?是否存在其它文件无法得到tail的机会??
// 这应该是个bug
if (events.size() < batchSize) {
break;
}
}
}

上面的bug是当一个fileGroup中有多个正在写入的文件时,如果某个文件的写入量大,致使每次都能从中读取batchSize条数据,则其它文件将没有机会被读取。
这个bug我已提交到社区FLUME-3101

下面看下事务具体是怎么实现的,
getChannelProcessor().processEventBatch(events) -> ChannelProcesser.processEventBatch,看下processEventBatch的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void processEventBatch(List<Event> events) {
...
// 将event与channel组成map
for (Event event : events) {
...
}
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
// 得到channel对应的事务
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
// 开始事务
tx.begin();
// 处理事务,这里是先将event写入内存,然后由commit批量将events写入kafka
List<Event> batch = reqChannelQueue.get(reqChannel);

for (Event event : batch) {
reqChannel.put(event);
}
// 提交事务,也是一个事务的结束
tx.commit();
} catch (Throwable t) {
// 发生议程,进行事务回滚
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
}
}
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
...
}
}

首先从该source中绑定的channel中拿到对应的Transaction,然后调用begin方法开始事务,等数据处理结束之后,调用commit提交事务,如果处理数据的过程中发生错误,则在catch中捕获,调用rollback进行事务回滚。

先看下数据处理的逻辑,通过reqChannel.put(event)将数据将入channel的内存中。看似调用的是channel的方法,其实channel的put只是对Transaction的put进行了下封装,而Transaction.put的具体实现是在channel中的Transaction.doPut里实现的。
reqChannel.put(event) -> BasicChannelSemantics.put -> BasicTransactionSemantics.put -> BasicTransactionSemantics.duPut
其中doPut是一个抽象方法,其具体实现放在各个channel的Transaction中。这里使用的kafkaChannel,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
protected void doPut(Event event) throws InterruptedException {
// 事务类型 PUT or TAKE
type = TransactionType.PUT;
...
Integer partitionId = null;
try {
if (staticPartitionId != null) {
partitionId = staticPartitionId;
}
//Allow a specified header to override a static ID
if (partitionHeader != null) {
String headerVal = event.getHeaders().get(partitionHeader);
if (headerVal != null) {
partitionId = Integer.parseInt(headerVal);
}
}
// 将event构建一个ProducerRecord对象放入producerRecords中,
// 等待commit时写入kafka
if (partitionId != null) {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
serializeValue(event, parseAsFlumeEvent)));
} else {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topic.get(), key,
serializeValue(event, parseAsFlumeEvent)));
}
} catch (NumberFormatException e) {
throw new ChannelException("Non integer partition id specified", e);
} catch (Exception e) {
throw new ChannelException("Error while serializing event", e);
}
}

doPut首先给事务的类型赋值,然后将event放入内存中,如果此过程中没有发生错误,则会调用commit对内存中的event提交到kafka中。

下面看下commit的代码,commit的调用逻辑和put类似,具体实现是在KafkaChannel.KafkaTransaction的duCommit中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
protected void doCommit() throws InterruptedException {
if (type.equals(TransactionType.NONE)) {
return;
}
// 判断需要commit的事务类型
// 此处先分析PUT的commit
if (type.equals(TransactionType.PUT)) {
if (!kafkaFutures.isPresent()) {
kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
}
try {
long batchSize = producerRecords.get().size();
long startTime = System.nanoTime();
int index = 0;
for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
index++;
// 多线程之间共享一个producer实例(官方推荐,但也可以根据自己的情况而定)
// The producer is thread safe and sharing a single producer instance
// across threads will generally be faster than having multiple instances.
kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
}
//prevents linger.ms from being a problem
// 强制发送累加队列RecordAccumulator中的数据
producer.flush();
// 等待各线程将数据发送至kafka
for (Future<RecordMetadata> future : kafkaFutures.get()) {
future.get();
}
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
counter.addToEventPutSuccessCount(batchSize);
producerRecords.get().clear();
kafkaFutures.get().clear();
} catch (Exception ex) {
logger.warn("Sending events to Kafka failed", ex);
throw new ChannelException("Commit failed as send to Kafka failed",
ex);
}
} else {
...
}
}

PUT类型的事务和TAKE类型的事务都是在doCommit中提交,这里调用的都是kafka的Java Api,需要注意的是各个线程之间共享一个producer实例,event发送到kafka可以认为是同步发送,因为调用了future.get等待各个线程的结束。
这里还调用了producer.flush(),这是为了防止配置了linger.ms对record进行合并发送,flush强制将队列中的数据发送到kafka。

无论是在doPut还是在doCommit中发生错误,都会对事务进行回滚。回滚是在doRollback中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void doRollback() throws InterruptedException {
if (type.equals(TransactionType.NONE)) {
return;
}
if (type.equals(TransactionType.PUT)) {
// PUT时发生错误,则把内存中的数据清空
// 但没有对回滚的次数进行统计
producerRecords.get().clear();
kafkaFutures.get().clear();
} else {
...
}
}

由上面的分析可知,kafkachannel是将event通过doPut写入内存,然后通过doCommit将内存中的数据发送到kafka,这个事务是将event写入到kafka时才结束。
而memorychannel则是将event通过doPut写入内存(putList)中,然后通过doCommit将putList中的数据写入queue中,写入queue成功则事务结束。可见如果使用kafkachannel向kafka中写数据时会比memorychannel要高效,更重要的是能保证数据的事务性

下面看下take事务

take事务

kafkachannel中的take事务是由sink触发的,这里是指hdfsSink,下面看下take的事务代码。

此处的sink用的是HDFSEventSink,其process代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 非线程安全
public Status process() throws EventDeliveryException {
// 拿到sink关联的channel
Channel channel = getChannel();
// 从channel中得到Transaction
Transaction transaction = channel.getTransaction();
List<BucketWriter> writers = Lists.newArrayList();
// 开始事务
transaction.begin();
try {
int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
// 从channel中取出一条数据
Event event = channel.take();
if (event == null) {
break;
}
...
synchronized (sfWritersLock) {
bucketWriter = sfWriters.get(lookupPath);
// we haven't seen this file yet, so open it and cache the handle
// 没有文件的句柄,则新建一个
if (bucketWriter == null) {
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
sfWriters.put(lookupPath, bucketWriter);
}
}
// track the buckets getting written in this transaction
// 一次事务中,take的event可能来自不同topic的parition,则需要同时打开多个文件句柄
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}

// Write the data to HDFS
try {
bucketWriter.append(event);
} catch (BucketClosedException ex) {
...
}
}
...
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
// 事务提交
transaction.commit();
...
} catch (IOException eIO) {
// 发生异常进行事务回滚
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
transaction.close();
}
}

sink的process中先从对应的channel中得到Transaction,然后调用begin开始执行事务,然后开始处理数据。
处理数据时,调用channel.take从channel中take一条event,take最终调用的是KafkaTransaction.doTake,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
protected Event doTake() throws InterruptedException {
// 事务类型
type = TransactionType.TAKE;
try {
// channelUUID是final类型的,那一个kafkachannel实例只有一个consumer?
if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
logger.info("UUID mismatch, creating new consumer");
decommissionConsumerAndRecords(consumerAndRecords.get());
consumerAndRecords.remove();
}
} catch (Exception ex) {
logger.warn("Error while shutting down consumer", ex);
}
if (!events.isPresent()) {
events = Optional.of(new LinkedList<Event>());
}
Event e;
// Give the channel a chance to commit if there has been a rebalance
if (rebalanceFlag.get()) {
logger.debug("Returning null event after Consumer rebalance.");
return null;
}
if (!consumerAndRecords.get().failedEvents.isEmpty()) {
e = consumerAndRecords.get().failedEvents.removeFirst();
} else {
if ( logger.isTraceEnabled() ) {
logger.trace("Assignment during take: {}",
consumerAndRecords.get().consumer.assignment().toString());
}
try {
long startTime = System.nanoTime();
if (!consumerAndRecords.get().recordIterator.hasNext()) {
consumerAndRecords.get().poll();
}
if (consumerAndRecords.get().recordIterator.hasNext()) {
ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
e = deserializeValue(record.value(), parseAsFlumeEvent);
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
consumerAndRecords.get().saveOffsets(tp,oam);

//Add the key to the header
if (record.key() != null) {
e.getHeaders().put(KEY_HEADER, record.key());
}

long endTime = System.nanoTime();
counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));

if (logger.isDebugEnabled()) {
logger.debug("{} processed output from partition {} offset {}",
new Object[] {getName(), record.partition(), record.offset()});
}
} else {
return null;
}
} catch (Exception ex) {
logger.warn("Error while getting events from Kafka. This is usually caused by " +
"trying to read a non-flume event. Ensure the setting for " +
"parseAsFlumeEvent is correct", ex);
throw new ChannelException("Error while getting events from Kafka", ex);
}
}
eventTaken = true;
events.get().add(e);
return e;
}

doTake其实就是使用consumer消费kafka。理想情况下应该让一个consumer消费多个topic的一个partition但这里consumer是和channelUUID对应的,而channelUUID又是final类型的,那是不是说kafkachannel实例中只有一个consumer
这里的消费逻辑是consumer通过poll将数据拉到本地内存中,然后在sink中一条一条的取,每取一条offset就加1,内存取完之后再调用一次poll。
sink拿到event之后,根据event的信息放入相应的bucketWriter中,取出batchSize大小之后将所有的bucketWriter进行一次flush。flush成功之后进行事务的commit

commit调用的是doCommit,下面看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected void doCommit() throws InterruptedException {
logger.trace("Starting commit");
if (type.equals(TransactionType.NONE)) {
return;
}
if (type.equals(TransactionType.PUT)) {
...
} else {
// event taken ensures that we have collected events in this transaction
// before committing
// commit之前要保证当前事务中的event都被采集了
if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
logger.trace("About to commit batch");
long startTime = System.nanoTime();
// 提交offset
consumerAndRecords.get().commitOffsets();
long endTime = System.nanoTime();
counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
if (logger.isDebugEnabled()) {
logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
}
}

int takes = events.get().size();
if (takes > 0) {
counter.addToEventTakeSuccessCount(takes);
events.get().clear();
}
}
}

这里offset是手动触发的,调用的是kafka consumer的apiconsumer.commitSync(offsets)
如果在commit或者flush的过程中发生异常,则进行事务回滚,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void doRollback() throws InterruptedException {
if (type.equals(TransactionType.NONE)) {
return;
}
if (type.equals(TransactionType.PUT)) {
...
} else {
// 回滚次数统计
counter.addToRollbackCounter(events.get().size());
// 将内存中的event放入failedEvents中
consumerAndRecords.get().failedEvents.addAll(events.get());
events.get().clear();
}
}

总结

flume的事务保证了数据不会丢失,是flume中一个重要的概念。

疑虑

HdfsSink 和 kafkachannel consumer都是单线程吗?
一个kafkachannel实例一个consumer,sink从consumer中取数,然后分给不同的bucketWriter,可以认为consumer是单线程,处理数据是多线程?