python

pandas 멀티프로세스 사용하기 + 큰 csv 데이터 나눠서 가져오기

프로젝트를 진행하면서 , 대량의 cvs 데이터를 읽어와 처리할 일이 생겼다. 

컴퓨터 램의 용량보다 데이터가 무조건 크므로, csv_read 함수를 사용하면 램이 터져나갈 것. 

이를 해결하기 위해 chunksize를 구분하여 일부분씩 쪼개서 가져오는 방법을 선택했다. 

csv_chunk = pd.read_csv("G:\ks_data/"+filename+".csv",chunksize=2000000)
    for chunk in csv_chunk :
    	print(chunk)

방식은 c언어 등에서 fopen - fread 시에 사용하는 방법과 유사한데, 한번 read_csv를 실행하면 동일 파일에 대해 파일 포인터같은게 유지되어 파일 스트림 방식으로 데이터를 읽어오게 된다. 

 

즉, chunksize를 지정했다면 아래의 for문에서 데이터를 읽어 올 떄 한번에 chunksize 만큼씩만 읽어온다는 의미이고, 별도의 반복문 지정 없이도 읽어왔던 부분 바로 다음부터 다시 데이터를 읽어오기 시작한다. 

 

따라서 불필요한 반복문이나 파일 읽기등의 동작을 수행하지 않아도 된다는 뜻. 생각보다 매우 간편했다. 

chunksize가 커질수록 한번에 많은 데이터를 읽어오고, 그만큼 반복문을 적게 도니 작업 수행 시간은 줄어들 것이다.  다만 이는 램 용량이 충분하지 않으면 오히려 부하를 일으킬 수 있으니 적절히 잘 조절해야 할듯. 

 

이제 멀티프로세스인데, 파이썬은 굉장히 간단하게 이 방식을 구현할 수 있다.

import multiprocessing as mp

def parallel_dataframe(df, func):
    global num_cores
    df_split = np.array_split(df, num_cores)
    pool = mp.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

저렇게 멀티 프로세스를 위한 함수를 하나 선언하면 끝. df에 들어간 데이터 프레임들이 core 숫자만큼 나눠져서 각 프로세스에 분배된다. 이때 미리 선언한 데이터 전처리용 함수 func도 인자로 같이 넘겨서 실행해줘야 하고, 함수 실행에 필요한 인자는 pool.map 에서 함수와 같이 넘겨주면 된다. 

 

위 예제의 경우는 해당 함수 인자로 데이터 프레임이 넘어가야 하기에 쪼갠 데이터 프레임 df_split을 인자로 넘겨줬다. 

    for chunk in csv_chunk : 
        tmp = parallel_dataframe(chunk,alist.tf)

사용할때는 위와 같이 사용하였다. 

 

전체 코드

import pandas as pd
import numpy as np
import sys
import multiprocessing as mp
import funcs as fc

fnamelist = ['log_20100508','log_20100509','log_20100510','log_20100511'
             ,'log_20100512','log_20100513','log_20100514','log_20100515','log_20100516','log_20100517']

global num_cores
num_cores = mp.cpu_count()
print(num_cores)

def parallel_dataframe(df, func):
    global num_cores
    df_split = np.array_split(df, num_cores)
    pool = mp.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df


def getchunk(filename):
    csv_chunk = pd.read_csv("G:\ks_data/"+filename+".csv",chunksize=2000000,
    dtype = {
            "log_date" : object,
            "etc_str1" : object,
            "etc_str3" : object,
           }

    )
    
    i = 0
    bres = pd.DataFrame()
    ures = pd.DataFrame()
    tmplist = []

    alist = fc.makelist() #initializing makelist class
    
    for chunk in csv_chunk : 
        tmp = parallel_dataframe(chunk,alist.tf)
        bres = bres.append(parallel_dataframe(tmp,alist.btf))
        ures = ures.append(parallel_dataframe(tmp,alist.utf))
        
    ures.to_csv('results/ures_root_'+filename+'.csv')
    bres.to_csv('results/bres_root_'+filename+'.csv')

    ulen = len(ures)
    blen = len(bres)
    fp = open('results/rooting_output_'+filename+'.txt','w')
    sys.stdout = fp
    print("result => usr: " + str(ulen)+" / bot: "+str(blen))
    print("usrlist :" + str(alist.ualen)+" / botlist : "+str(alist.balen))
                    
    sys.stdout = sys.__stdout__
    fp.close()

전처리를 위한 함수는 fc라는 모듈에 별도로 선언해서 빼서 쓰고 있는 상태. 

 

모쪼록 저사양 pc에서도 pandas를 쓰기에 무리가 없었으면 좋겠다..