spark3.0教程:Streaming案例:过滤黑名单 作者:马育民 • 2025-12-09 23:02 • 阅读:10003 # 先启动服务器端 ``` nc -l -p 8888 ``` # 代码 ``` package top.malaoshi.blacklist import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds object Main { def main(args: Array[String]): Unit = { // 需要过滤的黑名单,为true表示拦截;为false或没在列表中,都会被放行 val bl = Array(("Jim", true), ("hack", true)) val sc = new SparkConf().setAppName("blacklist").setMaster("local[*]") // 创建 StreamingContext,设置每2秒刷新一次 val ssc = new StreamingContext(sc, Seconds(2)) // 设置并行度为3 val blRdd = ssc.sparkContext.parallelize(bl, 3) // 返回ReceiverInputDStream,设置主机名,端口号. val st = ssc.socketTextStream("localhost", 8888) // 对输入数据进行转换,(id,user) => (user, id user) ,以便对每个批次RDD,与之前定义好的黑名单进行leftOuterJoin操作. val users = st.map { l => (l.split(" ")(1), l) } // 输入0003 hack,打印 (hack,0003 hack) users.print() // 调用左外链接操作leftOuterJoin , 进行黑名单匹配,过滤掉. val validRddDS = users.transform(ld => { System.out.print("ld----") ld.foreach(println) // ld 就是 (hack,0003 hack) // ljoinRdd的值:能关联上,结果:(hack,(0003 hack,true));关联不上,结果:(tom,(0001 tom,None)) val ljoinRdd = ld.leftOuterJoin(blRdd) System.out.print("查看ljoinRdd--") ljoinRdd.foreach(println) val fRdd = ljoinRdd.filter(t => { // 能关联上,结果:(hack,(0003 hack,true)) -- t._2._2 的结果 true,getOrElse(false) 返回 true // 关联不上,结果:(tom,(0001 tom,None)) -- t._2._2 返回 None,getOrElse(false) 返回 false if (t._2._2.getOrElse(false)) { false // 被过滤,不会继续往下执行 } else { true // 会继续往下执行 } }) // t的值 都是能关联上,即:(hack,(0003 hack,true)) ,重新组织数据,返回 00003 hack,与输入数据格式相同 val validRdd = fRdd.map(t => t._2._1) validRdd }) // 打印白名单 validRddDS.print() // 执行 ssc.start() // 等待完成 ssc.awaitTermination } } ``` 原文出处:http://www.malaoshi.top/show_1GW2NS37ejI5.html