
时间 | 版本/阶段 |
|---|---|
2010 年 Spark 诞生 | Spark 项目在加州大学伯克利分校开源,PySpark 作为早期实验性接口出现,主要基于 RDD。 |
2014 年 Spark 1.0 | RDD 时代。PySpark 功能逐渐完善,但性能受限于 Python 与 JVM 之间的序列化开销,优化器支持较弱。 |
2016 年 Spark 2.0 | 里程碑版本。引入 DataFrame API 和 Catalyst 优化器。PySpark 代码可享受 SQL 级优化,性能大幅提升,成为主流用法。 |
2020 年 Spark 3.0 | Pandas 集成。引入 Pandas API on Spark,允许用户像使用 Pandas 一样操作大数据;优化 Arrow 格式,减少序列化开销。 |
2022 年 Spark 3.3+ | 云原生与 AI。增强对 Kubernetes 的支持; |
优势 | 说明 |
|---|---|
易用性 | 语法简洁,拥有庞大的 Python 数据科学生态库支持。 |
高性能 | 基于内存计算,比 Hadoop MapReduce 快 10-100 倍;支持 DAG 有向无环图优化。 |
可扩展性 | 轻松从单机扩展到数千台节点的集群,处理 PB 级数据。 |
多功能性 | 批处理、流处理、SQL、机器学习、图计算全支持。 |
容错性 | 基于 RDD 的血缘机制,节点故障时可自动恢复数据。 |
劣势 | 说明 |
|---|---|
小数据 overhead | 启动 Spark 上下文需要时间,处理小数据(<1GB)时不如 Pandas 快。 |
调试复杂 | 分布式环境下的报错堆栈往往很长,定位 Python 端的具体错误有时较难。 |
序列化开销 | 尽管有 Arrow 优化,但在涉及复杂 Python 对象(如自定义类)传输时,仍有性能损耗。 |
资源消耗 | 默认占用较多内存,不适合资源极其受限的边缘设备。 |
from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, sum as _sum# 1. 初始化 SparkSession (入口点)spark = SparkSession.builder \.appName("SimplePySparkDemo") \.master("local[*]") \.getOrCreate()# 2. 创建简单的 DataFrame (模拟读取数据)data = [("Alice", "Sales", 3000),("Bob", "Sales", 4000),("Charlie", "IT", 5000),("David", "IT", 4500),("Eve", "HR", 3500)]columns = ["Name", "Department", "Salary"]df = spark.createDataFrame(data, columns)print("--- 原始数据 ---")df.show()# 3. 数据转换:过滤 (Filter) 和 选择 (Select)# 只保留工资大于 4000 的员工high_salary_df = df.filter(col("Salary") > 4000).select("Name", "Salary")print("--- 工资大于 4000 的员工 ---")high_salary_df.show()# 4. 数据聚合:按部门分组求和 (GroupBy)dept_salary_df = df.groupBy("Department").agg(_sum("Salary").alias("Total_Salary"))print("--- 各部门工资总和 ---")dept_salary_df.show()# 5. 停止 Spark 会话spark.stop()
PySpark 允许你注册临时表,然后直接用 SQL 语句查询。看下面第13行代码,直接就是原生SQL语法,这对熟悉 SQL 的小伙伴们可以说是非常友好了!
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("SQLDemo").master("local[*]").getOrCreate()# 创建数据data = [("2023-01-01", 100), ("2023-01-02", 150), ("2023-01-03", 200)]df = spark.createDataFrame(data, ["Date", "Volume"])# 注册为临时视图df.createOrReplaceTempView("trades")# 使用 SQL 查询result = spark.sql("SELECT * FROM trades WHERE Volume > 120")print("--- SQL 查询结果 ---")result.show()spark.stop()
合集 | 文章 |
|---|---|