Skip to content

Latest commit

Β 

History

History
143 lines (117 loc) Β· 5.41 KB

File metadata and controls

143 lines (117 loc) Β· 5.41 KB

64. μ§„μ •ν•œ 병렬성을 살리렀면 concurrent.futures λ₯Ό μ‚¬μš©ν•˜λΌ

1. μ„±λŠ₯ κ°œμ„ 

  • 연산을 λ³‘λ ¬λ‘œ 적용
  • κ·ΈλŸ¬λ‚˜ μ „μ—­ 인터프리터 락(GIL) 둜 인해 νŒŒμ΄μ„  μŠ€λ ˆλ“œλŠ” μ§„μ •ν•œ 병렬 μ‹€ν–‰ λΆˆκ°€
  • μ„±λŠ₯에 κ°€μž₯ 결정적인 영ν–₯을 λ―ΈμΉ˜λŠ” 뢀뢄을 C μ–Έμ–΄λ₯Ό μ‚¬μš©ν•œ ν™•μž₯ λͺ¨λ“ˆλ‘œ μž‘μ„±
    • C λ₯Ό μ‚¬μš©ν•˜λ©΄ 둜우 λ ˆλ²¨μ— κ°€κΉκ²Œ 적용 κ°€λŠ₯ν•˜μ—¬ νŒŒμ΄μ„ μ— λΉ„ν•΄ 빠름
    • νŒŒμ΄μ„ μ˜ GIL과도 상관없이 CPU μ½”μ–΄ ν™œμš© κ°€λŠ₯
    • SWIG(https://github.com/swig/swig), CLIF(https://github.com/google/clif)
  • C둜 μ½”λ“œ μž¬μž‘μ„±ν•˜λŠ” κ²ƒμ˜ 단점
    • νŒŒμ΄μ„ μ—μ„œ μ§§κ³  μ΄ν•΄ν•˜κΈ° μ‰¬μš΄ μ½”λ“œκ°€ C μ—μ„œλŠ” μž₯ν™©ν•˜κ³  λ³΅μž‘ν•  수 있음
    • ν¬νŒ… μ‹œ 버그 μ—¬λΆ€ 확인 ν•„μš”
    • μ½”λ“œμ˜ ν•œ λΆ€λΆ„λ§Œ C 둜 λ°”κΎΈλ©΄ λ˜λŠ” κ²½μš°κ°€ λ“œλ¬Όλ‹€λŠ” 점
    • 거의 λŒ€λΆ€λΆ„μ˜ μ½”λ“œλ₯Ό ν¬νŒ…ν•΄μ•Ό 함
    • 그둜 인해 ν…ŒμŠ€νŠΈμ˜ ν•„μš”μ„± μ¦λŒ€, μ½”λ“œμ˜ μœ„ν—˜μ„± μ¦λŒ€
  • νŒŒμ΄μ„ μ—μ„œ C둜 νŽΈν•˜κ²Œ μ „ν™˜μ‹œμΌœμ£ΌλŠ” μ˜€ν”ˆ μ†ŒμŠ€ 도ꡬ

2. concurrent.futures λ‚΄μž₯ λͺ¨λ“ˆ

  • multiprocessing λ‚΄μž₯ λͺ¨λ“ˆ μ‚¬μš©
  • μžμ‹ ν”„λ‘œμ„ΈμŠ€λ₯Ό λ‹€λ₯Έ νŒŒμ΄μ„  인터프리터λ₯Ό μ‹€ν–‰ν•˜μ—¬ νŒŒμ΄μ„ μ—μ„œ μ—¬λŸ¬ CPU μ½”μ–΄λ₯Ό ν™œμš©ν•  수 있음
  • μžμ‹ ν”„λ‘œμ„ΈμŠ€λŠ” μ£Ό 인터프리터와 λ³„λ„λ‘œ μ‹€ν–‰λ˜λ―€λ‘œ GIL 뢄리됨
# mymodule.py
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i
    assert False, '도달할 수 μ—†μŒ
# run_serial.py
import my_module
import time

NUMBERS = [
    (1963309, 2265973), (2030677, 3814172),
    (1551645, 2229620), (2039045, 2020802),
    (1823712, 1924928), (2293129, 1020491),
    (1281238, 2273782), (3823812, 4237281),
    (3812741, 4729139), (1292391, 2123811),
]

def main():
    start = time.time()
    results = list(map(my_module.gcd, NUMBERS))
    end = time.time()
    delta = end - start
    print(f'총 {delta:.3f} 초 걸림')

if __name__ == '__main__':
    main() 

>>>
총 0.911 초 걸림
  • μœ„ μ˜ˆμ œλŠ” 두 수의 μ΅œλŒ€κ³΅μ•½μˆ˜λ₯Ό κ΅¬ν•˜λŠ” κ΅¬ν˜„
  • 순차 μ‹€ν–‰ μ‹œ μ—°μ‚° μ‹œκ°„ μ„ ν˜•μ μœΌλ‘œ 증가
  • GIL둜 인해 파이썬이 μ—¬λŸ¬ μŠ€λ ˆλ“œλ₯Ό 닀쀑 CPU μ½”μ–΄μ—μ„œ 병렬 μ‹€ν–‰ν•  수 μ—†μœΌλ―€λ‘œ μŠ€λ ˆλ“œλ₯Ό ν™œμš©ν•΄λ„ 속도 ν–₯상 거의 μ—†μŒ
# run_threads.py
import my_module
from concurrent.futures import ThreadPoolExecutor
import time

NUMBERS = [
   ...
]

def main():
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=2)
    results = list(pool.map(my_module.gcd, NUMBERS))
    end = time.time()
    delta = end - start
    print(f'총 {delta:.3f} 초 걸림')

if __name__ == '__main__':
    main()

