00 개요
프로세스 vs 스레드
multiprocessing.Queue vs queue.Queue
- 파이썬에서 병렬 처리를 수행할 때, 프로세스 간 또는 스레드 간 통신을 안전하게 하기 위해 큐(Queue)를 많이 사용함
- 특히, 프로세스 간 통신을 위한 multiprocessing.Queue와 스레드 간 통신을 위한 queue.Queue는 병렬 작업에서 중요한 역할을 함
- 이번 글은 두 큐의 정의, 사용 예시, 그리고 차이점과 공통점에 대한 정리
01 멀티프로세싱 큐 multiprocessing.Queue
- 프로세스 간 통신을 위한 큐
1. 정의
- multiprocessing.Queue는 파이썬의 멀티프로세싱 (multiprocessing) 라이브러리에서 프로세스 간 데이터를 주고 받기 위해 사용하는 큐
- 여러 프로세스가 동시에 데이터를 안전하게 추가/가져갈 수 있도록 설계되어 있음
2. 사용 예시
import multiprocessing
def producer(queue):
for i in range(100):
queue.put(i)
print(f"Produced---- {i}")
def consumer(queue):
while not queue.empty():
item = queue.get()
print(f"----Consumed {item}")
if __name__ == '__main__':
# 프로세스 간 통신을 위한 Queue 생성
queue = multiprocessing.Queue()
# 생산자 프로세스 생성
producer_process = multiprocessing.Process(target=producer, args=(queue,))
# 소비자 프로세스 생성
consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
# 프로세스 실행
producer_process.start()
consumer_process.start()
producer_process.join() # 생산자가 데이터를 다 넣을 때까지 기다림
consumer_process.join() # 소비자가 데이터를 다 소비할 때까지 기다림
1) 주의
- 동일한 코드에 대한 실행 결과가 상이함
- 다음은 다 같은 코드를 실행한 결과들임
- 다음은 producer 및 consumer가 거의 동시에 실행되는 경우:
- 다음은 producer가 다 실행 된 후 consumer가 실행된 경우:
- 다음은 consumer가 실행되지 않은 경우:
- 위에서 producer만 실행되는 이유:
- consumer_process가 queue.empty()를 사용해 큐가 비었는지 확인하고 있기 때문
- multiprocessing.Queue는 프로세스 간에 안전하게 사용할 수 있지만, queue.empty() 메서드는 여러 프로세스에서 정확하게 동작하지 않음
- 이는 큐가 비었는지 여부를 정확히 확인하지 못하고, 비어 있다고 오판할 수 있기 때문
- 즉, consumer_process가 데이터를 기다리지 않고 바로 종료되는 문제 발생
- 해결 방법:
- 다음과 같이 queue.empty()를 사용하지 않고, consumer_process가 데이터를 기다릴 수 있도록 코드를 수정
- 이를 위해 반복문을 사용하고 종료 신호로 None 값을 큐에 넣어, 소비자가 종료 조건을 확인하도록 하는 방법
2) 수정된 코드
- 종료 신호 추가
import multiprocessing
# 프로세스 간 통신 큐
'''
- 사용하는 클래스
- multiprocessing.Queue()
- multiprocessing.Process(target=, args=(,))
'''
def producer_pr(queue):
for i in range(10):
queue.put(i)
print(f"Produced_process---- {i}")
queue.put(None) # 종료 신호
def consumer_pr(queue):
while True:
item = queue.get()
if item is None: # 종료 신호 확인
break
print(f"----Consumed_process {item}")
if __name__ == '__main__':
# 프로세스 간 통신을 위한 Queue 생성
queue_process = multiprocessing.Queue()
# 생산자 프로세스 생성
producer_process = multiprocessing.Process(target=producer_pr, args=(queue_process,))
# 소비자 프로세스 생성
consumer_process = multiprocessing.Process(target=consumer_pr, args=(queue_process,))
# 프로세스 실행
producer_process.start()
consumer_process.start()
producer_process.join() # 생산자가 데이터를 다 넣을 때까지 기다림
consumer_process.join() # 소비자가 데이터를 다 소비할 때까지 기다림
- 이 후 해당 파일을 10번 정도 실행했을 때, consumer가 실행 되지 않는 경우 없었음
- 해당 파일을 실행한 결과:
02 스레드의 큐 queue.Queue
- 스레드 간 통신을 위한 큐
1. 정의
- queue.Queue는 파이썬의 queue 모듈에서 제공하는 클래스이며, 스레드 간 안전하게 데이터를 전달하기 위해 사용하는 큐
- 스레드 간에 데이터를 저장하고 꺼내는 작업이 충돌 없이 진행될 수 있도록 락 (lock) 매커니즘이 적용되어 있어, 여러 스레드가 동시에 접근해도 데이터가 손상되지 않음
2. 사용 예시
# 스레드 간 통신
'''
사용하는 클래스:
- queue.Queue()
- threading.Thread(target=, args=(,))
'''
def producer_thr(queue):
for i in range(10):
queue.put(i)
print(f"Produced_thread---- {i}")
queue.put(None) # 종료 신호
def consumer_thr(queue):
while True:
item = queue.get()
if item is None: # 종료 신호
break
print(f"----Consumed_thread {item}")
if __name__ == "__main__":
queue_thread = queue.Queue()
producer_thread = threading.Thread(target=producer_thr, args=(queue_thread,))
consumer_thread = threading.Thread(target=consumer_thr, args=(queue_thread,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
- Note:
- thread 같은 경우는 multiprocessing과 달리 producer 다 실행된 후 consumer가 실행되는 경우가 없음 (20번 정도 코드를 실행해봄)
- 코드 실행 결과 예시:
'Python' 카테고리의 다른 글
collections.Counter() (파이썬 클래스) (0) | 2024.10.07 |
---|---|
지역변수, 전역변수 (파이썬 변수 종류) (1) | 2024.09.26 |
format() (문자열 포맷팅 파이썬 함수) (0) | 2024.09.11 |
cursor.iterate() (파이썬 메서드) (0) | 2024.09.04 |
공백 제거 (파이썬 함수) (1) | 2024.08.23 |