Search

멀티 스레드와 동시성 (4) 멀티 스레드 환경의 생산자 소비자 문제 해결

Tags
Study
Java
Last edited time
2025/10/10 00:52
2 more properties
Search
멀티 스레드와 동시성 (5) 자바 동기화 메커니즘: 스레드 락 / 대기 집합 / 임계영역
Study
Java
멀티 스레드와 동시성 (5) 자바 동기화 메커니즘: 스레드 락 / 대기 집합 / 임계영역
Study
Java

0. 개요

개념
생산자-소비자 문제는 멀티스레드 환경에서 발생하는 고전적인 동시성 문제
한정된 크기의 버퍼를 공유하는 생산자와 소비자 스레드 간의 동기화를 안전하고 효율적으로 처리하는 것이 핵심
공통 사전 설정
큐 용량(CAPACITY): 2
생산자 스레드: p1, p2, p3 (총 3개)
소비자 스레드: c1, c2, c3 (총 3개)
공유 자원(버퍼): ArrayDeque 기반의 큐 Q
임계 영역(Critical Section)
put(enqueue)과 take(dequeue) 동작 자체
구현에 따라 다음 중 하나의 락으로 보호됨
synchronized (monitor) 블록(모니터 락)
ReentrantLocklock()/unlock() 블록(명시적 락)

1. [1단계] 동기화 없음

1.1. 구현 특징

버퍼에 대한 어떠한 동기화 기법도 사용하지 않음
단순한 ArrayDeque를 사용한 큐 구현

1.2. 문제점

데이터 손실: 버퍼가 가득 찼을 때 생산자가 데이터를 버림
null 처리: 버퍼가 비었을 때 소비자가 null 값을 받음

2. [2단계] 바쁜 대기 방식

2.1. 구현 특징

