First, a quick description of what the program does: it uses Spark Streaming to consume Kafka in real time and writes the messages into per-topic directories on HDFS.
Kafka is consumed with the Direct Approach provided by Spark, and the HDFS API is then used to write the messages of each topic into that topic's own directory.

Introduction to Spark Streaming

Spark Streaming is an extension of Spark that enables scalable, high-throughput, fault-tolerant processing of live data streams. It works by dividing the received real-time data into batches, which the Spark engine then processes to produce batches of results, as shown below:
Figure: Spark Streaming workflow

Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data.

In Spark Streaming the context object is StreamingContext, the main entry point for all functionality. It can be created in two ways:

// first
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
// second
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

A DStream is then created from ssc (the StreamingContext). A DStream is bound to an input source (many sources are supported, such as Kafka, Flume, Twitter, and TCP sockets). Here is a simple TCP socket example:

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

lines is a DStream representing the stream of data received from the data server; each record in lines is a line of text. You can apply high-level operations to it such as map, reduce, join and window. Note that some RDD operations are not exposed on DStream; to use those APIs you need the transform operation, which hands you each RDD of the DStream so that arbitrary RDD-to-RDD functions can be applied. It is invoked as dStream.transform, for example:

val dStreamToRdd = dStream.transform(rdd => {
  ...
  rdd
})
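
As a concrete illustration, here is a minimal sketch, assuming a hypothetical counts of type DStream[(String, Int)], that uses transform to reach sortByKey, a pair-RDD operation that is not exposed on DStream:

// Hypothetical sketch: sortByKey exists only on pair RDDs, so it is reached through transform
val sortedCounts = counts.transform(rdd => rdd.sortByKey(ascending = false))
sortedCounts.print()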

At this point Spark Streaming has only defined the computation; nothing has actually been executed yet. Once all the transformations are set up, start the processing with:

ssc.start()             // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

Spark Streaming Demo

Here is a simple demo; it can be found under ${SPARK_HOME}/examples/src/main/scala/org/apache/spark/examples/streaming.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // Create the Spark Streaming context, the main entry point for Spark Streaming
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    // Create a TCP socket DStream from ssc; the DStream holds the received data stream
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // The actual computation only starts once start() is called
    ssc.start()
    ssc.awaitTermination()
  }
}

Open two terminals, A and B. In A run nc -lk 9999; in B, from the Spark home directory, run ./bin/run-example streaming.NetworkWordCount localhost 9999. Type some test data into A and the word counts show up in B.

Consuming Kafka with Spark Streaming

Spark can consume Kafka in two ways; this post focuses on the second one, the _Direct Approach (No Receivers)_.

The Direct Approach wraps the Kafka data source in a KafkaRDD whose partitions map directly to Kafka partitions, which makes it easy to read Kafka messages in parallel.
Kafka messages are not pulled into Spark memory right away; they stay in Kafka until an actual action is triggered, and only then does Spark fetch the data from Kafka.
The Direct Approach uses the Kafka simple consumer API, so reading can start from a specific offset, which helps with failure recovery.
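
For reference, a minimal sketch of the explicit-offset variant of the Direct Approach (broker address, topic name and offset below are placeholders, and Spark 1.x with the kafka 0.8 integration is assumed); passing a fromOffsets map is what lets a job resume from wherever it left off:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical values; in a real job the offsets would be loaded from wherever they were saved
val conf = new SparkConf().setAppName("DirectFromOffsets")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val fromOffsets = Map(TopicAndPartition("logs", 0) -> 12345L)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))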

Consuming Kafka to HDFS with Spark Streaming

This post mainly covers writing Kafka messages to different HDFS files per topic with Spark Streaming, while recording the offsets of the consumed messages so the job can recover from failures.

Choosing where to store offsets

  • Store offsets on HDFS via checkpoint
    Simple and easy to implement, but given the requirements it has certain limitations and cannot satisfy them well.
  • Store offsets on HDFS directly
    At first, to keep the number of dependent components (and the failures they can cause) to a minimum, the choice was to store offsets on HDFS. During development, however, it became clear that the offset file would need frequent reads and writes, which could hurt performance; see the sketch after this list.
  • Store offsets in ZooKeeper
    Store the offsets in ZooKeeper, just like the Kafka high-level consumer API does. The logic is shown below:
    Figure: Kafka direct API with zk
    The Spark driver computes the offsets for the next batch and directs the executors to consume the corresponding topics and partitions, which makes consuming Kafka messages as simple as consuming files on a file system.
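
The sketch below only illustrates that second, rejected option (the path format and object name are made up): offsets are kept as plain "topic partition offset" lines in a single HDFS file that is rewritten every batch, which is exactly the frequent small read/write pattern mentioned above.

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: persist offsets as "topic partition offset" lines in one HDFS file
object HdfsOffsetStore {
  def save(offsetsPath: String, offsets: Map[(String, Int), Long]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val out = fs.create(new Path(offsetsPath), true) // overwrite the file on every batch
    offsets.foreach { case ((topic, partition), offset) =>
      out.write(s"$topic $partition $offset\n".getBytes(StandardCharsets.UTF_8))
    }
    out.close()
  }

  def load(offsetsPath: String): Map[(String, Int), Long] = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path(offsetsPath)
    if (!fs.exists(path)) return Map.empty
    val in = scala.io.Source.fromInputStream(fs.open(path))
    val offsets = in.getLines().map { line =>
      val Array(topic, partition, offset) = line.split(" ")
      (topic, partition.toInt) -> offset.toLong
    }.toMap
    in.close()
    offsets
  }
}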

Implementation

The entry point for receiving data with the Direct Approach is naturally KafkaUtils.createDirectStream. When this method is called, it first creates a KafkaCluster and uses it to obtain the starting offsets:

def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

KafkaCluster is a key class for interacting with Kafka, but because it is declared private you cannot use it directly from your own code; you have to copy and rewrite the class yourself.
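
If, in your Spark version, KafkaCluster is only package-private (private[spark]), an alternative to copying its source is to place your helper class inside Spark's own package namespace so the class becomes visible there. A minimal sketch, with a made-up class and method name:

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

// Hypothetical helper: living in Spark's own package makes the
// private[spark] KafkaCluster visible without copying its source.
class KafkaOffsetHelper(kafkaParams: Map[String, String]) {
  private val kc = new KafkaCluster(kafkaParams)

  // Return the offsets last committed to ZooKeeper for this consumer group
  def committedOffsets(groupId: String, topics: Set[String]): Map[TopicAndPartition, Long] = {
    val partitions = kc.getPartitions(topics).right.get
    kc.getConsumerOffsets(groupId, partitions).right.get
  }
}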

Create a KafkaManager class with three main methods: create a DStream starting from the stored offsets, read the offsets, and update the offsets in ZooKeeper. The code is as follows:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}

import scala.reflect.ClassTag

/**
 * Created by hunhun on 2016/5/27.
 */
class KafkaManager(val kafkaParams: Map[String, String]) {

  // KafkaCluster from Spark, copied and rewritten because the original is private
  private val kc = new KafkaCluster(kafkaParams)

  // Create a DStream starting from the stored offsets
  // Each element is (key, message, topic)
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag]
      (ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V, String)] = {
    val groupId = kafkaParams.get("group.id").get
    // Before reading the offsets from ZooKeeper, bring them up to date if necessary
    setOrUpdateOffsets(topics, groupId)

    // Read the offsets from ZooKeeper and start consuming from there
    val messages = {
      // Either type
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        // s"xx ${}" string interpolation
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      // Consume Kafka starting from the given offsets
      // messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
      // MessageAndMetadata contains the message's topic, the message itself, and more
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message, mmd.topic))
    }
    messages
  }

  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      // A group id that has never committed offsets yields an error here, so start from scratch
      if (hasConsumed) { // the group has consumed before
        /**
         * If the streaming job throws kafka.common.OffsetOutOfRangeException,
         * the offsets saved in ZooKeeper are stale: Kafka's retention policy has
         * already deleted the log segments containing those offsets.
         * To handle this, compare the consumerOffsets in ZooKeeper with the
         * earliestLeaderOffsets; if consumerOffsets are smaller than
         * earliestLeaderOffsets they are stale, so update them to
         * earliestLeaderOffsets.
         */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // Possibly only some partitions have stale consumerOffsets, so only update those to earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, updating to " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else { // the group has never consumed
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) { // consume from the beginning
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else { // consume from the latest offsets
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  def updateZKOffsets(rdd: RDD[(String, String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}

Create the main class, SparkConsumerKafka:

import kafka.serializer.StringDecoder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path, FileSystem}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Created by hunhun on 2016/5/23.
 */
object SparkConsumerKafka {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println( s"""
        |Usage: SparkConsumerKafka <brokers> <topics> <groupid> <hdfspath>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |  <groupid> is a consumer group
        |  <hdfspath> is an HDFS path, like /user/admin/scalapath
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics, groupId, hdfsPath) = args
    val sparkConf = new SparkConf().setAppName("SparkConsumerKafka")
    // spark.streaming.kafka.maxRatePerPartition can be set to throttle the ingest rate
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)

    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
      kafkaParams, topicsSet)
    // foreachRDD is an output operation on the DStream
    messages.foreachRDD( rdd => {
      if (!rdd.isEmpty()){
        rdd.foreachPartition{ partitionOfRecords =>
          // Get an HDFS client.
          // It must be created inside the worker; if it were created in the driver
          // it would have to be serialized to the workers and would fail when used there.
          val conf = new Configuration()
          val fs = FileSystem.get(conf)

          var messageList = new ListBuffer[String]
          var topic = ""
          // This loop exists only to pick up the topic.
          // The messages are buffered in messageList because partitionOfRecords is a
          // single-pass Iterator: grabbing the topic with
          //   val topic = partitionOfRecords.toList(0)._3
          // already exhausts it, so a later
          //   partitionOfRecords.foreach(record => outputStream.write((record._2 + "\n").getBytes("UTF-8")))
          // would write nothing to HDFS. Buffering the messages while reading the topic avoids that.
          partitionOfRecords.foreach(record => {
            messageList += record._2
            if (topic == ""){
              topic = record._3
            }
          })

          if (topic != ""){
            // Build the file path for this topic's messages
            val path = new Path(hdfsPath + "/" + topic + "/" + Random.nextInt(100) + topic
              + System.currentTimeMillis())
            // Open an HDFS output stream
            val outputStream = if (fs.exists(path)){
              fs.append(path)
            }else{
              fs.create(path)
            }
            // Write the messages one by one
            messageList.foreach(message => outputStream.write((message + "\n").getBytes("UTF-8")))
            outputStream.close()
          }

        }
        // Update the offsets in ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Appendix

Storing offsets via checkpoint

import kafka.serializer.StringDecoder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path, FileSystem}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by hunhun on 2016/5/26.
 */
object WriteToHdfsWithCheckpoint {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: WriteToHdfsWithCheckpoint <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ConsumerKafkaToHdfsWithCheckPoint")

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")
    val checkpointPath = "hdfs://127.0.0.1:8020/spark_checkpoint"
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      // Open questions: when exactly is the checkpoint written? Could the offsets be
      // checkpointed even though the write to HDFS failed? Does where checkpoint() is called matter?
      ssc.checkpoint(checkpointPath)

      messages.map(_._2).foreachRDD(rdd => {
        rdd.foreachPartition{ partitionOfRecords =>
          val conf = new Configuration()
          val fs = FileSystem.get(conf)
          val path = new Path("/user/admin/scalapath/test" + System.currentTimeMillis())
          val outputStream : FSDataOutputStream = if (fs.exists(path)){
            fs.append(path)
          }else{
            fs.create(path)
          }
          partitionOfRecords.foreach(record => outputStream.write((record + "\n").getBytes("UTF-8")))
          outputStream.close()
        }
      })
      ssc
    }
    // Decide whether to create a new context:
    // if there is no checkpoint (i.e. first start), create a new StreamingContext;
    // if a checkpoint exists, restore the StreamingContext from the checkpointed state.
    val context = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)

    context.start()
    context.awaitTermination()
  }
}