top of page

Dominar las agregaciones con Apache Spark DataFrames y Spark SQL en Scala, Python y SQL

Si quieres aprovechar el potencial del big data, Apache Spark es tu framework de referencia. Ofrece API robustas y un ecosistema completo, perfecto para procesar grandes conjuntos de datos. En particular, la capacidad de Spark para realizar agregaciones mediante DataFrames y Spark SQL lo convierte en una herramienta invaluable. Esta publicación te guiará en la realización de agregaciones con Spark DataFrames y Spark SQL utilizando Scala y Python. Verás ejemplos de código prácticos para consolidar tus conocimientos.


Comprensión de los DataFrames de Spark


Los DataFrames de Spark son conjuntos de datos distribuidos organizados en columnas con nombre, similares a las tablas de una base de datos relacional. Esta estructura permite una manipulación eficiente de los datos. Los DataFrames mejoran el rendimiento de las operaciones gracias a funciones como Catalyst para la optimización de consultas y el motor Tungsten para la gestión de memoria.


La sintaxis de los DataFrames es sencilla e intuitiva, lo que permite ejecutar operaciones funcionales y similares a SQL sin problemas. Por ejemplo, se pueden realizar cálculos sin necesidad de código repetitivo extenso, lo que facilita el trabajo tanto a principiantes como a desarrolladores experimentados.


Configuración de su entorno Spark


Antes de profundizar en las agregaciones, establezcamos el entorno de Spark necesario para los ejemplos siguientes. Esto implica instalar Apache Spark y configurarlo para Scala y Python, según las preferencias de su sistema operativo y espacio de trabajo.


Configuración del entorno para Scala


Para Scala, comience instalando la herramienta Scala Build Tool (SBT). A continuación, se muestra una configuración sencilla para el archivo `build.sbt`:

name := "SparkDataFrameExample"
version := "0.1"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2"

Configuración del entorno para Python


Para los usuarios de Python, el requisito clave es tener instalado PySpark. Se puede instalar fácilmente mediante pip:

```bash
pip install pyspark
```

Una vez configurado su entorno, está listo para explorar las operaciones de agregación.


Agregaciones en Spark DataFrames


La agregación es un aspecto vital del análisis de datos que permite resumir y comprender patrones complejos. Spark ofrece potentes funciones para diversas operaciones de agregación, como `groupBy`, `agg` y diversas funciones de agregación.


Uso de Scala para agregaciones


Consideremos un conjunto de datos de ventas para ejemplos prácticos de agregación con Scala:

case class Sales(transactionId: Int, product: String, amount: Double, quantity: Int)

val salesData = Seq(
  Sales(1, "Widget", 20.0, 5),
  Sales(2, "Gadget", 30.0, 8),
  Sales(3, "Widget", 20.0, 3),
  Sales(4, "Gadget", 30.0, 10)
)

val salesDf = salesData.toDF()

Ahora, realicemos algunas agregaciones en este conjunto de datos:

// Total Sales Amount
val totalSales = salesDf.agg(sum("amount").as("total_amount"))

totalSales.show()

// Average Quantity
val averageQuantity = salesDf.agg(avg("quantity").as("average_quantity"))

averageQuantity.show()

// Grouped Aggregation by Product
val salesByProduct = salesDf
  .groupBy("product")
  .agg(sum("amount").as("total_sales"), avg("quantity").as("average_quantity"))

salesByProduct.show()

Uso de Python para agregaciones


En Python, el uso de PySpark para el mismo conjunto de datos de ventas se ve así:


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
data = [
    (1, "Widget", 20.0, 5),
    (2, "Gadget", 30.0, 8),
    (3, "Widget", 20.0, 3),
    (4, "Gadget", 30.0, 10)
]

columns = ["transactionId", "product", "amount", "quantity"]
sales_df = spark.createDataFrame(data, columns)

Monto total de ventas

total_sales = sales_df.agg(sum("amount").alias("total_amount"))
total_sales.show()

Cantidad promedio

