spark3.0教程:Streaming案例-电商平台实时分析热门商品 作者:马育民 • 2025-12-16 16:51 • 阅读:10003 # 介绍 双11、双12、618期间,电商平台根据客户点击广告进来查看商品,实时记录商品查看次数、用户停留时间、用户是否收藏商品、购买数量。 通过这些数据能分析出这段时间内哪些商品最受普遍人们的关注,同时也可以针对这些数据进行用户商品推荐、将热门商品上架到首页黄金板块、实时更新热榜等 ### 流程 [](https://www.malaoshi.top/upload/0/0/1GW2PyiRxtFy.png) - 使用Scoket来模拟用户浏览商品产生实时数据,数据包括用户当前浏览的商品以及浏览商品的次数和停留时间和是否收藏该商品。 - 使用Spark Streaming构建实时数据处理系统,来计算当前电商平台最受人们关注的商品。 # 创建工程 略 ### pom.xml 由于是实时计算,所以需要依赖 `spark-streaming_2.12` ``` 4.0.0 org.example spark_std 1.0-SNAPSHOT 8 8 UTF-8 org.apache.spark spark-core_2.12 3.0.0 org.apache.spark spark-mllib_2.12 3.0.0 org.jblas jblas 1.2.3 org.apache.spark spark-streaming_2.12 3.0.0 org.apache.spark spark-yarn_2.12 3.0.0 src/main/scala net.alchim31.maven scala-maven-plugin 3.3.2 compile testCompile ``` # 模拟发送数据 使用 socket 发送数据,数据格式: ``` 商品ID::浏览次数::停留时间::是否收藏::购买件数 ``` ### 代码 ``` package top.malaoshi.guanggao; import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.Random; public class SimulatorSocket { public static void main(String[] args) throws Exception { //创建线程来启动模拟器 new Thread(new SimulatorSocketLog()).start(); } static class SimulatorSocketLog implements Runnable{ //假设一共有200个商品 private int GOODSID = 200; //随机发送消息的条数 private int MSG_NUM = 30; //假设用户浏览该商品的次数 private int BROWSE_NUM = 5; //假设用户浏览商品停留的时间 private int STAY_TIME = 10; //用来体现用户是否收藏,收藏为1,不收藏为0,差评为-1 int[] COLLECTION = new int[]{-1,0,1}; //用来模拟用户购买商品的件数,0比较多是为了增加没有买的概率毕竟不买的还是很多的,很多用户都只是看看 private int[] BUY_NUM = new int[]{0,1,0,2,0,0,0,1,0}; public void run() { Random r = new Random(); try { /* *创建一个服务器端,监听9999端口,客户端就是Streaming,通过看源码才知道,Streaming *socketTextStream 其实就是相当于一个客户端 */ ServerSocket sScoket = new ServerSocket(9999); System.out.println("启动成功!"); while(true){ //随机消息数 int msgNum = r.nextInt(MSG_NUM)+1; //开始监听 Socket socket = sScoket.accept(); //创建输出流 OutputStream os = socket.getOutputStream(); //包装输出流 PrintWriter pw = new PrintWriter(os); for (int i = 0; i < msgNum; i++) { //消息格式:商品ID::浏览次数::停留时间::是否收藏::购买件数 StringBuffer sb = new StringBuffer(); sb.append("goodsID-"+(r.nextInt(GOODSID)+1)); sb.append("::"); sb.append(r.nextInt(BROWSE_NUM)+1); sb.append("::"); sb.append(r.nextInt(STAY_TIME)+r.nextFloat()); sb.append("::"); sb.append(COLLECTION[r.nextInt(2)]); sb.append("::"); sb.append(BUY_NUM[r.nextInt(9)]); System.out.println(sb); //发送消息 pw.println(sb); } pw.flush(); pw.close(); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("thread sleep failed"); } } } catch (IOException e) { // TODO Auto-generated catch block System.out.println("port used"); } } } } ``` # spark Streaming实时分析 接收数据并进行处理 ``` package top.malaoshi.guanggao; import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.List; public class Main { /** * 计算商品关注度 * @param pv 浏览次数 * @param time 停留时间 * @param collection 是否收藏 * @param num 购买件数 * @return */ static double calc(double pv,double time,double collection,double num){ return pv * 0.8+time * 0.6 + collection * 1 + num * 1; } public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("StreamingGoods").setMaster("local[2]"); //AppName 自然就是当前Job的名字,Master就是Spark的主机地址,这里采用的是本地模式 // new Duration(5000) 窗口时间,单位毫秒,即:每5秒钟,将接收的数据进行计算 JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,new Duration(5000)); jsc.checkpoint("checkPoint/guanggao"); JavaReceiverInputDStream jds = jsc.socketTextStream("127.0.0.1", 9999); //mapToPair就是将rdd转换为键值对rdd,与map不同的是添加了一个key JavaPairDStream splitMess = jds.mapToPair(new PairFunction(){ public Tuple2 call(String line) throws Exception { //消息格式:商品ID::浏览次数::停留时间::是否收藏::购买件数 String[] lineSplit = line.split("::"); double pv = Double.parseDouble(lineSplit[1]); double time = Double.parseDouble(lineSplit[2]); double collection = Double.parseDouble(lineSplit[3]); double num = Double.parseDouble(lineSplit[4]); double followValue = calc(pv,time,collection,num); return new Tuple2(lineSplit[0], followValue); }}); JavaPairDStream UpdateFollowValue = splitMess.updateStateByKey( new Function2, Optional,Optional>(){ public Optional call(List newValues, Optional statValue) throws Exception { // 对相同的key进行value统计,实现累加 Double updateValue = statValue.orElse(0.0); for (Double values : newValues) { updateValue += values; } return Optional.of(updateValue); }},new HashPartitioner(jsc.sparkContext().defaultParallelism())); UpdateFollowValue.foreachRDD(new VoidFunction>(){ public void call(JavaPairRDD followValue) throws Exception { // 将商品关注度作为key,商品名作为value,然后根据key倒序排序(只能根据key进行排序) JavaPairRDD followValueSort = followValue.mapToPair(new PairFunction,Double,String>(){ public Tuple2 call( Tuple2 valueToKey) throws Exception { return new Tuple2(valueToKey._2,valueToKey._1); } }).sortByKey(false); // 再将商品名作为key,关注度作为value,取前10个商品 List> list = followValueSort.mapToPair(new PairFunction,String, Double>() { public Tuple2 call( Tuple2 arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2(arg0._2,arg0._1); } }).take(10); for (Tuple2 tu : list) { System.out.println("商品ID: "+tu._1+" 关注度: "+tu._2); } }}); jsc.start(); try { jsc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` ### 1. 环境初始化(Spark Streaming上下文) ```java SparkConf sparkConf = new SparkConf().setAppName("StreamingGoods").setMaster("local[2]"); JavaStreamingContext jsc = new JavaStreamingContext(sparkConf,new Duration(5000)); jsc.checkpoint("checkPoint/guanggao"); ``` - **核心参数**: - `local[2]`:本地模式,启动2个线程(Spark Streaming至少需要2个线程:1个接收数据,1个处理数据); - `new Duration(5000)`:批次间隔(Batch Interval),每5秒处理一次接收到的数据; - `checkpoint`:设置检查点目录,用于 `updateStateByKey` 保存累加状态(容错+状态持久化)。 - **关键注意**:`updateStateByKey` 必须依赖Checkpoint,否则会报错。 ### 2. 数据接收(Socket数据源) ```java JavaReceiverInputDStream jds = jsc.socketTextStream("127.0.0.1", 9999); ``` - 从本地9999端口接收字符串格式的实时数据,数据格式为:`商品ID::浏览次数::停留时间::是否收藏::购买件数`(如 `1001::2::15.5::1::0`); - `JavaReceiverInputDStream`:基于接收器的DStream,适用于Socket、Kafka(旧版)等数据源。 ### 3. 单批次关注度计算(mapToPair) ```java JavaPairDStream splitMess = jds.mapToPair(new PairFunction(){ public Tuple2 call(String line) throws Exception { String[] lineSplit = line.split("::"); double pv = Double.parseDouble(lineSplit[1]); double time = Double.parseDouble(lineSplit[2]); double collection = Double.parseDouble(lineSplit[3]); double num = Double.parseDouble(lineSplit[4]); double followValue = calc(pv,time,collection,num); return new Tuple2(lineSplit[0], followValue); } }); // 关注度计算公式:浏览(0.8)+停留(0.6)+收藏(1)+购买(1) static double calc(double pv,double time,double collection,double num){ return pv * 0.8+time * 0.6 + collection * 1 + num * 1; } ``` - **核心逻辑**: - 解析每行数据,提取商品ID和行为指标(浏览、停留、收藏、购买); - 调用 `calc` 方法计算单批次内该商品的关注度分值; - 转换为 `(商品ID, 单批次关注度)` 的键值对DStream。 - **潜在风险**:未处理数据格式异常(如字段缺失、非数字值),会导致 `ArrayIndexOutOfBoundsException` 或 `NumberFormatException`。 ### 4. 全局关注度累加(updateStateByKey) ```java JavaPairDStream UpdateFollowValue = splitMess.updateStateByKey( new Function2, Optional,Optional>(){ public Optional call(List newValues, Optional statValue) throws Exception { // 累加逻辑:历史状态值 + 本批次新值 Double updateValue = statValue.orElse(0.0); // 无历史状态则初始化为0 for (Double values : newValues) { updateValue += values; } return Optional.of(updateValue); } },new HashPartitioner(jsc.sparkContext().defaultParallelism()) ); ``` - **`updateStateByKey` 核心作用**:跨批次维护Key的状态(即商品关注度的全局累加),是Spark Streaming实现“有状态计算”的核心算子; - **参数解析**: - `List newValues`:本批次当前商品ID的所有关注度分值(一个批次内可能有多个同商品数据); - `Optional statValue`:该商品ID的历史累加状态(Checkpoint中保存); - `HashPartitioner`:指定分区器,保证状态更新的一致性(避免数据倾斜)。 - **逻辑说明**:历史累加值 + 本批次所有新值 = 最新全局关注度。 ### 5. Top10商品筛选与输出 ```java UpdateFollowValue.foreachRDD(new VoidFunction>(){ public void call(JavaPairRDD followValue) throws Exception { // 步骤1:交换KV,转为 (关注度, 商品ID),用于按关注度排序 JavaPairRDD followValueSort = followValue.mapToPair( new PairFunction,Double,String>(){ public Tuple2 call(Tuple2 valueToKey) throws Exception { return new Tuple2(valueToKey._2,valueToKey._1); } }).sortByKey(false); // false=降序排序 // 步骤2:交换KV回原格式,取Top10 List> list = followValueSort.mapToPair( new PairFunction,String, Double>() { public Tuple2 call(Tuple2 arg0) throws Exception { return new Tuple2(arg0._2,arg0._1); } }).take(10); // 步骤3:输出Top10结果 for (Tuple2 tu : list) { System.out.println("商品ID: "+tu._1+" 关注度: "+tu._2); } } }); ``` - **核心步骤**: 1. 交换KV:Spark的 `sortByKey` 只能按Key排序,因此先将 `(商品ID, 关注度)` 转为 `(关注度, 商品ID)`; 2. 降序排序:`sortByKey(false)` 按关注度从高到低排序; 3. 还原KV:将排序后的结果转回 `(商品ID, 关注度)` 格式; 4. 取Top10:`take(10)` 获取前10条数据并打印。 - **关键注意**:`take(10)` 是Driver端操作,若RDD数据量极大,需注意Driver内存占用。 ### 6. 启动流式任务 ```java jsc.start(); // 启动流式计算 jsc.awaitTermination(); // 阻塞主线程,等待任务终止(如手动停止/异常) ``` 原文出处:http://www.malaoshi.top/show_1GW2PywMKKie.html