type
Post
status
Published
summary
在数据大小(32G)和服务器 内存 大小(360G)差 10 被的情况下,使用 pandas 来读取数据,但是在读取数据的时候会导致内存被占满最终导致 python 脚本被 kill。原因定位。
slug
llms-qa-pandasReadBigFile
date
Nov 26, 2024
tags
pandas
内存溢出
category
机器学习
password
icon
URL
Property
Nov 26, 2024 10:23 AM

问题

💡
现在有服务器内存 360G,没有运行其他程序。磁盘上有一个数据文件 32G,使用 pandas 的 pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt',sep='\$\$',encoding='utf-8',header=None,engine='python') 来读取数据,但是在读取数据的时候会导致内存被占满最终导致 python 脚本被 kill。帮忙分析可能的原因

回答

在使用 pandas.read_csv 读取大数据文件时,内存占用可能会超过预期,这通常与以下几个因素有关:

1. 文件过大和数据加载方式问题

  • 文件大小:虽然文件本身是 32GB,但加载到内存中后可能会膨胀为多倍,因为 pandas 会将数据解码并存储为内存中的结构化对象。
  • 分隔符复杂度:你使用了 sep='$$' 和 engine='python',Python 引擎在解析复杂分隔符时速度慢且消耗更多内存。
  • 未指定 dtypepandas 默认会尝试推断列的类型,这会消耗额外内存。
解决方法
  • 按行分块加载文件,避免一次性加载所有数据。
  • 优化分隔符解析,使用 engine='c' 或检查是否能使用简单分隔符。
  • 显式指定每列的 dtype,避免推断类型。

2. 数据膨胀问题

加载 32GB 文件到内存后可能占用超出文件本身数倍的内存,原因可能包括:
  • 字符串类型存储:文本列的每个字符串都存储为 Python 对象,增加开销。
  • 对象列过多:如果数据包含许多对象类型列(如字符串),这些列会占用更多内存。
  • NaN 填充pandas 可能会填充缺失值,这也会增加内存需求。
解决方法
  • 如果可能,预处理数据,将文本列转换为数值格式或减少列数。
  • 使用更高效的数据存储格式,例如 Parquet 或 Feather
  • 转换对象列为 category 类型。

3. 内存碎片化

即使服务器有 360GB 内存,但如果内存分布不连续,可能导致加载大文件时内存不足。
解决方法
  • 优化操作系统的内存管理,重启服务器以释放内存。
  • 检查是否有其他内存泄漏问题(如 Python 对象未释放)。

4. Pandas 的内存分配策略

  • 单线程执行pandas 默认是单线程的,处理大文件时需要更长时间。
  • 内存复用:加载大文件可能会引发临时的内存开销,例如创建中间对象。
解决方法
  • 使用 chunk_size 参数分块读取,避免一次性加载整个文件。例如:
    • python 复制代码 chunk_size = 100000 # 每次读取 10 万行 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size) df = pd.concat(chunk for chunk in chunks)
  • 使用 dask 代替 pandas 来分布式读取大文件:
    • python 复制代码 import dask.dataframe as dd df = dd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None)

5. 文件内容问题

  • 数据可能包含大量无效或无意义的内容,导致加载后占用更多内存。
  • 检查文件是否存在异常,如行过长、列数异常。
解决方法
  • 预处理文件,例如提取必要部分或使用其他工具(如 awksed)对文件进行拆分。

优化建议

以下是推荐的优化策略汇总:
  1. 尝试分块读取:
    1. python 复制代码 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6) df = pd.concat(chunks)
  1. 显式指定 dtype
    1. python 复制代码 df = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', dtype={0: str, 1: float, 2: int})
  1. 考虑使用更高效的格式(如 Parquet)存储数据。
  1. 使用 dask 或其他大数据工具处理超大数据。
如果上述优化后问题仍存在,可以进一步分析文件结构和数据特点。

问题

💡
分块读取数据能解决数据膨胀的问题吗?

回答

分块读取数据可以缓解内存占用问题,但并不能直接解决数据膨胀本身的问题。要完全解决数据膨胀,还需要结合其他优化手段。以下是详细分析:

分块读取的作用

  1. 降低一次性内存需求
      • 分块读取将文件切分为较小的块,逐块加载到内存中,避免整个文件一次性加载所导致的内存不足。
      • 内存占用的峰值会大幅下降,即使数据膨胀问题仍存在,单块数据的内存需求更易被系统承受。
  1. 适合大型文件的处理
      • 分块读取时,每次处理的数据量有限,可以有效避免系统因内存不足而终止脚本。

分块读取无法完全解决的问题

  1. 数据膨胀本质
      • 数据膨胀来源于文件内容在加载到内存时的处理(例如文本解码、NaN 填充、数据类型推断等)。
      • 分块读取不能改变每块数据在内存中的表示方式,也就是说,每块数据读取到内存后仍会经历膨胀。
  1. 拼接后的内存占用
      • 如果分块读取后将所有块合并(例如使用 pd.concat),最终内存占用仍会接近一次性加载整个文件的占用量。
      • 只有在分块读取后,对每块数据单独处理并丢弃不必要数据,才能有效减少整体内存占用。

如何结合分块读取解决数据膨胀

要真正缓解数据膨胀,可以在分块读取的基础上进行进一步优化:

1. 限制内存膨胀的来源

  • 指定列类型:通过 dtype 参数显式定义列类型,减少类型推断带来的额外开销。例如:
    • python 复制代码 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6, dtype={0: str, 1: float, 2: int})
  • 对象列优化
    • 将频繁重复的字符串列转为 category 类型以节省内存:
      • python 复制代码 for chunk in chunks: chunk['column_name'] = chunk['column_name'].astype('category')

