본문 바로가기
Python

multiprocessing (프로세스 병렬 처리 모듈/패키지)

by yororing 2024. 7. 10.

00 개요

  • 목적: 회사에서 코드 분석 중 나와서 정리하고자 함

01 서론

1. multiprocessing 패키지 정의

  • 여러 개의 프로세스들을 병렬로 처리(실행)할 수 있는 기능을 제공하는 패키지
  • threading 모듈과 유사한 API를 사용하여 프로세스를 생성하는 것을 지원하는 패키지
  • 로컬 및 원격 동시 실행을 모두 제공하며 (offers both local and remote concurrency), thread이 아닌 sub process를 사용하여 글로벌 인터프리터 락(GIL)을 효과적으로 피함 (effectively side-steps the Global Interpreter Lock by using subprocesses instead of threads)
  • → 프로그래머가 주어진 머신의 여러 프로세서를 완전히 활용할 수 있도록 함
  • POSIX와 Windows에서 모두 실행 가능
  • threading 모듈에는 없는 추가적인 API들 또한 제공
    • 예) Pool object, which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism)
# Pool을 사용한 data parallelism 예시
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
       print(p.map(f, [1, 2, 3]))
  • 위의 예시는 demonstrates the common practice of defining such functions in a module so that child processes can successfully import that module.
  • 위 파일의 실행 결과:
[1, 4, 9]

2. Process 클래스

  • In multiprocessing, processes are spawned(생성되다) by creating a Process object and then calling its start() method
  • Process follows the API of threading.Thread

1) multiprocess 프로그램 예시

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
  • To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name: ', __name__)
    print('parent process: ', os.getppid())
    print('process id: ', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob', ))
    p.start()
    p.join()
  • 위 파일의 실행 결과:

3. Contexts와 start 메소드

  • platform에 따라 multiprocessing은 프로세스를 시작하는 3가지 방법: 1) spawn 2) fork 3) forkserver
start method 설명
spawn
  • The parent process starts a fresh Python interpreter process.
  • The child process will only inherit those resources necessary to run the process object’s run() method.
  • In particular, unnecessary file descriptors and handles from the parent process will not be inherited.
  • Starting a process using this method is rather slow compared to using fork or forkserver.
  • 사용 가능한 platform: POSIX, Windows (Windows and macOS platform에서는 기본값)
fork
  • The parent process uses os.fork() to fork the Python interpreter.
  • The child process, when it begins, is effectively identical to the parent process. 
  • All resources of the parent are inherited by the child process. 
  • Note that safely forking a multithreaded process is problematic. 
  • 사용 가능한 platform: POSIX systems (macOS를 제외한 POSIX platform에서는 기본값)
forkserver
  • When the program starts and selects the forkserver start method, a server process is spawned
  • From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process.
  • The fork server process is single threaded unless system libraries or preloaded imports spawn threads as a side-effect so it is generally safe for it to use os.fork().
  • No unnecessary resources are inherited.
  • 사용 가능한 platform: POSIX platforms which support passing file descriptors over Unix pipes such as Linux.
  • On POSIX using the spawn or forkserver start methods will also start a resource tracker process which tracks the unlinked named system resources (e.g., named semaphores or SharedMemory objects) created by processes of the program.
  • When all processes have exited the resource tracker unlinks any remaining tracked object.
  • Usually there should be none, but if a process was killed by a signal there may be some “leaked” resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)

1) start 메소드 지정하기

multiprocessing.set_start_method('시작메소드')
  • set_start_method() should not be used more than once in the program
  • 예시:
# 프로세스 생성, 큐를 통해 데이터 주고 받기
import multiprocessing as mp

def foo(q):         # foo 함수 정의 
    q.put('hello')  # 매개변수 q(큐)를 받아 q(큐)에 'hello' 추가

