No title
基于 Spark 的电商实时页面访问量统计
案例场景说明
本案例基于电商实时页面访问量统计场景,实现从数据生成到 Spark 流处理的完整链路。核心流程如下:
- 数据生成:模拟用户访问日志(含 URL、IP、时间戳等字段),推送至 Kafka;
- 流处理:Spark 消费 Kafka 数据,过滤无效 URL 并统计每分钟页面访问量
- 结果展示:聚合结果实时输出到控制台。
案例逻辑总体图
1 | flowchart TD |
关键流程说明与 Spark API
- 数据源接入
• 输入源:从 Kafka 消费 JSON 格式的页面访问日志。
• API 使用:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "page_views")
.load()
```
2. **数据解析与清洗**
- **结构化处理**:提取 `URL`、`timestamp` 等字段,过滤非商品页面(如 `/login`)。
- **API 使用**:
```scala
.selectExpr("CAST(value AS STRING) AS log")
.withColumn("url", split(col("log"), " ")(2))
.filter(col("url").like("/product/%"))
```
3. **窗口聚合统计**
- **滑动窗口**:按 1 分钟窗口、10 秒滑动步长统计访问量。
- **API 使用**:
```scala
.groupBy(
window(col("timestamp"), "1 minute", "10 seconds"),
col("url")
)
.agg(count("*").alias("view_count"))
```
4. **结果输出**
- **控制台展示**:实时输出 Top 5 热门商品。
- **持久化存储(可选)**:写入 MySQL 供后续查询或 HBase 供离线分析。
- **API 使用**:
```scala
.writeStream
.outputMode("complete")
.format("console")
.start()
```
#### 技术亮点(供参考学习)
• **动态水位线**:通过 `withWatermark("timestamp", "2 minutes")` 处理延迟数据。
• **容错机制**:启用检查点保证 Exactly-Once 语义。
• **性能优化**:对齐 Kafka 分区与 Spark 并行度,避免数据倾斜。
#### 代码实现参考
##### 1. 生成测试数据并推送至 Kafka(Python 脚本)
```python
# generate_kafka_data.py
from kafka import KafkaProducer
import json
import random
import time
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# 模拟商品页面 URL(引用网页6的URL生成逻辑)
url_paths = ["/product/101", "/product/205", "/product/309", "/login", "/search"]
def generate_log():
return {
"user_id": random.randint(1000, 9999),
"timestamp": int(time.time() * 1000), # 毫秒级时间戳
"url": random.choice(url_paths),
"ip": f"{random.randint(1,255)}.{random.randint(1,255)}.0.1"
}
# 持续发送数据到 Kafka Topic "page_views"
while True:
log_data = generate_log()
producer.send('page_views', value=log_data)
time.sleep(0.1) # 每秒约生成10条数据
2. Spark 流处理作业(Scala,可能转译成 Java 更简单,不用配环境)
1 | // PageViewAnalysis.scala |
运行步骤
启动 Kafka:
1
2
3# 创建 Topic
kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic page_views --partitions 1 --replication-factor 1运行数据生成脚本:
1
python generate_kafka_data.py
提交 Spark 作业:
1
2
3spark-submit --class PageViewAnalysis \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
target/scala-2.12/pageview-analysis_2.12-1.0.jar
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.