top of page

使用 Scala、Python 和 SQL 中的 Apache Spark DataFrames 和 Spark SQL 掌握聚合

如果您想驾驭大数据的力量,Apache Spark 是您的理想之选。它提供强大的 API 和丰富的生态系统,非常适合处理大型数据集。尤其是 Spark 能够使用 DataFrames 和 Spark SQL 进行聚合,这使其成为一个非常宝贵的工具。本文将指导您使用 Scala 和 Python 语言,使用 Spark DataFrames 和 Spark SQL 执行聚合操作。您将看到实用的代码示例来巩固您的理解。


理解 Spark DataFrames


Spark DataFrames 是按命名列组织的分布式数据集合,类似于关系数据库中的表。这种结构允许高效的数据操作。DataFrames 借助 Catalyst 等用于查询优化的功能和用于内存管理的 Tungsten 引擎等功能,提升了操作性能。


DataFrames 的语法简洁直观,让您能够无缝执行函数式和类似 SQL 的操作。例如,您无需编写大量样板代码即可执行计算,无论新手还是经验丰富的开发人员都能轻松上手。


设置您的 Spark 环境


在深入研究聚合之前,让我们先建立后续示例所需的 Spark 环境。这涉及安装 Apache Spark 并根据你的操作系统和工作区偏好,为其配置 Scala 和 Python 版本。


Scala 的环境设置


对于 Scala,首先安装 Scala 构建工具 (SBT)。以下是 `build.sbt` 文件的简单配置:

name := "SparkDataFrameExample"
version := "0.1"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"

Python 环境设置


对于 Python 用户,关键要求是安装 PySpark。您可以通过 pip 轻松安装:

```bash
pip install pyspark
```

设置好环境后,您就可以探索聚合操作了。


Spark DataFrames 中的聚合


聚合是数据分析的一个重要方面,它可以帮助您汇总和理解复杂的数据模式。Spark 为各种聚合操作提供了强大的功能,包括 `groupBy`、`agg` 以及一系列聚合函数。


使用 Scala 进行聚合


让我们考虑使用 Scala 进行实际聚合示例的销售数据集:

case class Sales(transactionId: Int, product: String, amount: Double, quantity: Int)

val salesData = Seq(
  Sales(1, "Widget", 20.0, 5),
  Sales(2, "Gadget", 30.0, 8),
  Sales(3, "Widget", 20.0, 3),
  Sales(4, "Gadget", 30.0, 10)
)

val salesDf = salesData.toDF()

现在,让我们对这个数据集执行一些聚合:

// Total Sales Amount
val totalSales = salesDf.agg(sum("amount").as("total_amount"))

totalSales.show()

// Average Quantity
val averageQuantity = salesDf.agg(avg("quantity").as("average_quantity"))

averageQuantity.show()

// Grouped Aggregation by Product
val salesByProduct = salesDf
  .groupBy("product")
  .agg(sum("amount").as("total_sales"), avg("quantity").as("average_quantity"))

salesByProduct.show()

使用 Python 进行聚合


在 Python 中,使用 PySpark 处理相同的销售数据集如下所示:


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
data = [
    (1, "Widget", 20.0, 5),
    (2, "Gadget", 30.0, 8),
    (3, "Widget", 20.0, 3),
    (4, "Gadget", 30.0, 10)
]

columns = ["transactionId", "product", "amount", "quantity"]
sales_df = spark.createDataFrame(data, columns)

总销售额

total_sales = sales_df.agg(sum("amount").alias("total_amount"))
total_sales.show()

平均数量

average_quantity = sales_df.agg(avg("quantity").alias("average_quantity"))
average_quantity.show()

按产品分组聚合

sales_by_product = sales_df.groupBy("product").agg(
sum("amount").alias("total_sales"),
    avg("quantity").alias("average_quantity")
)

sales_by_product.show()

使用 Spark SQL 进行聚合


除了使用 DataFrames 之外,Spark SQL 还允许直接在 DataFrames 上运行 SQL 查询,将 SQL 的易用性与 Spark 的性能相结合。


首先使用 Python 创建一个临时视图:

sales_df.createOrReplaceTempView("sales")

接下来,执行如下 SQL 命令:

-- SQL query for total sales amount
SELECT SUM(amount) AS total_amount FROM sales;

-- SQL query for average quantity
SELECT AVG(quantity) AS average_quantity FROM sales;

-- SQL query for aggregated results by product
SELECT product, SUM(amount) AS total_sales, AVG(quantity) AS average_quantity
FROM sales
GROUP BY product;

聚合的实例


有了理论知识,让我们深入研究实际示例,以加强您对聚合的理解。


统计不同产品的销量


计数不同的值对于各种分析都至关重要。以下是如何在 Scala 和 Python 中实现它。


Scala

val distinctProductsCount = salesDf.select("product").distinct().count()
println(s"Distinct products sold: $distinctProductsCount")

Python

distinct_products_count = sales_df.select("product").distinct().count()
print(f"Distinct products sold: {distinct_products_count}")

计算每日总销售额


假设你想查看每日销售趋势。首先,你需要在“sales”DataFrame 中添加日期信息。


准备数据


使用 Python 为示例添加日期列:
data_with_dates = [
    (1, "Widget", 20.0, 5, "2023-10-01"),
    (2, "Gadget", 30.0, 8, "2023-10-01"),
    (3, "Widget", 20.0, 3, "2023-10-02"),
    (4, "Gadget", 30.0, 10, "2023-10-02")
]

columns_with_dates = ["transactionId", "product", "amount", "quantity", "date"]

sales_df_with_dates = spark.createDataFrame(data_with_dates, columns_with_dates)

聚合示例

按日期汇总销售额的代码在 Scala 和 Python 中看起来类似:


Scala
val dailySales = salesDfWithDates
  .groupBy("date")
  .agg(sum("amount").as("total_sales"))

dailySales.show()
Python
daily_sales = sales_df_with_dates.groupBy("date").agg(sum("amount").alias("total_sales"))

daily_sales.show()

优化技术


为了在使用 Spark 进行聚合时最大限度地提高性能,请考虑以下优化技术:


  1. 使用分区:它对大型数据集有益,因为它最大限度地减少了聚合中涉及的数据量,从而加快了进程。


  2. 缓存中间数据帧:通过减少不必要的重新计算,缓存可以在同一数据集上运行多个操作时提高性能。


  3. 利用广播连接:当一个 DataFrame 明显较小时,广播它可以防止对大型数据集进行混洗,从而提高速度。


总结见解


理解并掌握使用 Apache Spark DataFrames 和 Spark SQL 进行聚合可以显著提升您的大数据分析能力。通过本文提供的知识和实践示例,您现在掌握了执行高级数据处理并从数据集中获取宝贵洞察的工具。持续尝试,探索更深入的洞察,提升您的分析能力!


带有 Spark 徽标的现代数据处理设置的广角视图
Data processing with Spark DataFrames and SQL

马萨诸塞州贝德福德 01730

bottom of page