if __name__ == '__main__':    # 이 코드는 스크립트가 직접 실행될 때만 실행되도록 보장
                              # multiprocessing 모듈은 새로운 프로세스를 시작할 때 이 코드를 재실행할 수 있으므로
                              # 이 조건을 사용하여 무한 루프를 피합니다.
    mp.set_start_method('spawn')  # 새 프로세스 생성 시 시작 메소드를 'spawn'으로 설정
                                  # 'spawn': 새 Python 인터프리터 시작하여 새 프로세스 생성하는 메소드
    q = mp.Queue()        # 프로세스 간 데이터를 주고받을 수 있는 Queue 객체 생성
                          # 이 큐는 multiprocessing.Queue 클래스의 인스턴스
    p = mp.Process(target=foo, args=(q, ))  # 새 프로세스 p 생성 및 정의
                                            # 이 프로세스는 foo 함수를 실행, 인자로 큐 q를 전달
    p.start()             # 프로세스 p 시작, 이 시점에서 foo 함수가 새로운 프로세스에서 실행됨
    print(q.get())        # q(큐)에서 데이터를 가져와 출력
                          # foo 함수가 q(큐)에 'hello'를 넣었으므로 'hello' 출력됨
                          # q.get()는 큐가 비어 있을 경우 데이터를 받을 때까지 대기함
    p.join()              # p(자식 프로세스)가 종료될 때까지 대기
                          # 이를 통해 프로세스가 완료될 때까지 메인 프로세스가 종료되지 않도록 함
  • 위 파일 실행 결과:

2) context 객체 불러오기

multiprocess.get_context('시작메소드')
  • you can use get_context() to obtain a context object
  • Context objects have the same API as the multiprocessing module, and allow one to use multiple start methods in the same program
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')  # spawn 시작 방법을 사용하는 새 Context 객체(ctx) 생성
                                   # ctx는 q(큐) 및 p(프로세스) 객체를 생성하는 데 사용됨
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()                # p(프로세스) 시작
                             # 이 시점에서 foo 함수가 새 프로세스에서 실행됨
    print(q.get())
    p.join()
  • 위 파일 실행 결과:

  • Note that objects related to one context may not be compatible with processes for a different context
  • In particular, locks created using the fork context cannot be passed to processes started using the spawn or forkserver start methods.
  • A library which wants to use a particular start method should probably use get_context() to avoid interfering with the choice of the library user.

4. Process 간 객체 교환하기

  • 프로세스 간 통신 채널 (communication channel b/w processes) 종류 2가지: 1) Queue 2) Pipes

1) Queue

  • Queue 클래스는 거의 queue.Queue를 복제한 수준의 클래스임
  • Queue는 thread랑 process 둘 다 안전함
from multiprocessing import Process, Queue
import os

def foo(q):
    print('module name:', __name__)
    print('parent process id:', os.getppid())
    print('process id:', os.getpid())
    q.put('3')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=foo, args=(q,))
    p.start()
    print('module name:', __name__)
    print('parent process id:', os.getppid())
    print('process id:', os.getpid())
    q.put('1')
    q.put('2')
    print('1 get:', q.get())
    print('2 get:', q.get())
    print('3 get:', q.get())
    p.join()
  • 위 파일 실행 결과:

2) Pipes

  • Pipe() 함수는 pair of connection objects connected by a pipe를 반환함
  • 기본값 = duplex (two-way)
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())    # prints "[42, None, 'hello']"
    p.join()
  • 위 파일 실행 결과:

  • The two connection objects returned by Pipe() represent the two ends of the pipe
  • Each connection object has send() and recv() methods (among others)
  • data in a pipe may become corrupted if two processes/threads try to read from or write to the same end of the pipe at the same time
  • Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

참조

  1. (소스 코드) https://github.com/python/cpython/tree/3.12/Lib/multiprocessing/ 
  2. https://docs.python.org/3/library/multiprocessing.html#  
  3.  
  4.  
  5.  

 

'Python' 카테고리의 다른 글

super()  (0) 2024.07.25
종료 상태 코드 (Python)  (0) 2024.07.18
traceback (파이썬 모듈)  (0) 2024.07.10
logging (파이썬 로깅 모듈)  (0) 2024.07.10
Recursion Limit (재귀 호출 제한)  (0) 2024.07.10