type
Post
status
Published
summary
在数据大小(32G)和服务器 内存 大小(360G)差 10 被的情况下,使用 pandas 来读取数据,但是在读取数据的时候会导致内存被占满最终导致 python 脚本被 kill。原因定位。
slug
llms-qa-pandasReadBigFile
date
Nov 26, 2024
tags
pandas
内存溢出
category
机器学习
password
icon
URL
Property
Nov 26, 2024 10:23 AM
问题
现在有服务器内存 360G,没有运行其他程序。磁盘上有一个数据文件 32G,使用 pandas 的 pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt',sep='\$\$',encoding='utf-8',header=None,engine='python') 来读取数据,但是在读取数据的时候会导致内存被占满最终导致 python 脚本被 kill。帮忙分析可能的原因
回答
在使用
pandas.read_csv
读取大数据文件时,内存占用可能会超过预期,这通常与以下几个因素有关:1. 文件过大和数据加载方式问题
- 文件大小:虽然文件本身是 32GB,但加载到内存中后可能会膨胀为多倍,因为
pandas
会将数据解码并存储为内存中的结构化对象。
- 分隔符复杂度:你使用了
sep='$$'
和engine='python'
,Python 引擎在解析复杂分隔符时速度慢且消耗更多内存。
- 未指定
dtype
:pandas
默认会尝试推断列的类型,这会消耗额外内存。
解决方法:
- 按行分块加载文件,避免一次性加载所有数据。
- 优化分隔符解析,使用
engine='c'
或检查是否能使用简单分隔符。
- 显式指定每列的
dtype
,避免推断类型。
2. 数据膨胀问题
加载 32GB 文件到内存后可能占用超出文件本身数倍的内存,原因可能包括:
- 字符串类型存储:文本列的每个字符串都存储为 Python 对象,增加开销。
- 对象列过多:如果数据包含许多对象类型列(如字符串),这些列会占用更多内存。
- NaN 填充:
pandas
可能会填充缺失值,这也会增加内存需求。
解决方法:
- 如果可能,预处理数据,将文本列转换为数值格式或减少列数。
- 使用更高效的数据存储格式,例如
Parquet
或Feather
。
- 转换对象列为
category
类型。
3. 内存碎片化
即使服务器有 360GB 内存,但如果内存分布不连续,可能导致加载大文件时内存不足。
解决方法:
- 优化操作系统的内存管理,重启服务器以释放内存。
- 检查是否有其他内存泄漏问题(如 Python 对象未释放)。
4. Pandas 的内存分配策略
- 单线程执行:
pandas
默认是单线程的,处理大文件时需要更长时间。
- 内存复用:加载大文件可能会引发临时的内存开销,例如创建中间对象。
解决方法:
- 使用
chunk_size
参数分块读取,避免一次性加载整个文件。例如:
python 复制代码 chunk_size = 100000 # 每次读取 10 万行 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size) df = pd.concat(chunk for chunk in chunks)
- 使用
dask
代替pandas
来分布式读取大文件:
python 复制代码 import dask.dataframe as dd df = dd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None)
5. 文件内容问题
- 数据可能包含大量无效或无意义的内容,导致加载后占用更多内存。
- 检查文件是否存在异常,如行过长、列数异常。
解决方法:
- 预处理文件,例如提取必要部分或使用其他工具(如
awk
、sed
)对文件进行拆分。
优化建议
以下是推荐的优化策略汇总:
- 尝试分块读取:
python 复制代码 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6) df = pd.concat(chunks)
- 显式指定
dtype
:
python 复制代码 df = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', dtype={0: str, 1: float, 2: int})
- 考虑使用更高效的格式(如
Parquet
)存储数据。
- 使用
dask
或其他大数据工具处理超大数据。
如果上述优化后问题仍存在,可以进一步分析文件结构和数据特点。
问题
分块读取数据能解决数据膨胀的问题吗?
回答
分块读取数据可以缓解内存占用问题,但并不能直接解决数据膨胀本身的问题。要完全解决数据膨胀,还需要结合其他优化手段。以下是详细分析:
分块读取的作用
- 降低一次性内存需求:
- 分块读取将文件切分为较小的块,逐块加载到内存中,避免整个文件一次性加载所导致的内存不足。
- 内存占用的峰值会大幅下降,即使数据膨胀问题仍存在,单块数据的内存需求更易被系统承受。
- 适合大型文件的处理:
- 分块读取时,每次处理的数据量有限,可以有效避免系统因内存不足而终止脚本。
分块读取无法完全解决的问题
- 数据膨胀本质:
- 数据膨胀来源于文件内容在加载到内存时的处理(例如文本解码、NaN 填充、数据类型推断等)。
- 分块读取不能改变每块数据在内存中的表示方式,也就是说,每块数据读取到内存后仍会经历膨胀。
- 拼接后的内存占用:
- 如果分块读取后将所有块合并(例如使用
pd.concat
),最终内存占用仍会接近一次性加载整个文件的占用量。 - 只有在分块读取后,对每块数据单独处理并丢弃不必要数据,才能有效减少整体内存占用。
如何结合分块读取解决数据膨胀
要真正缓解数据膨胀,可以在分块读取的基础上进行进一步优化:
1. 限制内存膨胀的来源
- 指定列类型:通过
dtype
参数显式定义列类型,减少类型推断带来的额外开销。例如:
python 复制代码 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6, dtype={0: str, 1: float, 2: int})
- 对象列优化:
- 将频繁重复的字符串列转为
category
类型以节省内存:
python 复制代码 for chunk in chunks: chunk['column_name'] = chunk['column_name'].astype('category')
2. 分块处理后即时释放内存
如果只需要对数据进行处理(如过滤、聚合等),可以在每次读取一块数据后立即处理并丢弃。例如:
python 复制代码 processed_chunks = [] for chunk in pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6): # 对分块进行处理,例如删除无用列 chunk = chunk[chunk[2] > 0] # 假设第三列有条件筛选 processed_chunks.append(chunk) # 合并结果 df = pd.concat(processed_chunks)
3. 考虑逐块写入
如果结果不需要一次性加载到内存,可以将每块处理后直接写入文件或数据库:
python 复制代码 output_file = 'processed_output.csv' for chunk in pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=10**6): # 对分块进行处理 chunk = chunk[chunk[2] > 0] chunk.to_csv(output_file, mode='a', index=False, header=False)
4. 使用更高效的工具
dask
分布式处理:dask.dataframe
的设计适合处理内存外的数据,无需显式分块,它会在后台进行分块调度。
python 复制代码 import dask.dataframe as dd df = dd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', header=None) filtered_df = df[df[2] > 0] filtered_df.to_parquet('output.parquet')
- 其他轻量化数据处理工具: 如
polars
(基于 Rust 的高效数据框架)在内存效率上更有优势。
总结
分块读取确实能缓解内存不足问题,但无法完全解决数据膨胀。为了有效解决这个问题,建议采取以下步骤:
- 分块读取以降低内存峰值。
- 明确指定数据类型以减少膨胀。
- 即时处理和释放内存,避免累积数据。
- 使用高效工具如
dask
或polars
替代pandas
。
通过上述方法,可以既解决内存不足,又减少数据膨胀的影响。
问题
df = pd.concat(chunk for chunk in chunks)也会有内存风险,使用 for 循环来一个个合并吧
回答
你说得对,
pd.concat
在处理多个分块时会将所有数据加载到内存中,同时构建一个新的大 DataFrame,可能导致内存不足的风险。通过 逐块合并 的方式,可以更好地控制内存使用。以下是推荐的逐块合并方法及其优化思路:问题分析
1. pd.concat
合并方式的内存风险:
pd.concat(chunk for chunk in chunks)
会将所有读取的块一次性合并到内存中。如果文件较大(例如 32G),即使分块读取单次内存使用较小,合并时仍需要一次性占用所有数据的内存,容易导致内存不足的问题。
2. 数据膨胀:
pd.read_csv
的内存需求通常会高于原始文件大小,原因包括:- 文本解码(例如 UTF-8 转为字符串)。
- 数据类型推断导致使用
object
类型,内存消耗较大。 - 填充缺失值可能增加内存占用。
3. 无优化的逐块处理:
pd.concat
将块全部合并为一个大的DataFrame
,如果只是为了后续处理数据,代码没有利用分块的优势,即便分块读取减少了初始内存占用,最终还是面临内存压力。
改进方法
方法 1:避免一次性合并,用逐块写入替代
如果不需要将所有数据加载到内存,可以在逐块处理后直接写入文件,避免合并操作。例如:
python 复制代码 chunk_size = 1000000 # 每次读取 100 万行 output_file = "processed_output.csv" is_first_chunk = True chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size) for chunk in chunks: # 对每块数据进行处理(如筛选、转换等) # chunk = chunk[chunk[2] > 0] # 示例:假设需要过滤第三列 # 追加写入文件(只在第一块写入表头) chunk.to_csv(output_file, mode='a', index=False, header=is_first_chunk) is_first_chunk = False
方法 2:逐块合并控制内存
如果确实需要将数据加载到一个大的
DataFrame
中,可以分批次合并,并在每次合并后释放内存:python 复制代码 chunk_size = 1000000 # 每次读取 100 万行 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size) user_info = pd.DataFrame() # 初始化空 DataFrame for chunk in chunks: # 合并数据块 user_info = pd.concat([user_info, chunk], ignore_index=True) del chunk # 释放当前块内存
方法 3:优化读取数据的类型
显式定义数据的类型,避免
pandas
自动推断时消耗额外的内存。例如:python 复制代码 dtype_dict = {0: 'int64', 1: 'float32', 2: 'object'} # 假设有 3 列,定义每列的类型 chunks = pd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None, engine='python', chunksize=chunk_size, dtype=dtype_dict) user_info = pd.concat(chunks, ignore_index=True)
方法 4:考虑替代工具(如 Dask 或 Polars)
如果数据规模特别大且单机内存有限,可以使用
dask.dataframe
来替代 pandas
,实现内存外处理。例如:python 复制代码 import dask.dataframe as dd df = dd.read_csv(f'{input_date_path}/{usr_filename}_{user_date_list}.txt', sep='$$', encoding='utf-8', header=None) # 进行 Dask 数据处理 filtered_df = df[df[2] > 0] # 假设需要过滤第三列 filtered_df.to_csv("processed_output_*.csv", single_file=True)
总结
你的代码当前存在内存风险,问题的核心是
pd.concat
的一次性合并将所有数据加载到内存。可以参考以下建议:- 如果最终数据不需要完全保存在内存:建议逐块写入文件。
- 如果必须在内存中保留:分批次合并并释放内存,避免一次性加载。
- 显式指定数据类型:减少数据膨胀,优化内存使用。
- 考虑大规模数据处理工具:如
dask
或polars
,在大数据场景下表现更优。
上述方法能有效避免内存不足的问题。