버퍼가 가득 차거나 비었을 때 스레드가 무작정 데이터를 버리거나 null을 받는 대신, while 루프와 Thread.sleep()을 사용한 대기
synchronized 키워드로 임계 영역 설정
synchronized (this) { while (queue.size() == max) { Thread.sleep(1000); // 임계 영역 내에서 대기 } queue.offer(data); }
Java
복사

2.2. 문제점

데드락 발생 (심각한 무한대기)
synchronized 임계 영역 안에서 락을 소유한 채로 sleep() 호출
다른 스레드 (예. 생산자가 대기중일때 소비자가, 또는 소비자가 대기중일때 생산자가)가 락 획득 불가
락 점유 지속
대기 중인 스레드가 락을 반납하지 않아 다른 스레드가 접근 불가
시스템 정지
전체 애플리케이션이 무한 대기 상태에 빠짐

2.3. 예시 시나리오

생산자 먼저 실행
p1, p2가 큐에 데이터 저장
p3가 락 획득 후 큐가 가득참을 발견 → sleep() 호출
p3가 락을 소유한 채 TIME_WAITING 상태
c1,c2,c3는 락을 얻을 수 없어 BLOCKED 상태 → 무한 대기

3. [3단계] Object.wait()와 Object.notify()

3.1. 구현 특징

자바의 모든 객체가 기본적으로 가지고 있는 Object.wait()Object.notify() (또는 notifyAll()) 메서드를 사용하여 이 문제를 해결
wait(): 스레드가 락을 반납하고 WAITING 상태로 전환
notify(): 대기 중인 스레드 중 하나를 깨워서 락 획득 시도하게 함
wait()를 호출해서 대기 상태에 빠질때 락을 반납하고 대기상태에 빠지는것이 핵심
스레드 대기 집합
모든 대기 중인 스레드를 하나의 wait set에서 관리
wait() 호출 시 락을 반납하고 (RUNNABLE → WAITING) 스레드 대기 집합에서 관리
synchronized (this) { while (queue.size() == max) { wait(); // 락 반납 후 대기 } queue.offer(data); notify(); // 대기 중인 스레드 깨움 }
Java
복사

3.2. 동작 과정

1.
생산자
a.
synchronized 키워드를 통해 임계영역 설정. 생산자 스레드는 임계영역의 모니터락 획득
b.
큐가 가득 차면 락을 반납하고 대기
i.
wait()
ii.
RUNNABLE → WAITING 상태로 변경
c.
소비자가 데이터를 가져가면 깨어남
i.
notify()
2.
소비자
a.
synchronized 키워드를 통해 임계영역 설정. 소비자 스레드는 임계영역의 모니터락 획득
b.
큐가 비면 락을 반납하고 대기
c.
생산자가 데이터를 추가하면 깨어남

3.3. 개선된 점

데드락 문제 해결
데이터 손실 없이 모든 데이터 안전 처리
락을 양보하여 다른 스레드의 접근 허용

3.4. 남은 한계

비효율적인 깨우기
생산자가 생산자를 깨우거나 소비자가 소비자를 깨우는 문제
예측 불가능성
notify()가 어떤 스레드를 깨울지 예측 불가
스레드 기아
특정 스레드가 계속 선택되지 않을 가능성
notifyAll()로 스레드 대기 집합에 있는 모든 스레드를 깨 수 있으나 여전히 비효율적

4. [4단계] 고급 동기화 - ReentrantLock과 Condition

4.1. 구현 특징

condition: ReentrantLock이 사용하는 스레드 대기 공간
condition.await(): 락 반환 후 현재 스레드를 대기 상태로 condition에 보관
condition.siginal()
지정된 condition에서 대기 중인 스레드 하나를 깨움
일반적으로 FIFO로 깨움 (자바 버전과 구현에 따라 다를 수 있음)
생산자 / 소비자 스래드를 위한 대기공간 분리
private final Lock lock = new ReentrantLock(); private final Condition producerCond = lock.newCondition(); private final Condition consumerCond = lock.newCondition(); // 생산자 while (queue.size() == max) { producerCond.await(); // 생산자 전용 대기 공간 } queue.offer(data); consumerCond.signal(); // 소비자만 깨움 // 소비자 while (queue.isEmpty()) { consumerCond.await(); // 소비자 전용 대기 공간 } String data = queue.poll(); producerCond.signal(); // 생산자만 깨움
Java
복사

4.2. 핵심 장점

대기 공간 분리: 생산자와 소비자가 서로 다른 Condition에서 대기
정확한 타겟팅: 생산자는 소비자만, 소비자는 생산자만 깨움
효율성 극대화: 불필요한 스레드 깨우기 현상 완전 제거
스레드 기아 해결: FIFO 방식으로 스레드 깨우기 (구현에 따라 다를 수 있음)

5. [5단계] 표준 라이브러리 활용 - BlockingQueue

5.1. 핵심 개념

자바 표준 라이브러리
java.util.concurrent.BlockingQueue
구현체
ArrayBlockingQueue: 배열 기반, 고정 크기
LinkedBlockingQueue: 링크드 리스트 기반, 동적 크기 (옵션)
BlockingQueue<String> queue = new ArrayBlockingQueue<>(max); // 사용법 queue.put(data); // 블로킹 방식 - 공간이 생길 때까지 대기 String data = queue.take(); // 블로킹 방식 - 데이터가 있을 때까지 대기
Java
복사

5.2. 제공 기능

블로킹
put(e) : 큐가 가득 차면 공간이 생길 때까지 스레드를 차단
take() : 큐가 비어 있으면 요소가 준비될 때까지 스레드를 차단
시간 제한 대기
특정 시간 동안만 대기하고, 시간이 초과되면 실패(false 또는 null)를 반환 가능
즉시 반환
대기하지 않고 즉시 성공 여부(true/false)나 값/null을 반환할 수 있음
메서드
예외 발생
즉시 반환
블로킹
시간 제한 대기
삽입
add(e)
offer(e)
put(e)
offer(e, time, unit)
제거
remove()
poll()
take()
poll(time, unit)

5.3. 장점

완전한 캡슐화: 모든 동기화 메커니즘이 내부에 구현됨
다양한 옵션: 블로킹, 논블로킹, 시간 제한 등 다양한 방식 제공
인터럽트 지원: 모든 대기 메서드가 인터럽트를 지원하여 유연한 스레드 제어
검증된 구현: 수많은 프로덕션 환경에서 검증된 안정성
성능 최적화: 내부적으로 고도로 최적화된 구현

6. 결론

생산자-소비자 문제 해결의 발전 과정은 다음과 같이 요약할 수 있음
1.
문제 인식: 동기화 없는 접근의 위험성
2.
잘못된 접근: 락을 소유한 채로 대기하는 데드락 문제
3.
기본 해결: wait()/notify()를 통한 락 양보 메커니즘
4.
효율성 개선: 대기 공간 분리를 통한 정확한 스레드 타겟팅
5.
표준화: 검증된 라이브러리 활용으로 안정성과 편의성 확보
실무에서는 직접 동기화 메커니즘을 구현하기보다는 BlockingQueue와 같은 java.util.concurrent 패키지의 동시성 컬렉션을 적극 활용하는 것이 안전하고 효율적

7. 예시코드

package thread.bounded; import java.util.ArrayList; import java.util.List; import static util.MyLogger.log; import static util.ThreadUtils.sleep; public class BoundedMain { public static void main(String[] args) { // 1. BoundedQueue 선택 BoundedQueue queue = new BoundedQueueV5(2); // BoundedQueue queue = new BoundedQueueV6_1(2); // BoundedQueue queue = new BoundedQueueV6_2(2); // BoundedQueue queue = new BoundedQueueV6_3(2); // BoundedQueue queue = new BoundedQueueV6_4(2); // 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택! producerFirst(queue); // 생산자 먼저 실행 // consumerFirst(queue); // 소비자 먼저 실행 } private static void producerFirst(BoundedQueue queue) { log("== [생산자 먼저 실행] 시작," + queue.getClass().getSimpleName() + "=="); List<Thread> threads = new ArrayList<>(); startProducer(queue, threads); printAllState(queue, threads); startConsumer(queue, threads); printAllState(queue, threads); log("== [생산자 먼저 실행] 종료," + queue.getClass().getSimpleName() + "=="); } private static void consumerFirst(BoundedQueue queue) { log("== [소비자 먼저 실행] 시작," + queue.getClass().getSimpleName() + "=="); List<Thread> threads = new ArrayList<>(); startConsumer(queue, threads); printAllState(queue, threads); startProducer(queue, threads); printAllState(queue, threads); log("== [소비자 먼저 실행] 종료," + queue.getClass().getSimpleName() + "=="); } private static void printAllState(BoundedQueue queue, List<Thread> threads) { System.out.println(); log("현재 상태 출력, 큐 데이터: " + queue); for(Thread thread : threads) { log(thread.getName() + ": " + thread.getState()); } } private static void startProducer(BoundedQueue queue, List<Thread> threads) { System.out.println(); log("생산자 시작"); for(int i = 1; i <= 3; i++) { Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i); threads.add(producer); producer.start(); sleep(100); } } private static void startConsumer(BoundedQueue queue, List<Thread> threads) { System.out.println(); log("소비자 시작"); for(int i = 1; i <= 3; i++) { Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i); threads.add(consumer); consumer.start(); sleep(100); } } }
Java
복사
package thread.bounded; public interface BoundedQueue { void put(String data); String take(); }
Java
복사
package thread.bounded; import static util.MyLogger.log; public class ProducerTask implements Runnable { private BoundedQueue queue; private String request; public ProducerTask(BoundedQueue queue, String request) { this.queue = queue; this.request = request; } @Override public void run() { log("[생산 시도] " + request + " -> " + queue); queue.put(request); log("[생산 완료] " + request + " -> " + queue); } }
Java
복사
package thread.bounded; import static util.MyLogger.log; public class ConsumerTask implements Runnable { private BoundedQueue queue; public ConsumerTask(final BoundedQueue queue) { this.queue = queue; } @Override public void run() { log("[소비 시도] ? <- " + queue); String data = queue.take(); log("[소비 완료] " + data + " <- " + queue); } }
Java
복사

7.1. 동기화 없음

큐가 가득차서 producer3가 데이터를 버림
큐가 비어있을 때 소비자가 null을 조회
package thread.bounded; import java.util.ArrayDeque; import java.util.Queue; import static util.MyLogger.log; public class BoundedQueueV1 implements BoundedQueue { private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV1(final int max) { this.max = max; } @Override public synchronized void put(final String data) { if(queue.size() == max) { log("[put] 큐가 가득 참, 버림 : " + data); return; } queue.offer(data); } @Override public synchronized String take() { if(queue.isEmpty()) { return null; } return queue.poll(); } @Override public String toString() { return queue.toString(); } }
Java
복사
13:31:22.662 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV1== 13:31:22.666 [ main] 생산자 시작 13:31:22.672 [producer1] [생산 시도] data1 -> [] 13:31:22.672 [producer1] [생산 완료] data1 -> [data1] 13:31:22.774 [producer2] [생산 시도] data2 -> [data1] 13:31:22.775 [producer2] [생산 완료] data2 -> [data1, data2] 13:31:22.880 [producer3] [생산 시도] data3 -> [data1, data2] 13:31:22.881 [producer3] [put] 큐가 가득 참, 버림 : data3 13:31:22.882 [producer3] [생산 완료] data3 -> [data1, data2] 13:31:22.981 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:31:22.983 [ main] producer1: TERMINATED 13:31:22.983 [ main] producer2: TERMINATED 13:31:22.984 [ main] producer3: TERMINATED 13:31:22.984 [ main] 소비자 시작 13:31:22.986 [consumer1] [소비 시도] ? <- [data1, data2] 13:31:22.987 [consumer1] [소비 완료] data1 <- [data2] 13:31:23.088 [consumer2] [소비 시도] ? <- [data2] 13:31:23.089 [consumer2] [소비 완료] data2 <- [] 13:31:23.194 [consumer3] [소비 시도] ? <- [] 13:31:23.195 [consumer3] [소비 완료] null <- [] 13:31:23.298 [ main] 현재 상태 출력, 큐 데이터: [] 13:31:23.299 [ main] producer1: TERMINATED 13:31:23.299 [ main] producer2: TERMINATED 13:31:23.300 [ main] producer3: TERMINATED 13:31:23.300 [ main] consumer1: TERMINATED 13:31:23.301 [ main] consumer2: TERMINATED 13:31:23.301 [ main] consumer3: TERMINATED 13:31:23.303 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV1==
Bash
복사

7.2. synchronized + sleep()

producer3가 TIME WATING으로 임계영역 내부에서 락을 소유하고 있는 채 대기
consumer는 락을 획득하기 위해 락 대기 집합에서 무한 대기
package thread.bounded; import java.util.ArrayDeque; import java.util.Queue; import static util.MyLogger.log; import static util.ThreadUtils.sleep; public class BoundedQueueV2 implements BoundedQueue { private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV2(final int max) { this.max = max; } @Override public synchronized void put(final String data) { while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기"); sleep(1000); } queue.offer(data); } @Override public synchronized String take() { while(queue.isEmpty()) { log("[take] 큐에 데이터가 없음, 소비자 대기"); sleep(1000); } return queue.poll(); } @Override public String toString() { return queue.toString(); } }
Java
복사
13:21:23.039 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV2== 13:21:23.043 [ main] 생산자 시작 13:21:23.049 [producer1] [생산 시도] data1 -> [] 13:21:23.049 [producer1] [생산 완료] data1 -> [data1] 13:21:23.150 [producer2] [생산 시도] data2 -> [data1] 13:21:23.150 [producer2] [생산 완료] data2 -> [data1, data2] 13:21:23.253 [producer3] [생산 시도] data3 -> [data1, data2] 13:21:23.253 [producer3] [put] 큐가 가득 참, 생산자 대기 13:21:23.360 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:21:23.363 [ main] producer1: TERMINATED 13:21:23.364 [ main] producer2: TERMINATED 13:21:23.364 [ main] producer3: TIMED_WAITING 13:21:23.364 [ main] 소비자 시작 13:21:23.367 [consumer1] [소비 시도] ? <- [data1, data2] 13:21:23.470 [consumer2] [소비 시도] ? <- [data1, data2] 13:21:23.576 [consumer3] [소비 시도] ? <- [data1, data2] 13:21:23.678 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:21:23.679 [ main] producer1: TERMINATED 13:21:23.679 [ main] producer2: TERMINATED 13:21:23.679 [ main] producer3: TIMED_WAITING 13:21:23.679 [ main] consumer1: BLOCKED 13:21:23.679 [ main] consumer2: BLOCKED 13:21:23.679 [ main] consumer3: BLOCKED 13:21:23.680 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV2== 13:21:24.259 [producer3] [put] 큐가 가득 참, 생산자 대기 13:21:25.262 [producer3] [put] 큐가 가득 참, 생산자 대기 13:21:26.266 [producer3] [put] 큐가 가득 참, 생산자 대기 13:21:27.272 [producer3] [put] 큐가 가득 참, 생산자 대기 13:21:28.277 [producer3] [put] 큐가 가득 참, 생산자 대기 13:21:29.283 [producer3] [put] 큐가 가득 참, 생산자 대기
Bash
복사

7.3. synchronized + wait / notify()

큐가 가득찼을 때 producer3가 스레드 대기 집합에서 대기, conumser를 깨움
package thread.bounded; import java.util.ArrayDeque; import java.util.Queue; import static util.MyLogger.log; import static util.ThreadUtils.sleep; public class BoundedQueueV3 implements BoundedQueue { private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV3(final int max) { this.max = max; } @Override public synchronized void put(final String data) { while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기"); try { wait(); // RUNNABLE -> WAITING, 락 반납 log("[put] 생산자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.offer(data); log("[put] 생산자 데이터 저장, notify() 호출"); notify(); // 대시 쓰레드, WAIT -> BLOCKED } @Override public synchronized String take() { while(queue.isEmpty()) { log("[take] 큐에 데이터가 없음, 소비자 대기"); try { wait(); log("[take] 소비자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } String data = queue.poll(); log("[take] 소비자 데이터 획득, notify() 호출"); notify(); // 대기 스레드, WAIT -> BLOCKED return data; } @Override public String toString() { return queue.toString(); } }
Java
복사
13:23:43.445 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV3== 13:23:43.455 [ main] 생산자 시작 13:23:43.484 [producer1] [생산 시도] data1 -> [] 13:23:43.484 [producer1] [put] 생산자 데이터 저장, notify() 호출 13:23:43.484 [producer1] [생산 완료] data1 -> [data1] 13:23:43.573 [producer2] [생산 시도] data2 -> [data1] 13:23:43.573 [producer2] [put] 생산자 데이터 저장, notify() 호출 13:23:43.574 [producer2] [생산 완료] data2 -> [data1, data2] 13:23:43.678 [producer3] [생산 시도] data3 -> [data1, data2] 13:23:43.679 [producer3] [put] 큐가 가득 참, 생산자 대기 13:23:43.780 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:23:43.780 [ main] producer1: TERMINATED 13:23:43.780 [ main] producer2: TERMINATED 13:23:43.780 [ main] producer3: WAITING 13:23:43.781 [ main] 소비자 시작 13:23:43.782 [consumer1] [소비 시도] ? <- [data1, data2] 13:23:43.782 [consumer1] [take] 소비자 데이터 획득, notify() 호출 13:23:43.782 [producer3] [put] 생산자 깨어남 13:23:43.782 [consumer1] [소비 완료] data1 <- [data2] 13:23:43.782 [producer3] [put] 생산자 데이터 저장, notify() 호출 13:23:43.782 [producer3] [생산 완료] data3 -> [data2, data3] 13:23:43.887 [consumer2] [소비 시도] ? <- [data2, data3] 13:23:43.887 [consumer2] [take] 소비자 데이터 획득, notify() 호출 13:23:43.887 [consumer2] [소비 완료] data2 <- [data3] 13:23:43.992 [consumer3] [소비 시도] ? <- [data3] 13:23:43.993 [consumer3] [take] 소비자 데이터 획득, notify() 호출 13:23:43.993 [consumer3] [소비 완료] data3 <- [] 13:23:44.096 [ main] 현재 상태 출력, 큐 데이터: [] 13:23:44.098 [ main] producer1: TERMINATED 13:23:44.098 [ main] producer2: TERMINATED 13:23:44.099 [ main] producer3: TERMINATED 13:23:44.099 [ main] consumer1: TERMINATED 13:23:44.100 [ main] consumer2: TERMINATED 13:23:44.100 [ main] consumer3: TERMINATED 13:23:44.102 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV3==
Bash
복사

7.4. ReentrantLock + Condition

condition을 활용하여 생산자용 스레드 대기 집합과 소비자용 스레드 대기 집합을 분리
package thread.bounded; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static util.MyLogger.log; public class BoundedQueueV5 implements BoundedQueue { private final Lock lock = new ReentrantLock(); private final Condition producerCond = lock.newCondition(); private final Condition consumerCond = lock.newCondition(); private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV5(final int max) { this.max = max; } @Override public void put(final String data) { lock.lock(); try { while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기"); try { producerCond.await(); // wait()는 Object로부터 상속받은 wait() log("[put] 생산자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.offer(data); log("[put] 생산자 데이터 저장, consumerCond.signal() 호출"); consumerCond.signal(); // notify() 의 역살을 하는 기능 } finally { lock.unlock(); } } @Override public String take() { lock.lock(); try { while(queue.isEmpty()) { log("[take] 큐에 데이터가 없음, 소비자 대기"); try { consumerCond.await(); log("[take] 소비자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } String data = queue.poll(); log("[take] 소비자 데이터 획득, producerCond.signal() 호출"); producerCond.signal(); return data; } finally { lock.unlock(); } } @Override public String toString() { return queue.toString(); } }
Java
복사
13:24:36.398 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV5== 13:24:36.403 [ main] 생산자 시작 13:24:36.408 [producer1] [생산 시도] data1 -> [] 13:24:36.409 [producer1] [put] 생산자 데이터 저장, consumerCond.signal() 호출 13:24:36.409 [producer1] [생산 완료] data1 -> [data1] 13:24:36.511 [producer2] [생산 시도] data2 -> [data1] 13:24:36.511 [producer2] [put] 생산자 데이터 저장, consumerCond.signal() 호출 13:24:36.512 [producer2] [생산 완료] data2 -> [data1, data2] 13:24:36.612 [producer3] [생산 시도] data3 -> [data1, data2] 13:24:36.613 [producer3] [put] 큐가 가득 참, 생산자 대기 13:24:36.716 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:24:36.717 [ main] producer1: TERMINATED 13:24:36.717 [ main] producer2: TERMINATED 13:24:36.717 [ main] producer3: WAITING 13:24:36.718 [ main] 소비자 시작 13:24:36.720 [consumer1] [소비 시도] ? <- [data1, data2] 13:24:36.720 [consumer1] [take] 소비자 데이터 획득, producerCond.signal() 호출 13:24:36.721 [producer3] [put] 생산자 데이터 저장, consumerCond.signal() 호출 13:24:36.721 [consumer1] [소비 완료] data1 <- [data2] 13:24:36.721 [producer3] [생산 완료] data3 -> [data2, data3] 13:24:36.825 [consumer2] [소비 시도] ? <- [data2, data3] 13:24:36.825 [consumer2] [take] 소비자 데이터 획득, producerCond.signal() 호출 13:24:36.826 [consumer2] [소비 완료] data2 <- [data3] 13:24:36.931 [consumer3] [소비 시도] ? <- [data3] 13:24:36.931 [consumer3] [take] 소비자 데이터 획득, producerCond.signal() 호출 13:24:36.932 [consumer3] [소비 완료] data3 <- [] 13:24:37.031 [ main] 현재 상태 출력, 큐 데이터: [] 13:24:37.032 [ main] producer1: TERMINATED 13:24:37.033 [ main] producer2: TERMINATED 13:24:37.033 [ main] producer3: TERMINATED 13:24:37.034 [ main] consumer1: TERMINATED 13:24:37.034 [ main] consumer2: TERMINATED 13:24:37.034 [ main] consumer3: TERMINATED 13:24:37.035 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV5==
Bash
복사

7.5. BlockingQueue

put / take를 이용하여 큐가 가득차거나 비어있을때 준비될때 까지 차단
package thread.bounded; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BoundedQueueV6_1 implements BoundedQueue { private BlockingQueue<String> queue; public BoundedQueueV6_1(int max) { this.queue = new ArrayBlockingQueue<>(max); } @Override public void put(final String data) { try { queue.put(data); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public String take() { try { return queue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public String toString() { return queue.toString(); } }
Java
복사
13:36:37.029 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV6_1== 13:36:37.033 [ main] 생산자 시작 13:36:37.039 [producer1] [생산 시도] data1 -> [] 13:36:37.039 [producer1] [생산 완료] data1 -> [data1] 13:36:37.139 [producer2] [생산 시도] data2 -> [data1] 13:36:37.139 [producer2] [생산 완료] data2 -> [data1, data2] 13:36:37.244 [producer3] [생산 시도] data3 -> [data1, data2] 13:36:37.351 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:36:37.353 [ main] producer1: TERMINATED 13:36:37.353 [ main] producer2: TERMINATED 13:36:37.354 [ main] producer3: WAITING 13:36:37.354 [ main] 소비자 시작 13:36:37.358 [consumer1] [소비 시도] ? <- [data1, data2] 13:36:37.358 [producer3] [생산 완료] data3 -> [data2, data3] 13:36:37.358 [consumer1] [소비 완료] data1 <- [data2] 13:36:37.462 [consumer2] [소비 시도] ? <- [data2, data3] 13:36:37.463 [consumer2] [소비 완료] data2 <- [data3] 13:36:37.568 [consumer3] [소비 시도] ? <- [data3] 13:36:37.568 [consumer3] [소비 완료] data3 <- [] 13:36:37.673 [ main] 현재 상태 출력, 큐 데이터: [] 13:36:37.674 [ main] producer1: TERMINATED 13:36:37.674 [ main] producer2: TERMINATED 13:36:37.675 [ main] producer3: TERMINATED 13:36:37.675 [ main] consumer1: TERMINATED 13:36:37.676 [ main] consumer2: TERMINATED 13:36:37.676 [ main] consumer3: TERMINATED 13:36:37.678 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV6_1==
Bash
복사
offer, poll을 활용하여 대기하지 않고 즉시 성공 여부를 반환
package thread.bounded; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import static util.MyLogger.log; public class BoundedQueueV6_2 implements BoundedQueue { private BlockingQueue<String> queue; public BoundedQueueV6_2(int max) { this.queue = new ArrayBlockingQueue<>(max); } @Override public void put(final String data) { boolean result = queue.offer(data); log("저장 시도 결과 = " + result); } @Override public String take() { return queue.poll(); } @Override public String toString() { return queue.toString(); } }
Java
복사
13:37:10.494 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV6_2== 13:37:10.497 [ main] 생산자 시작 13:37:10.502 [producer1] [생산 시도] data1 -> [] 13:37:10.503 [producer1] 저장 시도 결과 = true 13:37:10.503 [producer1] [생산 완료] data1 -> [data1] 13:37:10.605 [producer2] [생산 시도] data2 -> [data1] 13:37:10.605 [producer2] 저장 시도 결과 = true 13:37:10.605 [producer2] [생산 완료] data2 -> [data1, data2] 13:37:10.706 [producer3] [생산 시도] data3 -> [data1, data2] 13:37:10.707 [producer3] 저장 시도 결과 = false 13:37:10.707 [producer3] [생산 완료] data3 -> [data1, data2] 13:37:10.812 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:37:10.813 [ main] producer1: TERMINATED 13:37:10.814 [ main] producer2: TERMINATED 13:37:10.814 [ main] producer3: TERMINATED 13:37:10.814 [ main] 소비자 시작 13:37:10.815 [consumer1] [소비 시도] ? <- [data1, data2] 13:37:10.816 [consumer1] [소비 완료] data1 <- [data2] 13:37:10.921 [consumer2] [소비 시도] ? <- [data2] 13:37:10.921 [consumer2] [소비 완료] data2 <- [] 13:37:11.026 [consumer3] [소비 시도] ? <- [] 13:37:11.027 [consumer3] [소비 완료] null <- [] 13:37:11.131 [ main] 현재 상태 출력, 큐 데이터: [] 13:37:11.132 [ main] producer1: TERMINATED 13:37:11.133 [ main] producer2: TERMINATED 13:37:11.133 [ main] producer3: TERMINATED 13:37:11.133 [ main] consumer1: TERMINATED 13:37:11.134 [ main] consumer2: TERMINATED 13:37:11.134 [ main] consumer3: TERMINATED 13:37:11.135 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV6_2==
Bash
복사
offer, poll + 특정 시간 대기 기능을 활용하여 시간이 최과 될 경우 false, null 반환
package thread.bounded; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import static util.MyLogger.log; public class BoundedQueueV6_3 implements BoundedQueue { private BlockingQueue<String> queue; public BoundedQueueV6_3(int max) { this.queue = new ArrayBlockingQueue<>(max); } @Override public void put(final String data) { try { // 대기 시간 설정 가능 boolean result = queue.offer(data, 1, TimeUnit.NANOSECONDS); log("저장 시도 결과 = " + result); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public String take() { try { // 대기 시간 설정 가능 return queue.poll(2, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public String toString() { return queue.toString(); } }
Java
복사
13:38:29.945 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV6_3== 13:38:29.947 [ main] 생산자 시작 13:38:29.953 [producer1] [생산 시도] data1 -> [] 13:38:29.954 [producer1] 저장 시도 결과 = true 13:38:29.954 [producer1] [생산 완료] data1 -> [data1] 13:38:30.050 [producer2] [생산 시도] data2 -> [data1] 13:38:30.050 [producer2] 저장 시도 결과 = true 13:38:30.050 [producer2] [생산 완료] data2 -> [data1, data2] 13:38:30.155 [producer3] [생산 시도] data3 -> [data1, data2] 13:38:30.156 [producer3] 저장 시도 결과 = false 13:38:30.156 [producer3] [생산 완료] data3 -> [data1, data2] 13:38:30.261 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:38:30.261 [ main] producer1: TERMINATED 13:38:30.261 [ main] producer2: TERMINATED 13:38:30.261 [ main] producer3: TERMINATED 13:38:30.261 [ main] 소비자 시작 13:38:30.262 [consumer1] [소비 시도] ? <- [data1, data2] 13:38:30.263 [consumer1] [소비 완료] data1 <- [data2] 13:38:30.368 [consumer2] [소비 시도] ? <- [data2] 13:38:30.368 [consumer2] [소비 완료] data2 <- [] 13:38:30.473 [consumer3] [소비 시도] ? <- [] 13:38:30.578 [ main] 현재 상태 출력, 큐 데이터: [] 13:38:30.579 [ main] producer1: TERMINATED 13:38:30.579 [ main] producer2: TERMINATED 13:38:30.580 [ main] producer3: TERMINATED 13:38:30.580 [ main] consumer1: TERMINATED 13:38:30.580 [ main] consumer2: TERMINATED 13:38:30.581 [ main] consumer3: TIMED_WAITING 13:38:30.582 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV6_3== 13:38:32.479 [consumer3] [소비 완료] null <- []
Bash
복사
add, remove를 활용하여 생산, 소비를 실패했을 때 예외 발생
package thread.bounded; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BoundedQueueV6_4 implements BoundedQueue { private BlockingQueue<String> queue; public BoundedQueueV6_4(int max) { this.queue = new ArrayBlockingQueue<>(max); } @Override public void put(final String data) { queue.add(data); // java.lang.IllegalStateException: Queue full } @Override public String take() { return queue.remove(); // java.util.NoSuchElementException } @Override public String toString() { return queue.toString(); } }
Java
복사
13:39:03.803 [ main] == [생산자 먼저 실행] 시작,BoundedQueueV6_4== 13:39:03.807 [ main] 생산자 시작 13:39:03.812 [producer1] [생산 시도] data1 -> [] 13:39:03.813 [producer1] [생산 완료] data1 -> [data1] 13:39:03.915 [producer2] [생산 시도] data2 -> [data1] 13:39:03.915 [producer2] [생산 완료] data2 -> [data1, data2] 13:39:04.015 [producer3] [생산 시도] data3 -> [data1, data2] Exception in thread "producer3" java.lang.IllegalStateException: Queue full at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98) at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329) at thread.bounded.BoundedQueueV6_4.put(BoundedQueueV6_4.java:16) at thread.bounded.ProducerTask.run(ProducerTask.java:18) at java.base/java.lang.Thread.run(Thread.java:833) 13:39:04.118 [ main] 현재 상태 출력, 큐 데이터: [data1, data2] 13:39:04.119 [ main] producer1: TERMINATED 13:39:04.119 [ main] producer2: TERMINATED 13:39:04.119 [ main] producer3: TERMINATED 13:39:04.120 [ main] 소비자 시작 13:39:04.121 [consumer1] [소비 시도] ? <- [data1, data2] 13:39:04.122 [consumer1] [소비 완료] data1 <- [data2] 13:39:04.224 [consumer2] [소비 시도] ? <- [data2] 13:39:04.224 [consumer2] [소비 완료] data2 <- [] 13:39:04.325 [consumer3] [소비 시도] ? <- [] Exception in thread "consumer3" java.util.NoSuchElementException at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117) at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:21) at thread.bounded.ConsumerTask.run(ConsumerTask.java:16) at java.base/java.lang.Thread.run(Thread.java:833) 13:39:04.427 [ main] 현재 상태 출력, 큐 데이터: [] 13:39:04.427 [ main] producer1: TERMINATED 13:39:04.427 [ main] producer2: TERMINATED 13:39:04.427 [ main] producer3: TERMINATED 13:39:04.427 [ main] consumer1: TERMINATED 13:39:04.428 [ main] consumer2: TERMINATED 13:39:04.428 [ main] consumer3: TERMINATED 13:39:04.428 [ main] == [생산자 먼저 실행] 종료,BoundedQueueV6_4==
Bash
복사