
Scala: Integrating Spark Streaming with Kafka (Spark 2.3, Kafka 0.10)


The Maven dependency is as follows:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
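If the project is built with sbt instead of Maven, the equivalent dependency would be the line below (a sketch assuming scalaVersion is set to 2.11.x, so that %% resolves to the _2.11 artifact):

// sbt equivalent of the Maven dependency above
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"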

The official example code is as follows:

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: DirectKafkaWordCount <brokers> <topics>
* <brokers> is a list of one or more Kafka brokers
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
* $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port
* topic1,topic2
*/
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println


Running the code above produces the following error:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

Solution:

As the error shows, the Kafka consumer parameters were never set: the kafka-0-10 consumer requires bootstrap.servers, and the old metadata.broker.list property from the 0.8 API is not recognized.

Modify the official code as follows:

package cn.xdf.userprofile.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

import scala.collection.mutable

object DirectKafka {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = mutable.HashMap[String, String]()
    // The following parameters are required; without them the consumer throws the error above
    kafkaParams.put("bootstrap.servers", brokers)
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
    )

    // Get the lines, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
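A design note on the parameter map: the official Spark Streaming + Kafka 0.10 integration guide passes the same settings as an immutable Map[String, Object], referencing the deserializer classes directly instead of by name. A minimal sketch in that style (the group id and the last two settings are illustrative choices, not requirements from this article):

import org.apache.kafka.common.serialization.StringDeserializer

// Same required settings as above, in the style of the official integration guide
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",                             // any stable consumer-group name
  "auto.offset.reset" -> "latest",                    // illustrative: where to start with no committed offsets
  "enable.auto.commit" -> (false: java.lang.Boolean)  // illustrative: manage offsets manually
)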

Run it as follows.

Start Kafka:

bin/kafka-server-start ./etc/kafka/server.properties &

[2018-10-22 11:24:14,748] INFO [GroupCoordinator 0]: Stabilized group group1 generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:24:14,761] INFO [GroupCoordinator 0]: Assignment received from leader for group group1 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:24:14,779] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: __consumer_offsets-40. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-10-22 11:28:19,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group1 with old generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:28:19,013] INFO [GroupCoordinator 0]: Group group1 with generation 2 is now empty (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 11:29:29,424] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 11 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-10-22 11:39:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-10-22 11:49:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
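If the test topic does not exist yet (and topic auto-creation is disabled on the broker), create it first; a sketch assuming a local ZooKeeper on its default port:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test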


Run the Spark job. Note that setMaster("local[2]"), hard-coded in the program, takes precedence over the --master yarn flag passed to spark-submit, which is why the log below shows tasks running on the local driver:

/usr/local/spark-2.3.0/bin/spark-submit --class cn.xdf.userprofile.stream.DirectKafka --master yarn --driver-memory 2g --num-executors 1 --executor-memory 2g --executor-cores 1 userprofile2.0.jar localhost:9092 test

2018-10-22 11:28:16 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ResultStage 483 (ShuffledRDD[604] at reduceByKey at DirectKafka.scala:46) (first 15 tasks are for partitions Vector(1))
2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Adding task set 483.0 with 1 tasks
2018-10-22 11:28:16 INFO  TaskSetManager:54 - Starting task 0.0 in stage 483.0 (TID 362, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)
2018-10-22 11:28:16 INFO  Executor:54 - Running task 0.0 in stage 483.0 (TID 362)
2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 1 blocks
2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 0 ms
2018-10-22 11:28:16 INFO  Executor:54 - Finished task 0.0 in stage 483.0 (TID 362). 1091 bytes result sent to driver
2018-10-22 11:28:16 INFO  TaskSetManager:54 - Finished task 0.0 in stage 483.0 (TID 362) in 4 ms on localhost (executor driver) (1/1)
2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Removed TaskSet 483.0, whose tasks have all completed, from pool
2018-10-22 11:28:16 INFO  DAGScheduler:54 - ResultStage 483 (print at DirectKafka.scala:47) finished in 0.008 s
2018-10-22 11:28:16 INFO  DAGScheduler:54 - Job 241 finished: print at DirectKafka.scala:47, took 0.009993 s
-------------------------------------------
Time: 1540178896000 ms
-------------------------------------------

Start a producer:

bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

> hello you
> hello me

Check the results (each 2-second batch prints its own counts):

(hello,2)
(me,1)
(you,1)
2018-10-22 11:57:08 INFO  JobScheduler:54 - Finished job streaming job 1540180628000 ms.0 from job set of time 1540180628000 ms
2018-10-22 11:57:08 INFO  JobScheduler:54 - Total delay: 0.119 s for time 1540180628000 ms (execution: 0.072 s)
2018-10-22 11:57:08 INFO  ShuffledRDD:54 - Removing RDD 154 from persistence list
2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 153 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 153
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 154
2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 152 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 152
2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 151 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 151
2018-10-22 11:57:08 INFO  KafkaRDD:54 - Removing RDD 150 from persistence list
2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 150
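Note that wordCounts only covers the words seen in the current 2-second batch; each batch starts from zero. If a running total across batches is wanted instead, one option is updateStateByKey, which keeps per-key state between micro-batches. A sketch under that assumption (the checkpoint path is illustrative; checkpointing is mandatory for stateful operations):

// Accumulate counts across batches instead of per batch
ssc.checkpoint("/tmp/direct-kafka-checkpoint")  // illustrative path; required by updateStateByKey
val totalCounts = words.map(x => (x, 1L)).updateStateByKey[Long](
  (newValues: Seq[Long], runningTotal: Option[Long]) =>
    Some(newValues.sum + runningTotal.getOrElse(0L))
)
totalCounts.print()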