>>>
총 1.436 초 걸림
  • μœ„ μ˜ˆμ œλŠ” concurrent.futures λͺ¨λ“ˆμ— μžˆλŠ” ThreadPoolExecutor 클래슀λ₯Ό ν™œμš©ν•˜μ—¬ 두 개의 μž‘μ—…μž μŠ€λ ˆλ“œλ‘œ μˆ˜ν–‰
  • μŠ€λ ˆλ“œ ν’€ μ‹œμž‘, ν’€κ³Ό ν†΅μ‹ ν•˜λŠ” 둜직으둜 인해 μ‹€ν–‰ μ‹œκ°„μ΄ μ’€ 더 증가
# run_parallel.py
import my_module
from concurrent.futures import ProcessPoolExecutor
import time

NUMBERS = [
...
]

def main():
    start = time.time()
    pool = ProcessPoolExecutor(max_workers=2)     # 이 λΆ€λΆ„λ§Œ λ°”κΏˆ
    results = list(pool.map(my_module.gcd, NUMBERS))
    end = time.time()
    delta = end - start
    print(f'총 {delta:.3f} 초 걸림')

if __name__ == '__main__':
    main()

>>>
총 0.683 초 걸림
  • concurrent.futures 에 μžˆλŠ” ProcessPoolExecutor μ‚¬μš©

  • ProcessPoolExecutor μž‘μ—…

    • [parent] 이 κ°μ²΄λŠ” μž…λ ₯ λ°μ΄ν„°λ‘œ λ“€μ–΄μ˜¨ map λ©”μ†Œλ“œμ— μ „λ‹¬λœ NUMBERS의 각 μ›μ†Œλ₯Ό 취함
    • [parent] 이 κ°μ²΄λŠ” μ „λ‹¬λ˜μ–΄ 얻은 μ›μ†Œλ₯Ό pickle λͺ¨λ“ˆμ„ μ‚¬μš©ν•˜μ—¬ 이진 λ°μ΄ν„°λ‘œ 직렬화
    • [parent, child] 이 κ°μ²΄λŠ” 둜컬 μ†ŒμΌ“μ„ 톡해 μ£Ό 인터프리터 ν”„λ‘œμ„ΈμŠ€λ‘œλΆ€ν„° μžμ‹ 인터프리터 ν”„λ‘œμ„ΈμŠ€μ—κ²Œ μ§λ ¬ν™”ν•œ 데이터λ₯Ό 볡사
    • [child] 이 κ°μ²΄λŠ” λ‹€μ‹œ 역직렬화
    • [child] 이 κ°μ²΄λŠ” gcd ν•¨μˆ˜κ°€ λ“€μ–΄ μžˆλŠ” λͺ¨λ“ˆ import
    • [child] 이 κ°μ²΄λŠ” μž…λ ₯ 데이터에 λŒ€ν•΄ gcd ν•¨μˆ˜λ₯Ό μ‹€ν–‰, λ‹€λ₯Έ μžμ‹ 인터프리터 ν”„λ‘œμ„ΈμŠ€μ™€ λ³‘λ ¬λ‘œ μ‹€ν–‰
    • [child] 이 κ°μ²΄λŠ” gcd ν•¨μˆ˜μ˜ κ²°κ³Όλ₯Ό 이진 λ°μ΄ν„°λ‘œ 직렬화
    • [parent, child] 이 κ°μ²΄λŠ” 둜컬 μ†ŒμΌ“μ„ 톡해 μžμ‹ 인터프리터 ν”„λ‘œμ„ΈμŠ€λ‘œλΆ€ν„° λΆ€λͺ¨ 인터프리터 ν”„λ‘œμ„ΈμŠ€μ—κ²Œ μ§λ ¬ν™”ν•œ κ²°κ³Ό 데이터 전달
    • [parent] 이 κ°μ²΄λŠ” λ‹€μ‹œ 역직렬화
    • [parent] μ—¬λŸ¬ μžμ‹ ν”„λ‘œμ„ΈμŠ€κ°€ λ°˜ν™˜ν•œ κ²°κ³Όλ₯Ό λ³‘ν•©ν•˜μ—¬ list 둜 생성
  • λΆ€λͺ¨μ™€ μžμ‹ ν”„λ‘œμ„ΈμŠ€ 사이 데이터λ₯Ό 주고받을 λ•Œ λ§ˆλ‹€ 직렬화 및 역직렬화 λ°œμƒ, μΆ”κ°€λΉ„μš©μ΄ 큼

  • 이 방식은 μ½”λ“œ κ°„ μ˜μ‘΄μ„±μ΄ μ—†κ³  λ ˆλ²„λ¦¬μ§€κ°€ 큰 μœ ν˜•μ˜ μž‘μ—…μ— 적합

    • λ ˆλ²„λ¦¬μ§€ : μ£Όκ³ λ°›μ•„μ•Ό ν•˜λŠ” λ°μ΄ν„°μ˜ ν¬κΈ°λŠ” μž‘μœΌλ‚˜ μžμ‹ ν”„λ‘œμ„ΈμŠ€κ°€ μ—°μ‚°ν•΄μ•Ό ν•  양은 μƒλ‹Ήνžˆ 큰 것

3. 정리

  • μ²˜μŒμ—λŠ” multiprocessing λͺ¨λ“ˆμ„ μ‚¬μš©ν•˜μ§€ μ•Šκ³  순차적으둜 μ½”λ“œλ₯Ό μž‘μ„±
  • ThreadPoolExecutor λ₯Ό 톡해 확인
  • κ·Έλž˜λ„ μ„±λŠ₯ λ¬Έμ œκ°€ 있으면 ProcessPoolExecutor λ₯Ό μ‚¬μš©ν•  것을 ꢌμž₯