average_quantity = sales_df.agg(avg("quantity").alias("average_quantity"))
average_quantity.show()

Agregación agrupada por producto

sales_by_product = sales_df.groupBy("product").agg(
sum("amount").alias("total_sales"),
    avg("quantity").alias("average_quantity")
)

sales_by_product.show()

Uso de Spark SQL para agregaciones


Además de utilizar DataFrames, Spark SQL permite ejecutar consultas SQL directamente en DataFrames, combinando la facilidad de uso de SQL con el rendimiento de Spark.


Comience creando una vista temporal con Python:

sales_df.createOrReplaceTempView("sales")

A continuación, ejecute comandos SQL como estos:

-- SQL query for total sales amount
SELECT SUM(amount) AS total_amount FROM sales;

-- SQL query for average quantity
SELECT AVG(quantity) AS average_quantity FROM sales;

-- SQL query for aggregated results by product
SELECT product, SUM(amount) AS total_sales, AVG(quantity) AS average_quantity
FROM sales
GROUP BY product;

Ejemplos prácticos de agregaciones


Equipados con conocimientos teóricos, profundicemos en ejemplos prácticos para fortalecer su comprensión de las agregaciones.


Contar productos distintos vendidos


Contar valores distintos es crucial para diversos análisis. Aquí te explicamos cómo lograrlo en Scala y Python.


Escala

val distinctProductsCount = salesDf.select("product").distinct().count()
println(s"Distinct products sold: $distinctProductsCount")

Pitón

distinct_products_count = sales_df.select("product").distinct().count()
print(f"Distinct products sold: {distinct_products_count}")

Cálculo de ventas totales por día


Imagina que quieres examinar las tendencias de ventas diarias. Primero, completarías el DataFrame "ventas" con información de fecha.


Preparar los datos


Agregar una columna de fecha para el ejemplo con Python:
data_with_dates = [
    (1, "Widget", 20.0, 5, "2023-10-01"),
    (2, "Gadget", 30.0, 8, "2023-10-01"),
    (3, "Widget", 20.0, 3, "2023-10-02"),
    (4, "Gadget", 30.0, 10, "2023-10-02")
]

columns_with_dates = ["transactionId", "product", "amount", "quantity", "date"]

sales_df_with_dates = spark.createDataFrame(data_with_dates, columns_with_dates)

Ejemplo de agregación

El código para sumar las ventas totales por fecha es similar tanto en Scala como en Python:


Escala
val dailySales = salesDfWithDates
  .groupBy("date")
  .agg(sum("amount").as("total_sales"))

dailySales.show()
Pitón
daily_sales = sales_df_with_dates.groupBy("date").agg(sum("amount").alias("total_sales"))

daily_sales.show()

Técnicas de optimización


Para maximizar el rendimiento al usar Spark para agregaciones, considere estas técnicas de optimización:


  1. Usar particionamiento : es beneficioso para conjuntos de datos grandes ya que minimiza la cantidad de datos involucrados en las agregaciones, acelerando así el proceso.


  2. Almacenamiento en caché de marcos de datos intermedios : el almacenamiento en caché puede mejorar el rendimiento cuando se ejecutan múltiples operaciones en el mismo conjunto de datos al reducir el recálculo innecesario.


  3. Aprovechar las uniones de difusión : cuando un DataFrame es significativamente más pequeño, difundirlo puede evitar la mezcla de grandes conjuntos de datos, lo que mejora la velocidad.


Resumiendo las ideas


Comprender y dominar las agregaciones con Apache Spark DataFrames y Spark SQL puede optimizar considerablemente sus análisis de big data. Con los conocimientos y ejemplos prácticos proporcionados, ahora cuenta con las herramientas para realizar un procesamiento avanzado de datos y extraer información valiosa de sus conjuntos de datos. ¡Siga experimentando para descubrir información más profunda y mejorar sus capacidades analíticas!


Vista de gran angular de una configuración moderna de procesamiento de datos con el logotipo de Spark
Data processing with Spark DataFrames and SQL

Bedford, Massachusetts 01730

bottom of page