如何利用 Python Dask 进行可扩展数据处理和分析
- Claude Paugh
- 2天前
- 讀畢需時 7 分鐘
在当今数据驱动的世界中,高效地处理和分析海量数据集对软件工程师和数据科学家来说是一项重大挑战。像 Pandas 这样的传统数据处理库虽然用户友好,但可能难以应对许多组织面临的海量数据。这时,Dask 库就显得至关重要了。
借助 Python Dask 库,您可以轻松地使用 Python 对大数据执行复杂的计算。与 GPU 相比,您还可以在成本更低的 CPU 上执行此操作,因此,重要的是要认识到可以在 CPU 上完成的数据整理和预处理,以及最适合 GPU 的算法操作和图像/视频处理。
在这篇博文中,我们将深入探讨 Dask 库的功能,学习如何将其集成到您的工作流程中,并最终利用其功能来优化您的数据处理任务。
了解 Python Dask 库:概述
Dask 是一个开源并行计算库,允许用户高效地将 Python 应用程序从单机扩展到大型集群。与主要在内存中运行的 Pandas 不同,Dask 擅长管理并行计算,并且可以处理大于可用内存的数据集。
Dask 的主要组件包括:
Dask 数组:用于大型多维数组。
Dask DataFrames :用于操作类似于 Pandas DataFrames 的大型数据集。
Dask Bags :用于处理非结构化数据,类似于 Python 的列表。
Dask 的真正魅力在于它能够与现有的 Python 库相结合并无缝运行。
安装 Dask
在深入研究 Dask 之前,您需要在您的环境中安装它。
您可以使用 pip 轻松安装 Dask:
```bash
pip install dask
```
如果您计划将 Dask 与分布式调度程序一起使用,请包含以下内容以获得完整功能:
```bash
pip install dask[distributed]
```
安装完成后,通过检查 Python 环境中的 Dask 版本来验证您的安装:
import dask
print(dask.__version__)
Dask 数组:核外计算
Dask 数组是处理内存无法容纳的大规模数值数据的强大工具。对于内存而言过大的数组可以分解成较小的块,从而允许并行执行计算。
创建 Dask 数组
你可以从 NumPy 轻松创建 Dask 数组。例如:
import dask.array as da
import numpy as np
创建一个大型 NumPy 数组
numpy_array = np.random.rand(10000, 10000)
从 NumPy 数组创建 Dask 数组
x = da.from_array(numpy_array, chunks=(1000, 1000))
这里, “numpy_array”可以是任何大数组, “chunks”控制数组的分割方式。
基本操作
Dask 数组允许执行类似于 NumPy 的操作,但只有在明确要求时才会执行计算。例如:
result = (x + 1).mean()
要计算结果,您可以使用:
final_result = result.compute()
这种方法可以显著提高性能,尤其是在处理大数据集时。
Dask DataFrames:大数据的熟悉界面
如果您熟悉 Pandas,您会发现 Dask DataFrames 非常直观。它们允许通过熟悉的界面处理大型数据集,同时充分利用并行计算的优势。
创建 Dask DataFrames
您可以通过读取 CSV 文件或转换 Pandas DataFrame 来创建 Dask DataFrame,如下所示:
import dask.dataframe as dd
将大型 CSV 文件读入 Dask DataFrame
df = dd.read_csv('large_dataset.csv')
此操作将 CSV 文件划分为多个分区,从而实现并行批处理。Dask 还支持使用 SQL 从 JSON、HDF5、Parquet、Orc 和数据库表中读取数据。Dask DataFrames 可以写入 CSV、HDF5、Parquet 和 SQL。
Dask DataFrames 上的操作
可以轻松执行常见的 DataFrame 操作,例如过滤和合并:
filtered_df = df[df['avg_gain'] > 30]
这些操作是惰性的,这意味着 Dask不会执行任务,直到你调用
.compute()
探索 Dask Bags:处理非结构化数据
Dask Bags 对于处理 Python 对象集合非常有用。这对于处理 JSON 文件、文本数据或任何非结构化类型尤其有利。from_sequence 函数提供了读取 Python 可迭代对象的选项。此外,它还支持读取 Avro 格式的数据。Dask Bags 还可以写入 CSV、JSON 和 Avro 格式的数据。
制作 Dask Bags
您可以通过读取 JSON 文件来创建 Dask Bag:
import dask.bag as db
bag = db.read_text('data/*.json')
Dask Bags 上的操作
Dask Bags 允许进行标准的映射和过滤操作。以下是映射操作的示例:
mapped_bag = bag.map(lambda x: x['slope'])
与数组和数据帧类似,记得调用 `.compute()` 来执行操作:
final_output = mapped_bag.compute()
使用 Dask 进行调度
Dask 会根据您的需求使用不同的调度器。主要选项包括单线程调度器和多线程或分布式调度器。
本地调度器
本地调度程序易于处理较小的工作负载。它利用 Python 的线程功能并行运行任务,非常适合在中等规模的数据集上进行快速计算。
分布式调度器
对于较大的数据集或密集计算,使用“dask.distributed”模块可以显著提升性能。设置 Dask 集群来管理跨多个工作器的计算。以下是快速设置步骤:
from dask.distributed import Client
client = Client() # This starts a local cluster
# or
client = Client(processes=False)
您可以监控集群的状态和任务提交情况,从而更好地控制资源。多节点调度器需要在每个节点上安装 Dask,并为调度器和工作器进行一些额外的配置。对于单节点,可以使用dask.config.set(scheduler...)来设置全局默认调度器。 命令。这可以全局执行:
dask.config.set(scheduler='threads')
x.compute()
或者作为上下文管理器:
with dask.config.set(scheduler='threads'):
x.compute()
基于池的单机调度程序允许您提供自定义池或指定所需的工作器数量:
from concurrent.futures import ThreadPoolExecutor
with dask.config.set(pool=ThreadPoolExecutor(4)):
x.compute()
with dask.config.set(num_workers=4):
x.compute()
如果您对使用或不使用 Kubernetes 的 Dask 云部署感兴趣,我建议您阅读 Dask 文档中的“部署 Dask 集群” 。
它涵盖了单节点、云、Kubernetes 和 HPC 等场景。文档末尾还包含一些高级知识,可帮助您快速入门。它涵盖了使用Coiled和Saturn Cloud作为 SaaS 选项(而非 DIY)的运营主题和参考资料。
如果您遇到 Python 解释器无法释放 GIL 的可扩展性问题,那么部署工作线程很可能是必须的。您可以自定义每个工作线程的线程数,但原则上每个处理器核心(CPU 或 vCPU)只使用一个线程。
这可以分布在多个工作线程上,也可以只分布在一个工作线程上。例如,我的云提供商(显然不是 Kubernetes)为我分配了 12 个 vCPU,我可以进行以下配置:
dask worker --nworkers 12 --nthreads 1 tcp://192.0.0.100:8786
# or
dask worker --nworkers 1 --nthreads 12 tcp://192.0.0.100:8786
这些显然是在 shell 或脚本中执行的命令行选项,完整的命令行参考在这里。在这些示例中,使用了 IPv4 地址;使用主机名是首选配置,并且 Dask 支持 DNS 的名称解析。还值得注意的是,从命令行启动时,你可以对工作进程设置内存限制。
从命令行启动调度程序,如下所示:
$ dask scheduler
Scheduler at: tcp://192.0.0.100:8786
您应该注意到,Dask 工作进程会指向启动时要使用的调度程序。Dask 的一个缺点是,您可以自行管理:单个调度程序容易导致单点故障。如果您使用的是云服务提供商,他们可以选择创建多个节点来响应单个地址请求,因此研究这些选项可以缓解单点故障 (SPOF)。
启动调度器和工作器后,它们都会附带诊断 Web 服务器,其 URL 与 IP 地址或部署主机相同,使用端口8787 ,以便您可以监控调度器和工作器。您将看到任务图表、工作器内存消耗和线程活动。您可以在此处找到有关可用 HTTP 端点的更多信息。
使用 Dask 优化性能
为了最大限度地发挥 Dask 的潜力,优化性能是关键。以下两种策略可以带来很好的效果:
数据分区
将数据合理划分为可管理的块可以提高性能。例如,在 Dask DataFrames 中,明确指定分区数可以加快计算速度。
每个分区本质上都是一个包装好的 Pandas DataFrame,因为 Dask DataFrame 的功能本质上是提供大规模的 Pandas 数据。例如,如果您在 Dask DataFrame 中有一个包含 10 亿行数据的数据集,并且需要 100 个分区,那么 Dask 会在底层创建 1000 万个 Pandas DataFrame。
Dask 负责处理高级数据访问,但每个底层 DataFrame 都是一个 Pandas 实现。您可以使用 Pandas 中的关键字 args,但 Dask 使用的 Pandas 版本会有所限制。
避免太多小任务
创建过小的任务会导致效率低下。每个任务都会产生开销,如果任务过于细粒度,可能会抵消并行处理的优势。较少的大型任务最适合 Dask 的大规模运行。
Dask 的常见用例
Dask 适用于各种数据处理和分析场景:
大规模数据分析
Dask DataFrames 非常适合分析超出内存限制的大型数据集,其工作原理与用于标准分析的 Pandas 类似。请检查 Dask 使用的 Pandas 版本,以了解其支持的功能。
机器学习
Dask 与 Scikit-learn 等热门库无缝集成。例如,您可以利用 Dask-ML 高效地在大型数据集上扩展机器学习任务,从而显著缩短处理时间。Dask 提供 Python 函数的分布式并行处理,因此如果您有函数,只需使用装饰器即可将它们并行化。不过,函数嵌套存在限制。
数据提取和转换
Dask 简化了读取和转换大型数据集的过程,这对于分析前的预处理阶段至关重要,使您能够轻松处理多种格式。
利用 Dask 实现数据工作流
利用 Dask 进行可扩展的数据处理和分析具有显著的优势,尤其是在处理大规模数据操作时。通过了解其核心功能,您可以有效地将 Dask 嵌入到您的 Python 数据科学工作流程中。
Dask 的并行处理功能使您能够处理传统工具无法胜任的操作。无论是处理大型数组、DataFrame 还是非结构化数据,Dask 都能为高效的数据操作开辟新的途径。
现在,您已准备好利用 Dask 的强大功能,使您的数据处理任务既高效又可扩展。
在进一步探索可扩展数据处理的过程中,不妨尝试使用 Dask,看看它如何增强你的工作流程。祝你编码愉快!