课程回顾:
1、并行计算中的两个框架 spark flink
2、批处理和流处理的区别: spark 先批 后实现的流 flink
3、RDD :弹性分布式数据集
sc.textFile()
RDD的转换: filter map flatmap reducebykey
RDD 动作: count collect
RDD 持久化:
4、Dataframe tempView Spark SQL
5、Streaming: 3.5 不支持python 用JAVA , 2.4.5可以用python写
6、结构化流: 无界的二维表
7、机器学习:Spark ML ,并行训练,决策树、随机深林、逻辑斯蒂回归等
flink上的ML 叫 ALINK
8、深度学习:神经网络,卷积神经网络、图神经网络、循环神经网络;
大部分的编程语言是python
人脸识别,目标检测,图像计数
生成式AI :文本生成图片 、文本生成视频、智能问答
Spark+Kafka构建实时分析Dashboard https://dblab.xmu.edu.cn/post/spark-kafka-dashboard/
淘宝双11数据分析与预测 https://dblab.xmu.edu.cn/post/8116/
基于售交易数据的Spark数据处理与分析 https://dblab.xmu.edu.cn/blog/2652/
基于信用卡逾期数据的Spark数据处理与分析 https://dblab.xmu.edu.cn/blog/2707/
1 案例的目标:
购物日志的模拟产生过程,实时的统计购物日志中男和女的数据,做一个网站,能够实时展示统计结果
2、流程
手机作为kafka的生成者角色出现,送到某个topic
通过sleep的方式进行数据模拟发送,案例中每秒10个数据
Spark Struct Steaming :
一个输入: kafka 源
一个输出: kafka topic
不一样
Flask: 做为kafka的消费者存在,消费到的数据通过
SOCKETIO的方式主动推送给浏览器,浏览器收到数据后会自动更新图表
3、

把数据拷贝linux

安装依赖
pip install flask==1.1.2 -i https://pypi.douban.com/simple
pip install Flask-SocketIO==5.1.0 -i https://pypi.douban.com/simple
pip install kafka-python==2.0.2 -i https://pypi.douban.com/simple
启动kafka ,创建两个topic

新建一个终端,启动kafka的server

创建两个topic sex result
cd ~/kafka
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sex
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic result

写模拟发送程序
解压数据

日志发送,kafka的生产者

消费者做测试

先启动消费者
再启动生产者

启动HDFS,用HDFS作为检查点处理

编辑流计算的文件

修改刚才的消费者,查看 topic result的结果、

先启动 streaming
再启动消费者
最后启动生产者 ,让它持续一分钟, 修改count

cd ~
spark3/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 streamingcount.py localhost:9092 test-consumer-group sex 1
包版本

下载flask项目,上传到linux


卸载 jinjia2默认版本 3.1.2 ,安装3.0.3
pip uninstall Jinja2
pip install Jinja2==3.0.2 -i https://pypi.douban.com/simple

Itsdangerous 2.1.2 降级 2.0.1
pip uninstall itsdangerous
pip install itsdangerous==2.0.1 -i https://pypi.douban.com/simple
werkzeug.wrappers 3.0.1 降级 1.0.1
pip uninstall werkzeug
pip install werkzeug==1.0.1 -i https://pypi.douban.com/simple
