본문 바로가기
Python

멀티 프로세싱 및 스레드의 큐(Queue) 사용

by yororing 2024. 10. 28.

00 개요

프로세스 vs 스레드
multiprocessing.Queue
vs queue.Queue
  • 파이썬에서 병렬 처리를 수행할 때, 프로세스 간 또는 스레드 간 통신을 안전하게 하기 위해 큐(Queue)를 많이 사용
  • 특히, 프로세스 간 통신을 위한 multiprocessing.Queue와 스레드 간 통신을 위한 queue.Queue는 병렬 작업에서 중요한 역할을 함
  • 이번 글은 두 큐의 정의, 사용 예시, 그리고 차이점과 공통점에 대한 정리

01 멀티프로세싱 큐 multiprocessing.Queue

  • 프로세스 간 통신을 위한 큐

1. 정의

  • multiprocessing.Queue는 파이썬의 멀티프로세싱 (multiprocessing) 라이브러리에서 프로세스 간 데이터를 주고 받기 위해 사용하는 큐
  • 여러 프로세스가 동시에 데이터를 안전하게 추가/가져갈 수 있도록 설계되어 있음

2. 사용 예시

import multiprocessing

def producer(queue):
    for i in range(100):
        queue.put(i)
        print(f"Produced---- {i}")

def consumer(queue):
    while not queue.empty():
        item = queue.get()
        print(f"----Consumed {item}")
if __name__ == '__main__':
    # 프로세스 간 통신을 위한 Queue 생성
    queue = multiprocessing.Queue()

    # 생산자 프로세스 생성
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    # 소비자 프로세스 생성
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
    # 프로세스 실행
    producer_process.start()
    consumer_process.start()
    producer_process.join()   # 생산자가 데이터를 다 넣을 때까지 기다림
    consumer_process.join()   # 소비자가 데이터를 다 소비할 때까지 기다림

1) 주의

  • 동일한 코드에 대한 실행 결과가 상이함
  • 다음은 다 같은 코드를 실행한 결과들임
  • 다음은 producer 및 consumer가 거의 동시에 실행되는 경우:

  • 다음은 producer가 다 실행 된 후 consumer가 실행된 경우:

  • 다음은 consumer가 실행되지 않은 경우:

  • 위에서 producer만 실행되는 이유:
    • consumer_process가 queue.empty()를 사용해 큐가 비었는지 확인하고 있기 때문
    • multiprocessing.Queue는 프로세스 간에 안전하게 사용할 수 있지만, queue.empty() 메서드는 여러 프로세스에서 정확하게 동작하지 않음
    • 이는 큐가 비었는지 여부를 정확히 확인하지 못하고, 비어 있다고 오판할 수 있기 때문
    • 즉, consumer_process가 데이터를 기다리지 않고 바로 종료되는 문제 발생
  • 해결 방법:
    • 다음과 같이 queue.empty()를 사용하지 않고, consumer_process가 데이터를 기다릴 수 있도록 코드를 수정
    • 이를 위해 반복문을 사용하고 종료 신호로 None 값을 큐에 넣어, 소비자가 종료 조건을 확인하도록 하는 방법

2) 수정된 코드

  • 종료 신호 추가
import multiprocessing

# 프로세스 간 통신 큐
'''
- 사용하는 클래스
    - multiprocessing.Queue()
    - multiprocessing.Process(target=, args=(,))
'''
def producer_pr(queue):
    for i in range(10):
        queue.put(i)
        print(f"Produced_process---- {i}")
    queue.put(None)  # 종료 신호

def consumer_pr(queue):
    while True:
        item = queue.get()
        if item is None: # 종료 신호 확인
            break
        print(f"----Consumed_process {item}")
if __name__ == '__main__':
    # 프로세스 간 통신을 위한 Queue 생성
    queue_process = multiprocessing.Queue()

    # 생산자 프로세스 생성
    producer_process = multiprocessing.Process(target=producer_pr, args=(queue_process,))
    # 소비자 프로세스 생성
    consumer_process = multiprocessing.Process(target=consumer_pr, args=(queue_process,))
    # 프로세스 실행
    producer_process.start()
    consumer_process.start()
    producer_process.join()   # 생산자가 데이터를 다 넣을 때까지 기다림
    consumer_process.join()   # 소비자가 데이터를 다 소비할 때까지 기다림
  • 이 후 해당 파일을 10번 정도 실행했을 때, consumer가 실행 되지 않는 경우 없었음
  • 해당 파일을 실행한 결과:

02 스레드의 큐 queue.Queue

  • 스레드 간 통신을 위한 큐

1. 정의

  • queue.Queue는 파이썬의 queue 모듈에서 제공하는 클래스이며, 스레드 간 안전하게 데이터를 전달하기 위해 사용하는 큐
  • 스레드 간에 데이터를 저장하고 꺼내는 작업이 충돌 없이 진행될 수 있도록 락 (lock) 매커니즘이 적용되어 있어, 여러 스레드가 동시에 접근해도 데이터가 손상되지 않음

2. 사용 예시

# 스레드 간 통신
'''
사용하는 클래스:
    - queue.Queue()
    - threading.Thread(target=, args=(,))
'''
def producer_thr(queue):
    for i in range(10):
        queue.put(i)
        print(f"Produced_thread---- {i}")
    queue.put(None) # 종료 신호

def consumer_thr(queue):
    while True:
        item = queue.get()
        if item is None: # 종료 신호
            break
        print(f"----Consumed_thread {item}")

if __name__ == "__main__":
    queue_thread = queue.Queue()
    producer_thread = threading.Thread(target=producer_thr, args=(queue_thread,))
    consumer_thread = threading.Thread(target=consumer_thr, args=(queue_thread,))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
  • Note:
    • thread 같은 경우는 multiprocessing과 달리 producer 다 실행된 후 consumer가 실행되는 경우가 없음 (20번 정도 코드를 실행해봄)
  • 코드 실행 결과 예시: