python多进程并发的示例分析-创新互联

这篇文章主要介绍python多进程并发的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

创新互联专注于企业营销型网站建设、网站重做改版、安吉网站定制设计、自适应品牌网站建设、H5技术商城网站开发、集团公司官网建设、外贸网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为安吉等各大城市提供网站开发制作服务。

前言

下午需要简单处理一份数据,就直接随手写脚本处理了,但发现效率太低,速度太慢,就改成多进程了;

程序涉及计算、文件读写,鉴于计算内容挺多的,就用多进程了(计算密集)。

代码

import pandas as pd
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor

parse_path = '/data1/v-gazh/CRSP/dsf_full_fields/parse'
source_path = '/data1/v-gazh/CRSP/dsf_full_fields/2th_split' # 目录中有3.3W个csv文件,串行的话,效率大打折扣


def parseData():
  source_path_list = list(Path(source_path).glob('*.csv'))
  multi_process = ProcessPoolExecutor(max_workers=20)
  multi_results = multi_process.map(func, source_path_list)


def func(p):
  source_p = str(p)
  parse_p = str(p).replace('2th_split', 'parse')
  df = pd.read_csv(source_p)
  df['date'] = pd.to_datetime(df['date'].astype(str)).dt.date
  df.sort_values(['date'], inplace=True)
  # 处理close为负的值(abs),添加status标识
  df['is_close'] = df['PRC'].map(lambda x: 0 if x < 0 or pd.isna(x) else 1)
  df['PRC'] = df['PRC'].abs()
  df.rename(columns={'CFACPR': 'factor'}, inplace=True)
  df['adj_low'] = df['BIDLO'] * df['factor']
  df['adj_high'] = df['ASKHI'] * df['factor']
  df['adj_close'] = df['PRC'] * df['factor']
  df['adj_open'] = df['OPENPRC'] * df['factor']
  df['adj_volume'] = df['VOL'] / df['factor']
  # calc change
  df['change'] = df['adj_close'].diff(1) / df['adj_close'].shift(1)   df.drop_duplicates(inplace=True)
  df.to_csv(parse_p, index=False)
parseData()

以上是“python多进程并发的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联成都网站设计公司行业资讯频道!

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


当前标题:python多进程并发的示例分析-创新互联
当前网址:http://scyanting.com/article/docjdp.html