// first
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

// second
val sc = ...  // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    // The StreamingContext is the main entry point of a Spark Streaming application
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Create a socket stream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc').
    // The storage level uses no replication only because this runs locally;
    // replication is necessary in a distributed setting for fault tolerance.
    // ssc creates a TCP socket DStream; the DStream holds the received data stream.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // No real computation happens until start() is called
    ssc.start()
    ssc.awaitTermination()
  }
}
// Start consuming messages from the offsets stored in ZooKeeper
val messages = {
  // Either type: Left carries the error, Right carries the result
  val partitionsE = kc.getPartitions(topics)
  if (partitionsE.isLeft)
    // s"... ${...}" is Scala string interpolation
    throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
  val partitions = partitionsE.right.get
  val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
  if (consumerOffsetsE.isLeft)
    throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
  val consumerOffsets = consumerOffsetsE.right.get
  // Consume Kafka starting from the given offsets.
  // The default messageHandler is (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message());
  // MessageAndMetadata also carries the message's topic and other metadata,
  // so the handler here returns (key, message, topic).
  KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)](
    ssc, kafkaParams, consumerOffsets,
    (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message, mmd.topic))
}
messages
} // closes the enclosing stream-creation method (its signature is not shown above)
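// Aside (not part of the original class): because the messageHandler above returns a
// (key, message, topic) triple, downstream code can tell which topic every record came from.
// An illustrative use, counting records per topic in each batch of the returned stream:
//
//   stream.foreachRDD { rdd =>
//     rdd.map { case (_, _, topic) => (topic, 1L) }
//        .reduceByKey(_ + _)
//        .collect()
//        .foreach { case (topic, n) => println(s"$topic: $n") }
//   }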
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
  topics.foreach(topic => {
    var hasConsumed = true
    val partitionsE = kc.getPartitions(Set(topic))
    if (partitionsE.isLeft)
      throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft)
      hasConsumed = false // a group id with no offsets yet would otherwise fail, so read from the beginning
    if (hasConsumed) { // the group has consumed this topic before
      /**
       * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets saved in
       * ZooKeeper are stale: Kafka's retention policy has already deleted the log segments that
       * contained them. To handle this, compare the consumerOffsets in ZooKeeper with the
       * earliestLeaderOffsets; if consumerOffsets are smaller than earliestLeaderOffsets they are
       * out of date, so update consumerOffsets to earliestLeaderOffsets.
       */
      val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
      if (earliestLeaderOffsetsE.isLeft)
        throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
      val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
      val consumerOffsets = consumerOffsetsE.right.get
      // Possibly only some partitions have stale consumerOffsets, so only update
      // those partitions' consumerOffsets to earliestLeaderOffsets
      var offsets: Map[TopicAndPartition, Long] = Map()
      consumerOffsets.foreach({ case (tp, n) =>
        val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
        if (n < earliestLeaderOffset) {
          println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
            " offsets are out of date, updating to " + earliestLeaderOffset)
          offsets += (tp -> earliestLeaderOffset)
        }
      })
      if (!offsets.isEmpty) {
        kc.setConsumerOffsets(groupId, offsets)
      }
    } else { // the group has never consumed this topic
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
      var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
      if (reset == Some("smallest")) { // consume from the earliest offsets
        val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (leaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
        leaderOffsets = leaderOffsetsE.right.get
      } else { // consume from the latest offsets
        val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
        if (leaderOffsetsE.isLeft)
          throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
        leaderOffsets = leaderOffsetsE.right.get
      }
      val offsets = leaderOffsets.map { case (tp, offset) => (tp, offset.offset) }
      kc.setConsumerOffsets(groupId, offsets)
    }
  })
}
def updateZKOffsets(rdd: RDD[(String, String, String)]): Unit = {
  val groupId = kafkaParams.get("group.id").get
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
    val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
} // end of the enclosing KafkaManager class
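Putting the KafkaManager pieces together: the public createDirectStream wrapper used below is expected to run setOrUpdateOffsets before building the stream shown above, while the application calls updateZKOffsets only after a batch has been processed. A minimal driver sketch under that assumption, reusing the kafkaParams, topicsSet and ssc from the full program that follows:

val km = new KafkaManager(kafkaParams)
val stream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // 1) process the batch first (write to HDFS, a database, ...)
    // 2) only then persist the offsets, so a failed batch is simply re-read after a restart
    km.updateZKOffsets(rdd)
  }
}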
/**
 * Created by hunhun on 2016/5/23.
 */
object SparkConsumerKafka {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println(
        s"""
           |Usage: SparkConsumerKafka <brokers> <topics> <groupid> <hdfspath>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupid> is a consumer group
           |  <hdfspath> is an HDFS path, like /user/admin/scalapath
           |""".stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics, groupId, hdfsPath) = args
    val sparkConf = new SparkConf().setAppName("SparkConsumerKafka")
    // spark.streaming.kafka.maxRatePerPartition can be used to throttle ingestion
    val ssc = new StreamingContext(sparkConf, Seconds(60))
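    // Aside (not in the original code): the rate limit mentioned above is an ordinary Spark
    // configuration key; it has to be set on the SparkConf before the StreamingContext is
    // constructed, for example (the value 1000 is purely illustrative):
    //
    //   val sparkConf = new SparkConf()
    //     .setAppName("SparkConsumerKafka")
    //     .set("spark.streaming.kafka.maxRatePerPartition", "1000") // max ~1000 records/s per partition
    //   val ssc = new StreamingContext(sparkConf, Seconds(60))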
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
    val km = new KafkaManager(kafkaParams)
    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    // foreachRDD is an output operation on the DStream
    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition { partitionOfRecords =>
          // Obtain an HDFS client here.
          // It must be created on the worker: if it were created in the driver it would have to be
          // serialized and shipped to the workers, and operations on it there would fail.
          val conf = new Configuration()
          val fs = FileSystem.get(conf)
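          // The original listing breaks off here. What follows is only a sketch, assuming the
          // handler mirrors the HDFS-append logic of the checkpoint example below and then
          // commits offsets; the file naming under <hdfsPath> is illustrative, not original code.
          val path = new Path(hdfsPath + "/part-" + System.currentTimeMillis())
          val outputStream: FSDataOutputStream =
            if (fs.exists(path)) fs.append(path) else fs.create(path)
          partitionOfRecords.foreach { case (_, message, _) =>
            outputStream.write((message + "\n").getBytes("UTF-8"))
          }
          outputStream.close()
        }
        // Offsets go back to ZooKeeper only after the batch has been persisted (at-least-once).
        km.updateZKOffsets(rdd)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}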
/**
 * Created by hunhun on 2016/5/26.
 */
object WriteToHdfsWithCheckpoint {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: WriteToHdfsWithCheckpoint <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |""".stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args
    // Create context with a 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ConsumerKafkaToHdfsWithCheckPoint")
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")
    val checkpointPath = "hdfs://127.0.0.1:8020/spark_checkpoint"

    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
      // Where is the checkpoint actually saved? Could offsets end up checkpointed even though
      // the write to HDFS failed? Does the position of the ssc.checkpoint() call matter?
      ssc.checkpoint(checkpointPath)
      messages.map(_._2).foreachRDD(rdd => {
        rdd.foreachPartition { partitionOfRecords =>
          val conf = new Configuration()
          val fs = FileSystem.get(conf)
          val path = new Path("/user/admin/scalapath/test" + System.currentTimeMillis())
          val outputStream: FSDataOutputStream =
            if (fs.exists(path)) {
              fs.append(path)
            } else {
              fs.create(path)
            }
          partitionOfRecords.foreach(record => outputStream.write((record + "\n").getBytes("UTF-8")))
          outputStream.close()
        }
      })
      ssc
    }

    // Decide whether to create a new context:
    // with no checkpoint, a fresh StreamingContext is created (i.e. the application's first launch);
    // if a checkpoint exists, the StreamingContext is restored from the information recorded in it.
    val context = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
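    // The original listing ends at getOrCreate; starting the recovered (or newly created)
    // context and blocking on termination is the standard remaining step.
    context.start()
    context.awaitTermination()
  }
}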