IT序号网

scala之Spark Streaming Kafka CreateDirectStream 无法解析

third_qq_acbf90bbd2dede1d 2024年08月16日 编程语言 34 0

需要一些帮助,谢谢。

我正在使用 IntelliJ 和 SBT 来构建我的应用。

我正在开发一个应用程序来读取 Spark Streaming 中的 Kafka 主题,以便对其进行一些 ETL 工作。不幸的是,我无法从 Kafka 读取数据。

KafkaUtils.createDirectStream 没有解析并不断给我错误(无法解析符号)。我已经完成研究,看来我有正确的依赖项。

这是我的 build.sbt:

name := "ASUIStreaming" 
version := "0.1" 
scalacOptions += "-target:jvm-1.8" 
scalaVersion := "2.11.11" 
 
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" 
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0" 
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0" 
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" 
libraryDependencies += "org.apache.kafka" %% "kafka-clients" % "0.8.2.1" 
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4" 

有什么建议吗?我还应该提到我在笔记本电脑上没有管理员访问权限,因为这是一台工作计算机,而且我使用的是可移植 JDK 和 IntelliJ 安装。但是,我工作中的同事处于同样的情况,并且对他们来说效果很好。

提前致谢!

请您参考如下方法:

这是我正在使用的主要 Spark Streaming 代码片段。 注意:我已经屏蔽了一些 secret 的工作数据,例如IP和Topic名称等。

import org.apache.kafka.clients.consumer.ConsumerRecord 
import kafka.serializer.StringDecoder 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark 
import org.apache.kafka.clients.consumer._ 
import org.apache.kafka.common.serialization.StringDeserializer 
import scala.util.parsing.json._ 
import org.apache.spark.streaming.kafka._ 
 
 
 
object ASUISpeedKafka extends App 
 
{ 
  // Create a new Spark Context 
  val conf = new SparkConf().setAppName("ASUISpeedKafka").setMaster("local[*]") 
  val sc = new SparkContext(conf) 
  val ssc = new StreamingContext(sc, Seconds(2)) 
 
  //Identify the Kafka Topic and provide the parameters and Topic details 
  val kafkaTopic = "TOPIC1" 
    val topicsSet = kafkaTopic.split(",").toSet 
    val kafkaParams = Map[String, String] 
  ( 
 
    "metadata.broker.list" -> "IP1:PORT, IP2:PORT2", 
    "auto.offset.reset" -> "smallest" 
  ) 
 
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] 
  ( 
  ssc, kafkaParams, topicsSet 
  ) 
} 


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!