当前位置:   article > 正文

使用多进程库计算科学数据时出现内存错误

使用多进程库计算科学数据时出现内存错误

问题背景

我经常使用爬虫来做数据抓取,多线程爬虫方案是必不可少的,正如我在使用 Python 进行科学计算时,需要处理大量存储在 CSV 文件中的数据。由于每个处理过程需要很长时间才能完成,而您拥有多核处理器,所以您尝试使用多进程库中的 Pool 方法来提高计算效率。

在这里插入图片描述

您按照如下方式构建了多进程调用:

pool = Pool()
vector_components = []
for sample0 in range(samples):
    vector_field_x_i = vector_field_samples_x[sample]
    vector_field_y_i = vector_field_samples_y[sample]
    vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                       vector_field_x_i, vector_field_y_i))
    vector_components.append(vector_component)
pool.close()
pool.join()

vector_components = map(lambda k: k.get(), vector_components)

for vector_component in vector_components:
    CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

使用此代码,当您处理 500 个元素,每个元素大小为 100 x 100 的数据时,一切正常。但是,当您尝试处理 500 个元素,每个元素大小为 400 x 400 时,在调用 get() 时会收到内存错误。

解决方案

出现内存错误的原因是您的代码在内存中保留了多个列表,包括 vector_field_x、vector_field_y、vector_components,以及在 map() 调用期间创建的 vector_components 的副本。当您尝试处理较大的数据时,这些列表可能变得非常大,从而导致内存不足。

为了解决此问题,您需要避免在内存中保存完整的列表。您可以使用多进程库中的 imap() 方法来实现这一点。imap() 方法返回一个迭代器而不是完整的列表,因此您不必将所有结果都保存在内存中。

以下是使用 imap() 方法重写代码的示例:

import multiprocessing
from functools import partial

def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.

if __name__ == "__main__":
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples // 4 * num_workers)
    if extra:
        chunksize += 1

    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    results = pool.imap(func, 
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    for vector in results:
        CsvH.write_vector_field(vector_component, 
                                '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
    pool.close()
    pool.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

通过使用这种方法,您可以避免出现内存错误,并能够处理较大的数据。

请确保你的计算任务是可以并行化的,并且注意到在Windows系统上,mclapply可能不如在Unix-like系统(如Linux或Mac OS X)上有效。在Windows系统上,你可能需要使用parLapply函数来代替。如果有更多专业知识不懂得可以评论区一起讨论。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/41840
推荐阅读
相关标签
  

闽ICP备14008679号