Java Thread Pool
OS λ λ³λ ¬ μ²λ¦¬λ₯Ό λͺ¨λ°©νκΈ° μν΄ Thread κ°μ Context Switching μ μνν©λλ€. Java μμ Thread λ OS μ Thread μ 맀νλ©λλ€. λλ¬Έμ Thread λ₯Ό λ무 λ§μ΄ μμ±νκ² λλ€λ©΄ OS μ μμμ΄ λΉ λ₯΄κ² μμ§λ μ μμΌλ©° Context Switching λΉμ© μμ μ¦κ°ν μ μμ΅λλ€.
μ΄λ° λ¬Έμ λ₯Ό ν΄κ²°νκΈ° μν΄ Thread Pool μ΄λΌλ κ°λ μ΄ λ±μ₯ν©λλ€. λ©ν°μ€λ λ νκ²½μμ μ°μ°ν μμ μ Thread Pool μ μ λ¬νμ¬ μ²λ¦¬ν©λλ€. Thread Pool μ μ λ¬λ°μ μμ μ μ²λ¦¬νλ Thread λ₯Ό κ΄λ¦¬νκ³ Thread μ μμ μλͺ μ£ΌκΈ°λ₯Ό μ μ΄νλ©° μ λ¬λλ μμ μ νμ λ΄μ μ²λ¦¬λ₯Ό μ€μΌμ€λ§νμ¬ μμμ ν¨μ¨μ μΌλ‘ μ¬μ©ν μ μλλ‘ λμμ€λλ€.
Java μμλ Executor
μΈν°νμ΄μ€λ₯Ό ν΅ν΄ Thread Pool μ ꡬννμμ΅λλ€.
μ΄λ² κΈμμ ExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executors
μ λν΄ μμλ³΄κ² μ΅λλ€.
ExecutorService
@Test
void executorServiceTest() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
String result = future.get();
assertThat(result).isEqualTo("Hello World");
}
ExecutorService
λ Executor
λ₯Ό νμ₯ν μΈν°νμ΄μ€λ‘μ μμ
μ μ§νμ μ μ΄νκ³ κ΄λ¦¬νλ λ§μ λ©μλκ° ν¬ν¨λμ΄ μμ΅λλ€. ν΄λΉ μΈν°νμ΄μ€λ₯Ό ν΅ν΄ μ€νν μμ
μ μ μΆνκ³ λ°νλλ Future
μΈμ€ν΄μ€λ₯Ό ν΅ν΄ μ€νμ μ μ΄ν μλ μμ΅λλ€.
ExecutorService executorService =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ExecutorService
λ λ€μν ꡬν체λ€μ΄ μ‘΄μ¬νλ©° μ§μ ꡬμ±νμ¬ μ¬μ©ν μλ μμ΅λλ€. μ μμλ ThreadPoolExecutor
ν΄λμ€μ μμ±μλ₯Ό ν΅ν΄ 컀μ€ν
Thread Pool μ μμ±ν λͺ¨μ΅μ
λλ€.
ThreadPoolExecutor
ThreadPoolExecutor
λ νλμ μν λ§μ 맀κ°λ³μμ νν¬κ° μλ νμ₯μ± μλ Thread Pool μ
λλ€.
κ·Έ μ€ λνμ μΈ λ§€κ°λ³μλ€μ μλμ κ°μ΅λλ€.
corePoolSize
λ Thread Pool μ νμ μμ£Όνλ Thread μ μμ λλ€.maximumPoolSize
λ Core Thread κ° λͺ¨λ μμ μ€μ΄λ©° λ΄λΆworkQueue
κ° κ°λ μ°¨ μμ λ μ΅λλ‘ μμ±ν μ μλ Thread μ μλ₯Ό μλ―Έν©λλ€.keepAliveTime
μ μ΄κ³Ό μμ±λ Thread κ° idle μνλ‘ λκΈ°ν μ μλ μκ°μ λλ€.ThreadPoolExecutor
λ κΈ°λ³Έμ μΌλ‘ μ΄κ³Ό μμ±λ Thread λ₯Ό μ κ±° λμμΌλ‘ κ°μ£Όν©λλ€. Core Thread μ κ°μ μ κ±° μ μ± μ μ μ©νκ³ μΆμ κ²½μ°allowCoreThreadTimeOut(true)
λ©μλλ₯Ό μ¬μ©ν μ μμ΅λλ€.unit
μkeepAliveTime
μ λ¨μμ λλ€.workQueue
λ Core Thread κ° λͺ¨λ μμ μ€μΌ λ μμ μ΄ λκΈ°νλ νμ λλ€.Runnable
μΈμ€ν΄μ€λ€λ§ λκΈ°ν μ μκΈ° λλ¬ΈμCallable
μΈμ€ν΄μ€λ₯Ό μ μΆν μ μλsubmit()
λ©μλλAbstractThreadPoolExecutor
λ₯Ό ν΅ν΄Runnable
μΈμ€ν΄μ€λ‘ λ³νλμ΄ λκΈ°ν©λλ€.
μ΄λ° 맀κ°λ³μλ€μ ν΅ν΄ λ€μν Thread Pool μ μ¬μ©ν μ μμ§λ§, μΌλ°μ μΌλ‘ μ¬μ©λλ Thread Pool μ λλΆλΆ Executors
ν΄λμ€μ ν©ν 리 λ©μλλ₯Ό ν΅ν΄ μ 곡λ©λλ€.
Assigning Tasks
Runnable runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executorService.execute(runnableTask);
execute()
λ©μλλ Executor
μΈν°νμ΄μ€λ‘ λΆν° μμλ°μ λ©μλμ
λλ€. Runnable
μΈμ€ν΄μ€λ₯Ό μ²λ¦¬νλ©° λ°ννμ
μ΄ void
μ΄κΈ° λλ¬Έμ μμ
μ μνλ₯Ό νμΈν μ μλ λ°©λ²μ΄ μ νμ μ
λλ€.
μ²λ¦¬ μ€ μμΈκ° λ°μνλ©΄ ν΄λΉ Thread κ° μ’ λ£λκ³ Thread Pool μμ μ κ±°λλ©° μλ‘μ΄ Thread λ₯Ό μμ±νμ¬ λ€λ₯Έ μμ μ μ²λ¦¬ν©λλ€.
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
Future<String> future =
executorService.submit(callableTask);
submit()
λ©μλλ Future
μΈμ€ν΄μ€λ₯Ό λ°ννμ
μΌλ‘ κ°μ§κ³ μμ΅λλ€. μ²λ¦¬ μ€ μμΈκ° λ°μνλλΌλ Thread κ° μ’
λ£λμ§ μκ³ λ€μ μμ
μ μ¬μ©λ©λλ€. μ΄λ° νΉμ§ λλ¬Έμ submit()
λ©μλλ₯Ό μ¬μ©νλ κ²μ΄ λ λ°λμ§ν©λλ€.
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
String result = executorService.invokeAny(callableTasks);
invokeAny()
λ©μλλ μμ
μ 컬λ μ
μ μ€νν©λλ€. λͺ¨λ μ€νμν€κ³ νλλΌλ μ±κ³΅νλ€λ©΄ ν΄λΉ μμ
μ λ°νκ°μ λ°νν©λλ€.
Callable<String> callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
List<Future<String>> futures = executorService.invokeAll(callableTasks);
invokeAll()
λ©μλλ μμ
μ 컬λ μ
μ μ€νν©λλ€. λͺ¨λ μ€νμν€κ³ λͺ¨λ μμ
μ λν κ²°κ³Όλ₯Ό 컬λ μ
μΌλ‘ λ°νν©λλ€.
Shutting Down
μΌλ°μ μΌλ‘ ExecutorService
λ μμ
μ΄ μμ λ μλμΌλ‘ μ κ±°λμ§ μκ³ μλ‘μ΄ μμ
μ΄ ν λΉλκΈ°λ₯Ό 무기ν κΈ°λ€λ¦½λλ€. μ΄λ λΆκ·μΉμ μΌλ‘ λνλλ μμ
μ μ²λ¦¬νκ±°λ μ»΄νμΌ μμ μ μμ
μ μλμ μ μ μλ κ²½μ° μ μ©ν μ μμ΅λλ€.
νμ§λ§ λκΈ°μ€μΈ ExecutorService
λ‘ μΈν΄ JVM μ΄ κ³μ μ€νλμ΄ μ’
λ£λμ΄μΌ νλ μμ μ μ ν리μΌμ΄μ
μ΄ μ’
λ£λμ§ μμ μ μκΈ° λλ¬Έμ ExecutorService
λ₯Ό μ’
λ£νκΈ° μν shutdown()
shutdownNow()
λ©μλκ° μ‘΄μ¬ν©λλ€.
executorService.shutdown();
shutdown()
μ ExecutorService
λ₯Ό λ°λ‘ μ’
λ£νμ§ μκ³ ν΄λΉ ExecutorService
μ λ μ΄μ μμ
μ΄ ν λΉλμ§ μκ²ν λ€ νμ¬ μ€νμ€μΈ λͺ¨λ μμ
μ΄ λλλ©΄ λΉλ‘μ μ’
λ£λ©λλ€.
List<Runnable> notExecutedTasks = executorService.shutDownNow();
shutdownNow()
λ ExecutorService
λ₯Ό λ°λ‘ μ’
λ£νμ§λ§ μ€νμ€μΈ μμ
λ€μ΄ λͺ¨λ λμμ μ’
λ£λλ€λ 보μ₯μ νμ§ μμ΅λλ€. ν΄λΉ λ©μλλ μ²λ¦¬ λκΈ°μ€μΈ μμ
μ λͺ©λ‘μ λ°ννκΈ° λλ¬Έμ μ΄ μμ
λ€μ μ΄λ»κ² μ²λ¦¬ν μ§ κ²°μ ν μ μμ΅λλ€.
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
Oracle μμλ κΆμ₯νλ ExecutorService
λ₯Ό μ’
λ£νλ μ’μ λ°©λ²μ μμ²λΌ awaitTermination()
λ©μλμ ν¨κ» μ λ λ©μλλ₯Ό λͺ¨λ μ¬μ©νλ κ²μ
λλ€.
μ΄λ°μμΌλ‘ ExecutorService
λ₯Ό μ’
λ£νλ€λ©΄ ν΄λΉ ExecutorService
λ λ μ΄μ μμ
μ λ°μ§ μμ κ²μ΄κ³ μ§μ λ μκ°μμ λͺ¨λ μμ
μ΄ μλ£λμ§ μλλ€λ©΄ μ¦μ μ’
λ£λ©λλ€.
Future Interface
submit()
κ³Ό invokeAll()
λ©μλλ Future
νμ
λλ Future
μ 컬λ μ
μ λ°ννμ¬ μμ
μ μ€ν κ²°κ³Όλ₯Ό μ»κ±°λ μμ
μ μνλ₯Ό νμΈν μ μμ΅λλ€.
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Future
μΈν°νμ΄μ€λ Callable
μ μ€ν κ²°κ³Όλ₯Ό λ°ννλ get()
λ©μλλ₯Ό μ 곡ν©λλ€. μμ
μ΄ Runnable
μΌ κ²½μ° null
μ λ°νν©λλ€.
μμ
μ΄ μ€νμ€μΌ λ get()
λ©μλλ₯Ό νΈμΆνλ©΄ μμ
μ΄ μ λλ‘ μ€νλκ³ κ²°κ³Όλ₯Ό λ°νν μ μμ λκΉμ§ μ€νμ΄ μ°¨λ¨λ©λλ€.
String result = future.get(200, TimeUnit.MILLISECONDS);
get()
λ©μλλ‘ μΈν κΈ΄ μκ°μ μ°¨λ¨μΌλ‘ μΈν΄ μ ν리μΌμ΄μ
μ μ±λ₯μ΄ μ νλ μ μμ΅λλ€. κ²°κ³Ό λ°μ΄ν°κ° μ€μνμ§ μμ κ²½μ° μκ° μ νμ μ¬μ©νμ¬ μ΄λ¬ν λ¬Έμ λ₯Ό λ°©μ§ν μ μμ΅λλ€.
μ μμμ²λΌ μ€ν μκ°μ΄ μ§μ λ κ²λ³΄λ€ κΈΈλ©΄ TimeoutException
μ΄ λ°μνκΈ° λλ¬Έμ isDone()
λ©μλλ₯Ό ν΅ν΄ ν λΉλ μμ
μ΄ μ΄λ―Έ μ²λ¦¬λμλμ§ νμΈν μλ μμ΅λλ€.
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
Future
μΈν°νμ΄μ€λ μμ
μ λμ€μ μ·¨μνλ cancel()
λ©μλλ₯Ό μ 곡νλ©° isCancelled()
λ©μλλ₯Ό ν΅ν΄ μμ
μ μ·¨μ μ¬λΆλ₯Ό νμΈν μ μμ΅λλ€.
ScheduledExecutorService
ScheduledExecutorService
μΈν°νμ΄μ€λ 미리 μ μλ μ§μ° λλ μ£ΌκΈ°μ μΌλ‘ μμ
μ μ€νν μ μκ² λμμ£Όλ μΈν°νμ΄μ€μ
λλ€.
Future<String> resultFuture =
executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
ScheduledExecutorService
μ schedule()
λ©μλλ₯Ό μ΄μ©νμ¬ μμ
μ μ§μ λ μ§μ° μ΄ν μ€νν μ μμ΅λλ€.
executorService.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
scheduleAtFixedRate()
λ©μλλ₯Ό ν΅ν΄ μμ
μ μ£ΌκΈ°μ μΌλ‘ μ€νν μ μμ΅λλ€. μ μμμ κ²½μ° 0.1μ΄ λκΈ° ν μμ
μ΄ μ€νλλ©° μμ
μ μμμκ°μ κΈ°μ€μΌλ‘ 맀 0.45μ΄ λ§λ€ μμ
μ μ£ΌκΈ°μ μΌλ‘ μ€νν©λλ€. λ§μ½ ν λΉλ μμ
μ μ€ννλλ° 0.45μ΄ μ΄μμ μκ°μ΄ νμν κ²½μ° ScheduledExecutorService
λ λ€μ μμ
μ μμνκΈ° μ μ νμ¬ μμ
μ΄ μλ£λ λκΉμ§ λκΈ°ν©λλ€.
executorService.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
λ§μ½ μμ
λ°λ³΅ μ¬μ΄μ κ³ μ κΈΈμ΄μ μ§μ°μ΄ νμν κ²½μ° scheduleWithFixedDelay()
λ©μλλ₯Ό μ¬μ©ν μ μμ΅λλ€. μ μμμ κ²½μ° νμ¬ μ€νμ λκ³Ό λ€μ μ€νμ μμ μ¬μ΄μ 0.15μ΄μ μ€μ§λ₯Ό 보μ₯ν©λλ€.
scheduleAtFixedRate()
λ° scheduleWithFixedDelay()
λ©μλλ ExecutorService
κ° μ’
λ£λκ±°λ μ€ν μ€ μμΈκ° λ°μνλ κ²½μ° μ’
λ£λ©λλ€.
Executors
Executors
ν΄λμ€λ 미리 ꡬμ±λ Thread Pool μΈμ€ν΄μ€λ₯Ό λ§λ€κΈ° μν μ¬λ¬ λ©μλκ° ν¬ν¨λμ΄ μμ΅λλ€. 컀μ€ν
νλμ΄ νμνμ§ μλ€λ©΄ ν΄λΉ ν΄λμ€λ₯Ό ν΅ν΄ Thread Pool μ μμ±νμ¬ μ¬μ©ν μ μμ΅λλ€.
Executor
μ ExecutorService
μΈν°νμ΄μ€λ₯Ό ν΅ν΄ λ€μν Thread Pool μ μ¬μ©ν μ μμ΅λλ€.
newFixedThreadPool
newFixedThreadPool()
μ μ‘΄μ¬νλ Thread μ μκ° νμ κ°μ Thread Pool μ μμ±ν©λλ€.
corePoolSize
= μ§μ maximumPoolSize
=corePoolSize
keepAliveTime
= 0workQueue
=LinkedBlockingQueue*<*Runnable*>*
@Test
void newFixedThreadPoolTest() {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(sleepThread());
executor.submit(sleepThread());
executor.submit(sleepThread());
assertAll(() -> {
assertThat(executor.getPoolSize()).isEqualTo(2);
assertThat(executor.getQueue().size()).isEqualTo(1);
});
}
Executors.newFixedThreadPool(2)
λ₯Ό ν΅ν΄ 2κ°μ Thread λ₯Ό κ°μ§ Thread Pool μ μμ±ν λ€ 1μ΄κ° μ μ§νλ μμ
3κ°λ₯Ό Thread Pool μ μ μΆνλ©΄ 2κ°μ μμ
μ Thread Pool μ μ‘΄μ¬νλ 2κ°μ Thread μ ν λΉλκ³ λλ¨Έμ§ μμ
μ νμμ λκΈ°νκ² λ©λλ€.
newCachedThreadPool
newCachedThreadPool()
μ νμν λ λ§λ€ Thread λ₯Ό μμ±νλ Thread Pool μ
λλ€.
corePoolSize
= 0maximumPoolSize
=Integer.MAX
keepAliveTime
= 60sworkQueue
=SynchronousQueue
@Test
void newCachedThreadPoolTest() {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(sleepThread());
executor.submit(sleepThread());
executor.submit(sleepThread());
assertAll(() -> {
assertThat(executor.getPoolSize()).isEqualTo(3);
assertThat(executor.getQueue().size()).isEqualTo(0);
});
}
μ΄λ° μ€μ κ°μ Thread Pool μ΄ μ μΆλ μμ μ μμ©νκΈ° μν΄ μ νμμ΄ μ»€μ§ μ μμμ μλ―Έν©λλ€. νμ§λ§ Thread κ° λ μ΄μ νμνμ§ μλ€λ©΄ 60μ΄μ μκ° μ΄ν λͺ¨λ μ κ±°λ©λλ€.
newCachedThreadPool()
μ μμ
μ μ½μ
κ³Ό μ κ±°κ° ν μμΌλ‘ μ΄λ£¨μ΄μ§λ SynchronousQueue
μΈμ€ν΄μ€λ₯Ό λ΄λΆ νλ‘ μ¬μ©νκ³ μκΈ° λλ¬Έμ μ€μ§μ μΈ νμ ν¬κΈ°λ νμ 0μ
λλ€.
μ£Όλ‘ λ¨μκ° μμ μ΄ λ§μ μ ν리μΌμ΄μ μ μ¬μ©λ©λλ€.
newSingleThreadExecutor
newSingleThreadExecutor()
λ λ¨μΌ Thread λ₯Ό μ¬μ©νλ Thread Pool μ μμ±ν©λλ€.
corePoolSize
= 1maximumPoolSize
= 1keepAliveTime
= 0workQueue
=LinkedBlockingQueue*<*Runnable*>*
@Test
void newSingleThreadExecutorTest() {
AtomicInteger counter = new AtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> counter.set(1));
executor.submit(() -> counter.compareAndSet(1, 2));
}
μ΄ ThreadPoolExecutor
λ μμ°¨μ μΌλ‘ μνλλ©° λΆλ³ λνΌ ν΄λμ€λ‘ ν¬μ₯λμ΄ μκΈ° λλ¬Έμ μμ± ν μ¬κ΅¬μ±μ΄ λΆκ°λ₯ν©λλ€. λλ¬Έμ ThreadPoolExecutor
ν΄λμ€λ‘ μΊμ€ν
ν μ μμ΅λλ€.
μ£Όλ‘ μ΄λ²€νΈ 루νλ₯Ό λ§λλ λ° μ¬μ©λ©λλ€.
newScheduledThreadPool
newScheduledThreadPool()
μ μ€μΌμ€λ§μ μν΄ DelayedWorkQueue
λ₯Ό μ¬μ©ν©λλ€.
corePoolSize
= μ§μ maximumPoolSize
=Integer.MAX
keepAliveTime
= 10msworkQueue
=DelayedWorkQueue
@Test
void scheduleTest() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
log.info("Hello World");
}, 500, TimeUnit.MILLISECONDS);
}
μ μμλ Thread Pool μ μλ Thread μμ 0.5 μ΄ λ€μ Hello World λ₯Ό μΆλ ₯ν©λλ€.
@Test
void scheduleAtFixedRateTest() throws InterruptedException {
CountDownLatch lock = new CountDownLatch(3);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
log.info("Hello World");
lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);
lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
}
μ μ½λλ 0.5μ΄ λκΈ° ν 맀 0.1μ΄ λ§λ€ Hello World λ₯Ό μΆλ ₯νλ μ€μΌμ€λ§ μ λλ€.