课程回顾:

1并行计算中的两个框架  spark flink

2批处理和流处理的区别: spark 先批 后实现的流   flink

3RDD :弹性分布式数据集

sc.textFile()

RDD的转换: filter  map  flatmap  reducebykey

RDD 动作: count  collect

RDD 持久化:

4Dataframe   tempView  Spark SQL

5Streaming: 3.5 不支持python JAVA 2.4.5可以用python

6结构化流:  无界的二维表

7机器学习:Spark ML ,并行训练,决策树、随机深林、逻辑斯蒂回归等

             flink上的ML ALINK

8深度学习:神经网络,卷积神经网络、图神经网络、循环神经网络;

   大部分的编程语言是python

   人脸识别,目标检测,图像计数

   生成式AI :文本生成图片 、文本生成视频、智能问答

 

 

Spark+Kafka构建实时分析Dashboard  https://dblab.xmu.edu.cn/post/spark-kafka-dashboard/

 

淘宝双11数据分析与预测 https://dblab.xmu.edu.cn/post/8116/

 

基于售交易数据的Spark数据处理与分析 https://dblab.xmu.edu.cn/blog/2652/

基于信用卡逾期数据的Spark数据处理与分析 https://dblab.xmu.edu.cn/blog/2707/

 

 

1 案例的目标:

  购物日志的模拟产生过程,实时的统计购物日志中男和女的数据,做一个网站,能够实时展示统计结果

2流程

   手机作为kafka的生成者角色出现,送到某个topic

   通过sleep的方式进行数据模拟发送,案例中每秒10个数据

   Spark Struct Steaming :

   一个输入: kafka

   一个输出: kafka  topic

不一样

  Flask: 做为kafka的消费者存在,消费到的数据通过

  SOCKETIO的方式主动推送给浏览器,浏览器收到数据后会自动更新图表

3

 

把数据拷贝linux

 

 

安装依赖

pip install flask==1.1.2 -i https://pypi.douban.com/simple

pip install Flask-SocketIO==5.1.0 -i https://pypi.douban.com/simple

pip install kafka-python==2.0.2 -i https://pypi.douban.com/simple

 

启动kafka ,创建两个topic

新建一个终端,启动kafkaserver

 

创建两个topic sex result

cd ~/kafka

./bin/kafka-topics.sh  --list  --bootstrap-server  localhost:9092

./bin/kafka-topics.sh  --create  --bootstrap-server  localhost:9092 --replication-factor  1  --partitions  1  --topic  sex

./bin/kafka-topics.sh  --create  --bootstrap-server  localhost:9092 --replication-factor  1  --partitions  1  --topic  result

 

 

写模拟发送程序

解压数据

 

日志发送,kafka的生产者

 

 

  

 

消费者做测试

先启动消费者

再启动生产者

 

 

启动HDFS,用HDFS作为检查点处理

 

编辑流计算的文件

 

修改刚才的消费者,查看 topic result的结果、

 

 

先启动 streaming

再启动消费者

最后启动生产者 ,让它持续一分钟, 修改count

 

 

 

cd ~

spark3/bin/spark-submit  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 streamingcount.py localhost:9092 test-consumer-group sex 1

包版本

 

 

 

下载flask项目,上传到linux

 

 

 

卸载 jinjia2默认版本 3.1.2 ,安装3.0.3

pip uninstall Jinja2

pip install Jinja2==3.0.2 -i https://pypi.douban.com/simple

 

 

Itsdangerous 2.1.2 降级 2.0.1

pip uninstall itsdangerous

pip install itsdangerous==2.0.1 -i https://pypi.douban.com/simple

 

 

werkzeug.wrappers 3.0.1  降级 1.0.1

 

pip uninstall werkzeug

pip install werkzeug==1.0.1 -i https://pypi.douban.com/simple