When Flume writes data to Kafka directly through a Kafka channel, the producer throws an exception if a single request to Kafka is too large. The exception looks like this:

15 May 2017 17:17:09,814 WARN  [PollableSourceRunner-TaildirSource-r-op_hiveserver] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:560)  - Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1730741 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:546)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
    at org.apache.flume.source.taildir.TaildirSource.tailFileProcess(TaildirSource.java:263)
    at org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:226)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1730741 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

The payload exceeded the max.request.size threshold (default: 1048576 bytes, i.e. 1 MB), so raise that limit on the Kafka channel's producer: a1.channels.c1.kafka.producer.max.request.size = 10485760
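For context, here is a minimal sketch of where that override sits in the Flume agent configuration. The agent name a1 and channel name c1 are taken from the line above; the bootstrap servers and topic name are illustrative placeholders:

# Kafka channel; a1/c1 match the names above, other values are placeholders
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = broker1:9092,broker2:9092
a1.channels.c1.kafka.topic = flume-channel
# Properties under the kafka.producer. prefix are passed straight through to the Kafka producer
a1.channels.c1.kafka.producer.max.request.size = 10485760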

After that, the following exception appeared:

15 May 2017 17:20:00,799 WARN  [PollableSourceRunner-TaildirSource-r-op_hiveserver] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:560)  - Sending events to Kafka failed
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:552)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
    at org.apache.flume.source.taildir.TaildirSource.tailFileProcess(TaildirSource.java:263)
    at org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:226)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

The exception has changed: now it is the server that refuses the message, which means the Kafka cluster configuration has to change as well. Add the following to server.properties on each broker (a broker restart is required for it to take effect):

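# Upper limit on the size of a message the broker will accept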
message.max.bytes=10000120
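# Cap on bytes fetched from a partition per replication request; keep it >= message.max.bytes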
replica.fetch.max.bytes=10485760
  • message.max.bytes is the upper limit on the size of a message that Kafka will accept. It is configured for the whole cluster; an individual topic can also be configured separately through the max.message.bytes parameter, using the command sketched after this list.
  • replica.fetch.max.bytes is the upper limit on how much data is read from a partition at a time, but it is not an absolute limit: if the first message in a non-empty partition exceeds this value, it can still be read, so replication does not stall on oversized messages.
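
A sketch of that per-topic override, reusing the illustrative localhost:2181 ZooKeeper address and my-topic name from above; the second command is one way to confirm the override took effect:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=10485760
# The override should now appear under Configs: in the topic description
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic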