protected void doCommit() throws InterruptedException {
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  // Decide which transaction type is being committed.
  // The PUT commit is analysed first.
  if (type.equals(TransactionType.PUT)) {
    if (!kafkaFutures.isPresent()) {
      kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>());
    }
    try {
      long batchSize = producerRecords.get().size();
      long startTime = System.nanoTime();
      int index = 0;
      for (ProducerRecord<String, byte[]> record : producerRecords.get()) {
        index++;
        // A single producer instance is shared across threads (recommended by the
        // Kafka docs, but adjust this to your own situation).
        // The producer is thread safe and sharing a single producer instance
        // across threads will generally be faster than having multiple instances.
        kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime)));
      }
      // Prevents linger.ms from being a problem:
      // force out the data buffered in the RecordAccumulator.
      producer.flush();
      // Wait until every send has reached Kafka.
      for (Future<RecordMetadata> future : kafkaFutures.get()) {
        future.get();
      }
      long endTime = System.nanoTime();
      counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
      counter.addToEventPutSuccessCount(batchSize);
      producerRecords.get().clear();
      kafkaFutures.get().clear();
    } catch (Exception ex) {
      logger.warn("Sending events to Kafka failed", ex);
      throw new ChannelException("Commit failed as send to Kafka failed", ex);
    }
  } else {
    ...
  }
}
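To see the send, flush and wait pattern above in isolation, here is a minimal standalone sketch against the plain kafka-clients producer API rather than the channel's code. The broker address, topic and class name are made up, and linger.ms is set explicitly only to show what producer.flush() short-circuits.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FlushAndWaitExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("linger.ms", "50"); // batching delay that flush() overrides

    // One producer instance, safe to share across threads.
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      futures.add(producer.send(
          new ProducerRecord<>("demo-topic", "key-" + i, ("event-" + i).getBytes())));
    }

    // Force out whatever is still sitting in the RecordAccumulator
    // instead of waiting up to linger.ms for more records.
    producer.flush();

    // Block until every record has been acknowledged; a failure here corresponds
    // to the ChannelException path in doCommit().
    for (Future<RecordMetadata> f : futures) {
      RecordMetadata md = f.get();
      System.out.printf("sent to %s-%d@%d%n", md.topic(), md.partition(), md.offset());
    }
    producer.close();
  }
}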
protected Event doTake() throws InterruptedException {
  // Transaction type
  type = TransactionType.TAKE;
  try {
    // channelUUID is final, so does a single KafkaChannel instance only ever have one consumer?
    if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
      logger.info("UUID mismatch, creating new consumer");
      decommissionConsumerAndRecords(consumerAndRecords.get());
      consumerAndRecords.remove();
    }
  } catch (Exception ex) {
    logger.warn("Error while shutting down consumer", ex);
  }
  if (!events.isPresent()) {
    events = Optional.of(new LinkedList<Event>());
  }
  Event e;
  // Give the channel a chance to commit if there has been a rebalance
  if (rebalanceFlag.get()) {
    logger.debug("Returning null event after Consumer rebalance.");
    return null;
  }
  if (!consumerAndRecords.get().failedEvents.isEmpty()) {
    e = consumerAndRecords.get().failedEvents.removeFirst();
  } else {
    if (logger.isTraceEnabled()) {
      logger.trace("Assignment during take: {}",
          consumerAndRecords.get().consumer.assignment().toString());
    }
    try {
      long startTime = System.nanoTime();
      if (!consumerAndRecords.get().recordIterator.hasNext()) {
        consumerAndRecords.get().poll();
      }
      if (consumerAndRecords.get().recordIterator.hasNext()) {
        ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next();
        e = deserializeValue(record.value(), parseAsFlumeEvent);
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
        consumerAndRecords.get().saveOffsets(tp, oam);

        // Add the key to the header
        if (record.key() != null) {
          e.getHeaders().put(KEY_HEADER, record.key());
        }

        if (logger.isDebugEnabled()) {
          logger.debug("{} processed output from partition {} offset {}",
              new Object[] {getName(), record.partition(), record.offset()});
        }
      } else {
        return null;
      }
    } catch (Exception ex) {
      logger.warn("Error while getting events from Kafka. This is usually caused by " +
                  "trying to read a non-flume event. Ensure the setting for " +
                  "parseAsFlumeEvent is correct", ex);
      throw new ChannelException("Error while getting events from Kafka", ex);
    }
  }
  eventTaken = true;
  events.get().add(e);
  return e;
}
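The rebalanceFlag checked at the top of doTake() has to be set by something; in the channel it comes from a consumer rebalance listener. Below is a minimal sketch of how such a flag can be wired up with the standard ConsumerRebalanceListener API. It only illustrates the mechanism and is not the channel's actual listener; broker address, group id, topic and class name are invented.

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceFlagExample {
  // Flag the take/commit path can check, like rebalanceFlag in KafkaChannel.
  static final AtomicBoolean rebalanceFlag = new AtomicBoolean(false);

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("group.id", "demo-group");              // hypothetical
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("enable.auto.commit", "false"); // offsets are committed manually elsewhere

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Partitions are about to move away: make the take path return null
        // so the current transaction gets a chance to commit first.
        rebalanceFlag.set(true);
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        rebalanceFlag.set(false);
      }
    });
    // ... poll loop would go here ...
    consumer.close();
  }
}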
protected void doCommit() throws InterruptedException {
  logger.trace("Starting commit");
  if (type.equals(TransactionType.NONE)) {
    return;
  }
  if (type.equals(TransactionType.PUT)) {
    ...
  } else {
    // event taken ensures that we have collected events in this transaction
    // before committing, i.e. every event of the current transaction has been taken.
    if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
      logger.trace("About to commit batch");
      long startTime = System.nanoTime();
      // Commit the offsets
      consumerAndRecords.get().commitOffsets();
      long endTime = System.nanoTime();
      counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
      if (logger.isDebugEnabled()) {
        logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
      }
    }

    int takes = events.get().size();
    if (takes > 0) {
      counter.addToEventTakeSuccessCount(takes);
      events.get().clear();
    }
  }
}
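commitOffsets() presumably commits the offsets that doTake() saved as record.offset() + 1: the committed offset is the next offset to read, not the last one read. The following is a minimal sketch of that manual-commit convention with the plain consumer API, assuming a kafka-clients version that has poll(Duration); broker address, group id, topic and class name are invented.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("group.id", "demo-group");              // hypothetical
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("enable.auto.commit", "false");

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("demo-topic"));
      // Plays the role of the offset map filled by saveOffsets() in doTake().
      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
      for (ConsumerRecord<String, byte[]> record : records) {
        // ... process record ...
        // Store offset + 1: the committed offset is the NEXT offset to read,
        // which is why doTake() builds OffsetAndMetadata with record.offset() + 1.
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
      }
      if (!offsets.isEmpty()) {
        // Rough equivalent of consumerAndRecords.get().commitOffsets() in doCommit().
        consumer.commitSync(offsets);
      }
    }
  }
}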