2. 分块处理后即时释放内存

如果只需要对数据进行处理(如过滤、聚合等),可以在每次读取一块数据后立即处理并丢弃。例如:
python 复制代码 processed_chunks = [] for chunk in pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6): # 对分块进行处理,例如删除无用列 chunk = chunk[chunk[2] > 0] # 假设第三列有条件筛选 processed_chunks.append(chunk) # 合并结果 df = pd.concat(processed_chunks)

3. 考虑逐块写入

如果结果不需要一次性加载到内存,可以将每块处理后直接写入文件或数据库:
python 复制代码 output_file = 'processed_output.csv' for chunk in pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6): # 对分块进行处理 chunk = chunk[chunk[2] > 0] chunk.to_csv(output_file, mode='a', index=False, header=False)

4. 使用更高效的工具

  • dask 分布式处理: dask.dataframe 的设计适合处理内存外的数据,无需显式分块,它会在后台进行分块调度。
    • python 复制代码 import dask.dataframe as dd df = dd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', header=None) filtered_df = df[df[2] > 0] filtered_df.to_parquet('output.parquet')
  • 其他轻量化数据处理工具: 如 polars(基于 Rust 的高效数据框架)在内存效率上更有优势。

总结

分块读取确实能缓解内存不足问题,但无法完全解决数据膨胀。为了有效解决这个问题,建议采取以下步骤:
  1. 分块读取以降低内存峰值
  1. 明确指定数据类型以减少膨胀
  1. 即时处理和释放内存,避免累积数据
  1. 使用高效工具如 dask 或 polars 替代 pandas
通过上述方法,可以既解决内存不足,又减少数据膨胀的影响。

问题

💡
df = pd.concat(chunk for chunk in chunks)也会有内存风险,使用 for 循环来一个个合并吧

回答

你说得对,pd.concat 在处理多个分块时会将所有数据加载到内存中,同时构建一个新的大 DataFrame,可能导致内存不足的风险。通过 逐块合并 的方式,可以更好地控制内存使用。以下是推荐的逐块合并方法及其优化思路:

问题分析

1. pd.concat 合并方式的内存风险

  • pd.concat(chunk for chunk in chunks) 会将所有读取的块一次性合并到内存中。如果文件较大(例如 32G),即使分块读取单次内存使用较小,合并时仍需要一次性占用所有数据的内存,容易导致内存不足的问题。

2. 数据膨胀

  • pd.read_csv 的内存需求通常会高于原始文件大小,原因包括:
    • 文本解码(例如 UTF-8 转为字符串)。
    • 数据类型推断导致使用 object 类型,内存消耗较大。
    • 填充缺失值可能增加内存占用。

3. 无优化的逐块处理

  • pd.concat 将块全部合并为一个大的 DataFrame,如果只是为了后续处理数据,代码没有利用分块的优势,即便分块读取减少了初始内存占用,最终还是面临内存压力。

改进方法

方法 1:避免一次性合并,用逐块写入替代

如果不需要将所有数据加载到内存,可以在逐块处理后直接写入文件,避免合并操作。例如:
python 复制代码 chunk_size = 1000000 # 每次读取 100 万行 output_file = "processed_output.csv" is_first_chunk = True chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size) for chunk in chunks: # 对每块数据进行处理(如筛选、转换等) # chunk = chunk[chunk[2] > 0] # 示例:假设需要过滤第三列 # 追加写入文件(只在第一块写入表头) chunk.to_csv(output_file, mode='a', index=False, header=is_first_chunk) is_first_chunk = False

方法 2:逐块合并控制内存

如果确实需要将数据加载到一个大的 DataFrame 中,可以分批次合并,并在每次合并后释放内存:
python 复制代码 chunk_size = 1000000 # 每次读取 100 万行 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size) user_info = pd.DataFrame() # 初始化空 DataFrame for chunk in chunks: # 合并数据块 user_info = pd.concat([user_info, chunk], ignore_index=True) del chunk # 释放当前块内存

方法 3:优化读取数据的类型

显式定义数据的类型,避免 pandas 自动推断时消耗额外的内存。例如:
python 复制代码 dtype_dict = {0: 'int64', 1: 'float32', 2: 'object'} # 假设有 3 列,定义每列的类型 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size, dtype=dtype_dict) user_info = pd.concat(chunks, ignore_index=True)

方法 4:考虑替代工具(如 Dask 或 Polars)

如果数据规模特别大且单机内存有限,可以使用 dask.dataframe 来替代 pandas,实现内存外处理。例如:
python 复制代码 import dask.dataframe as dd df = dd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None) # 进行 Dask 数据处理 filtered_df = df[df[2] > 0] # 假设需要过滤第三列 filtered_df.to_csv("processed_output_*.csv", single_file=True)

总结

你的代码当前存在内存风险,问题的核心是 pd.concat 的一次性合并将所有数据加载到内存。可以参考以下建议:
  1. 如果最终数据不需要完全保存在内存:建议逐块写入文件。
  1. 如果必须在内存中保留:分批次合并并释放内存,避免一次性加载。
  1. 显式指定数据类型:减少数据膨胀,优化内存使用。
  1. 考虑大规模数据处理工具:如 dask 或 polars,在大数据场景下表现更优。
上述方法能有效避免内存不足的问题。

  • Twikoo
  • Giscus
  • GitTalk