Java Thread Pool


OS λŠ” 병렬 처리λ₯Ό λͺ¨λ°©ν•˜κΈ° μœ„ν•΄ Thread κ°„μ˜ Context Switching 을 μˆ˜ν–‰ν•©λ‹ˆλ‹€. Java μ—μ„œ Thread λŠ” OS 의 Thread 에 λ§€ν•‘λ©λ‹ˆλ‹€. λ•Œλ¬Έμ— Thread λ₯Ό λ„ˆλ¬΄ 많이 μƒμ„±ν•˜κ²Œ λœλ‹€λ©΄ OS 의 μžμ›μ΄ λΉ λ₯΄κ²Œ μ†Œμ§„λ  수 있으며 Context Switching λΉ„μš© μ—­μ‹œ 증가할 수 μžˆμŠ΅λ‹ˆλ‹€.

이런 문제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄ Thread Pool μ΄λΌλŠ” κ°œλ…μ΄ λ“±μž₯ν•©λ‹ˆλ‹€. λ©€ν‹°μŠ€λ ˆλ“œ ν™˜κ²½μ—μ„  μ—°μ‚°ν•  μž‘μ—…μ„ Thread Pool 에 μ „λ‹¬ν•˜μ—¬ μ²˜λ¦¬ν•©λ‹ˆλ‹€. Thread Pool 은 전달받을 μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” Thread λ₯Ό κ΄€λ¦¬ν•˜κ³  Thread 의 μˆ˜μ™€ 생λͺ…μ£ΌκΈ°λ₯Ό μ œμ–΄ν•˜λ©° μ „λ‹¬λ˜λŠ” μž‘μ—…μ„ 큐에 λ‹΄μ•„ 처리λ₯Ό μŠ€μΌ€μ€„λ§ν•˜μ—¬ μžμ›μ„ 효율적으둜 μ‚¬μš©ν•  수 μžˆλ„λ‘ λ„μ™€μ€λ‹ˆλ‹€.

Java μ—μ„œλŠ” Executor μΈν„°νŽ˜μ΄μŠ€λ₯Ό 톡해 Thread Pool 을 κ΅¬ν˜„ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

이번 글에선 ExecutorService ThreadPoolExecutor ScheduledThreadPoolExecutor Executors 에 λŒ€ν•΄ μ•Œμ•„λ³΄κ² μŠ΅λ‹ˆλ‹€.

ExecutorService


@Test
void executorServiceTest() throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Future<String> future = executorService.submit(() -> "Hello World");
    String result = future.get();
 
    assertThat(result).isEqualTo("Hello World");
}

ExecutorService λŠ” Executor λ₯Ό ν™•μž₯ν•œ μΈν„°νŽ˜μ΄μŠ€λ‘œμ„œ μž‘μ—…μ˜ 진행을 μ œμ–΄ν•˜κ³  κ΄€λ¦¬ν•˜λŠ” λ§Žμ€ λ©”μ„œλ“œκ°€ ν¬ν•¨λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€. ν•΄λ‹Ή μΈν„°νŽ˜μ΄μŠ€λ₯Ό 톡해 μ‹€ν–‰ν•  μž‘μ—…μ„ μ œμΆœν•˜κ³  λ°˜ν™˜λ˜λŠ” Future μΈμŠ€ν„΄μŠ€λ₯Ό 톡해 싀행을 μ œμ–΄ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€.

ExecutorService executorService = 
  new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,   
  new LinkedBlockingQueue<Runnable>());

ExecutorService λŠ” λ‹€μ–‘ν•œ κ΅¬ν˜„μ²΄λ“€μ΄ μ‘΄μž¬ν•˜λ©° 직접 κ΅¬μ„±ν•˜μ—¬ μ‚¬μš©ν•  μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€. μœ„ μ˜ˆμ‹œλŠ” ThreadPoolExecutor 클래슀의 μƒμ„±μžλ₯Ό 톡해 μ»€μŠ€ν…€ Thread Pool 을 μƒμ„±ν•œ λͺ¨μŠ΅μž…λ‹ˆλ‹€.

ThreadPoolExecutor

ThreadPoolExecutor λŠ” νŠœλ‹μ„ μœ„ν•œ λ§Žμ€ λ§€κ°œλ³€μˆ˜μ™€ 후크가 μžˆλŠ” ν™•μž₯μ„± μžˆλŠ” Thread Pool μž…λ‹ˆλ‹€.

κ·Έ 쀑 λŒ€ν‘œμ μΈ λ§€κ°œλ³€μˆ˜λ“€μ€ μ•„λž˜μ™€ κ°™μŠ΅λ‹ˆλ‹€.

  • corePoolSize λŠ” Thread Pool 에 항상 μƒμ£Όν•˜λŠ” Thread 의 μˆ˜μž…λ‹ˆλ‹€.
  • maximumPoolSize λŠ” Core Thread κ°€ λͺ¨λ‘ μž‘μ—…μ€‘μ΄λ©° λ‚΄λΆ€ workQueue κ°€ 가득 μ°¨ μžˆμ„ λ•Œ μ΅œλŒ€λ‘œ 생성할 수 μžˆλŠ” Thread 의 수λ₯Ό μ˜λ―Έν•©λ‹ˆλ‹€.
  • keepAliveTime 은 초과 μƒμ„±λœ Thread κ°€ idle μƒνƒœλ‘œ λŒ€κΈ°ν•  수 μžˆλŠ” μ‹œκ°„μž…λ‹ˆλ‹€. ThreadPoolExecutor λŠ” 기본적으둜 초과 μƒμ„±λœ Thread λ₯Ό 제거 λŒ€μƒμœΌλ‘œ κ°„μ£Όν•©λ‹ˆλ‹€. Core Thread 에 같은 제거 정책을 μ μš©ν•˜κ³  싢을 경우 allowCoreThreadTimeOut(true) λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
  • unit 은 keepAliveTime 의 λ‹¨μœ„μž…λ‹ˆλ‹€.
  • workQueue λŠ” Core Thread κ°€ λͺ¨λ‘ μž‘μ—…μ€‘μΌ λ•Œ μž‘μ—…μ΄ λŒ€κΈ°ν•˜λŠ” νμž…λ‹ˆλ‹€. Runnable μΈμŠ€ν„΄μŠ€λ“€λ§Œ λŒ€κΈ°ν•  수 있기 λ•Œλ¬Έμ— Callable μΈμŠ€ν„΄μŠ€λ₯Ό μ œμΆœν•  수 μžˆλŠ” submit() λ©”μ„œλ“œλŠ” AbstractThreadPoolExecutor λ₯Ό 톡해 Runnable μΈμŠ€ν„΄μŠ€λ‘œ λ³€ν™˜λ˜μ–΄ λŒ€κΈ°ν•©λ‹ˆλ‹€.

이런 λ§€κ°œλ³€μˆ˜λ“€μ„ 톡해 λ‹€μ–‘ν•œ Thread Pool 을 μ‚¬μš©ν•  수 μžˆμ§€λ§Œ, 일반적으둜 μ‚¬μš©λ˜λŠ” Thread Pool 은 λŒ€λΆ€λΆ„ Executors 클래슀의 νŒ©ν† λ¦¬ λ©”μ„œλ“œλ₯Ό 톡해 μ œκ³΅λ©λ‹ˆλ‹€.

Assigning Tasks

Runnable runnableTask = () -> {
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
 
executorService.execute(runnableTask);

execute() λ©”μ„œλ“œλŠ” Executor μΈν„°νŽ˜μ΄μŠ€λ‘œ λΆ€ν„° 상속받은 λ©”μ„œλ“œμž…λ‹ˆλ‹€. Runnable μΈμŠ€ν„΄μŠ€λ₯Ό μ²˜λ¦¬ν•˜λ©° λ°˜ν™˜νƒ€μž…μ΄ void 이기 λ•Œλ¬Έμ— μž‘μ—…μ˜ μƒνƒœλ₯Ό 확인할 수 μžˆλŠ” 방법이 μ œν•œμ μž…λ‹ˆλ‹€.

처리 쀑 μ˜ˆμ™Έκ°€ λ°œμƒν•˜λ©΄ ν•΄λ‹Ή Thread κ°€ μ’…λ£Œλ˜κ³  Thread Pool μ—μ„œ 제거되며 μƒˆλ‘œμš΄ Thread λ₯Ό μƒμ„±ν•˜μ—¬ λ‹€λ₯Έ μž‘μ—…μ„ μ²˜λ¦¬ν•©λ‹ˆλ‹€.

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution";
};
 
Future<String> future = 
  executorService.submit(callableTask);

submit() λ©”μ„œλ“œλŠ” Future μΈμŠ€ν„΄μŠ€λ₯Ό λ°˜ν™˜νƒ€μž…μœΌλ‘œ 가지고 μžˆμŠ΅λ‹ˆλ‹€. 처리 쀑 μ˜ˆμ™Έκ°€ λ°œμƒν•˜λ”λΌλ„ Thread κ°€ μ’…λ£Œλ˜μ§€ μ•Šκ³  λ‹€μŒ μž‘μ—…μ— μ‚¬μš©λ©λ‹ˆλ‹€. 이런 νŠΉμ§• λ•Œλ¬Έμ— submit() λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•˜λŠ” 것이 더 λ°”λžŒμ§ν•©λ‹ˆλ‹€.

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution";
};
 
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
 
String result = executorService.invokeAny(callableTasks);

invokeAny() λ©”μ„œλ“œλŠ” μž‘μ—…μ˜ μ»¬λ ‰μ…˜μ„ μ‹€ν–‰ν•©λ‹ˆλ‹€. λͺ¨λ‘ μ‹€ν–‰μ‹œν‚€κ³  ν•˜λ‚˜λΌλ„ μ„±κ³΅ν•œλ‹€λ©΄ ν•΄λ‹Ή μž‘μ—…μ˜ λ°˜ν™˜κ°’μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€.

Callable<String> callableTask = () -> {
    TimeUnit.MILLISECONDS.sleep(300);
    return "Task's execution";
};
 
List<Callable<String>> callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
 
List<Future<String>> futures = executorService.invokeAll(callableTasks);

invokeAll() λ©”μ„œλ“œλŠ” μž‘μ—…μ˜ μ»¬λ ‰μ…˜μ„ μ‹€ν–‰ν•©λ‹ˆλ‹€. λͺ¨λ‘ μ‹€ν–‰μ‹œν‚€κ³  λͺ¨λ“  μž‘μ—…μ— λŒ€ν•œ κ²°κ³Όλ₯Ό μ»¬λ ‰μ…˜μœΌλ‘œ λ°˜ν™˜ν•©λ‹ˆλ‹€.

Shutting Down

일반적으둜 ExecutorService λŠ” μž‘μ—…μ΄ 없을 λ•Œ μžλ™μœΌλ‘œ μ œκ±°λ˜μ§€ μ•Šκ³  μƒˆλ‘œμš΄ μž‘μ—…μ΄ ν• λ‹Ήλ˜κΈ°λ₯Ό λ¬΄κΈ°ν•œ κΈ°λ‹€λ¦½λ‹ˆλ‹€. μ΄λŠ” λΆˆκ·œμΉ™μ μœΌλ‘œ λ‚˜νƒ€λ‚˜λŠ” μž‘μ—…μ„ μ²˜λ¦¬ν•˜κ±°λ‚˜ 컴파일 μ‹œμ μ— μž‘μ—…μ˜ μˆ˜λŸ‰μ„ μ•Œ 수 μ—†λŠ” 경우 μœ μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

ν•˜μ§€λ§Œ λŒ€κΈ°μ€‘μΈ ExecutorService 둜 인해 JVM 이 계속 μ‹€ν–‰λ˜μ–΄ μ’…λ£Œλ˜μ–΄μ•Ό ν•˜λŠ” μ‹œμ μ— μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ΄ μ’…λ£Œλ˜μ§€ μ•Šμ„ 수 있기 λ•Œλ¬Έμ— ExecutorService λ₯Ό μ’…λ£Œν•˜κΈ° μœ„ν•œ shutdown() shutdownNow() λ©”μ„œλ“œκ°€ μ‘΄μž¬ν•©λ‹ˆλ‹€.

executorService.shutdown();

shutdown() 은 ExecutorService λ₯Ό λ°”λ‘œ μ’…λ£Œν•˜μ§€ μ•Šκ³  ν•΄λ‹Ή ExecutorService 에 더 이상 μž‘μ—…μ΄ ν• λ‹Ήλ˜μ§€ μ•Šκ²Œν•œ λ’€ ν˜„μž¬ 싀행쀑인 λͺ¨λ“  μž‘μ—…μ΄ λλ‚˜λ©΄ λΉ„λ‘œμ†Œ μ’…λ£Œλ©λ‹ˆλ‹€.

List<Runnable> notExecutedTasks = executorService.shutDownNow();

shutdownNow() λŠ” ExecutorService λ₯Ό λ°”λ‘œ μ’…λ£Œν•˜μ§€λ§Œ 싀행쀑인 μž‘μ—…λ“€μ΄ λͺ¨λ‘ λ™μ‹œμ— μ’…λ£Œλœλ‹€λŠ” 보μž₯은 ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. ν•΄λ‹Ή λ©”μ„œλ“œλŠ” 처리 λŒ€κΈ°μ€‘μΈ μž‘μ—…μ˜ λͺ©λ‘μ„ λ°˜ν™˜ν•˜κΈ° λ•Œλ¬Έμ— 이 μž‘μ—…λ“€μ„ μ–΄λ–»κ²Œ μ²˜λ¦¬ν• μ§€ κ²°μ •ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    } 
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

Oracle μ—μ„œλ„ ꢌμž₯ν•˜λŠ” ExecutorService λ₯Ό μ’…λ£Œν•˜λŠ” 쒋은 방법은 μœ„μ²˜λŸΌ awaitTermination() λ©”μ„œλ“œμ™€ ν•¨κ»˜ μœ„ 두 λ©”μ„œλ“œλ₯Ό λͺ¨λ‘ μ‚¬μš©ν•˜λŠ” κ²ƒμž…λ‹ˆλ‹€.

μ΄λŸ°μ‹μœΌλ‘œ ExecutorService λ₯Ό μ’…λ£Œν•œλ‹€λ©΄ ν•΄λ‹Ή ExecutorService λŠ” 더 이상 μž‘μ—…μ„ 받지 μ•Šμ„ 것이고 μ§€μ •λœ μ‹œκ°„μ•ˆμ— λͺ¨λ“  μž‘μ—…μ΄ μ™„λ£Œλ˜μ§€ μ•ŠλŠ”λ‹€λ©΄ μ¦‰μ‹œ μ’…λ£Œλ©λ‹ˆλ‹€.

Future Interface

submit() κ³Ό invokeAll() λ©”μ„œλ“œλŠ” Future νƒ€μž… λ˜λŠ” Future 의 μ»¬λ ‰μ…˜μ„ λ°˜ν™˜ν•˜μ—¬ μž‘μ—…μ˜ μ‹€ν–‰ κ²°κ³Όλ₯Ό μ–»κ±°λ‚˜ μž‘μ—…μ˜ μƒνƒœλ₯Ό 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

Future<String> future = executorService.submit(callableTask);
String result = null;
try {
    result = future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

Future μΈν„°νŽ˜μ΄μŠ€λŠ” Callable 의 μ‹€ν–‰ κ²°κ³Όλ₯Ό λ°˜ν™˜ν•˜λŠ” get() λ©”μ„œλ“œλ₯Ό μ œκ³΅ν•©λ‹ˆλ‹€. μž‘μ—…μ΄ Runnable 일 경우 null 을 λ°˜ν™˜ν•©λ‹ˆλ‹€.

μž‘μ—…μ΄ 싀행쀑일 λ•Œ get() λ©”μ„œλ“œλ₯Ό ν˜ΈμΆœν•˜λ©΄ μž‘μ—…μ΄ μ œλŒ€λ‘œ μ‹€ν–‰λ˜κ³  κ²°κ³Όλ₯Ό λ°˜ν™˜ν•  수 μžˆμ„ λ•ŒκΉŒμ§€ 싀행이 μ°¨λ‹¨λ©λ‹ˆλ‹€.

String result = future.get(200, TimeUnit.MILLISECONDS);

get() λ©”μ„œλ“œλ‘œ μΈν•œ κΈ΄ μ‹œκ°„μ˜ μ°¨λ‹¨μœΌλ‘œ 인해 μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ˜ μ„±λŠ₯이 μ €ν•˜λ  수 μžˆμŠ΅λ‹ˆλ‹€. κ²°κ³Ό 데이터가 μ€‘μš”ν•˜μ§€ μ•Šμ€ 경우 μ‹œκ°„ μ œν•œμ„ μ‚¬μš©ν•˜μ—¬ μ΄λŸ¬ν•œ 문제λ₯Ό 방지할 수 μžˆμŠ΅λ‹ˆλ‹€.

μœ„ μ˜ˆμ‹œμ²˜λŸΌ μ‹€ν–‰ μ‹œκ°„μ΄ μ§€μ •λœ 것보닀 κΈΈλ©΄ TimeoutException 이 λ°œμƒν•˜κΈ° λ•Œλ¬Έμ— isDone() λ©”μ„œλ“œλ₯Ό 톡해 ν• λ‹Ήλœ μž‘μ—…μ΄ 이미 μ²˜λ¦¬λ˜μ—ˆλŠ”μ§€ 확인할 μˆ˜λ„ μžˆμŠ΅λ‹ˆλ‹€.

boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();

Future μΈν„°νŽ˜μ΄μŠ€λŠ” μž‘μ—…μ„ 도쀑에 μ·¨μ†Œν•˜λŠ” cancel() λ©”μ„œλ“œλ₯Ό μ œκ³΅ν•˜λ©° isCancelled() λ©”μ„œλ“œλ₯Ό 톡해 μž‘μ—…μ˜ μ·¨μ†Œ μ—¬λΆ€λ₯Ό 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

ScheduledExecutorService


ScheduledExecutorService μΈν„°νŽ˜μ΄μŠ€λŠ” 미리 μ •μ˜λœ 지연 λ˜λŠ” 주기적으둜 μž‘μ—…μ„ μ‹€ν–‰ν•  수 있게 λ„μ™€μ£ΌλŠ” μΈν„°νŽ˜μ΄μŠ€μž…λ‹ˆλ‹€.

Future<String> resultFuture = 
  executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

ScheduledExecutorService 의 schedule() λ©”μ„œλ“œλ₯Ό μ΄μš©ν•˜μ—¬ μž‘μ—…μ„ μ§€μ •λœ 지연 이후 μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

executorService.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

scheduleAtFixedRate() λ©”μ„œλ“œλ₯Ό 톡해 μž‘μ—…μ„ 주기적으둜 μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€. μœ„ μ˜ˆμ‹œμ˜ 경우 0.1초 λŒ€κΈ° ν›„ μž‘μ—…μ΄ μ‹€ν–‰λ˜λ©° μž‘μ—…μ˜ μ‹œμž‘μ‹œκ°„μ„ κΈ°μ€€μœΌλ‘œ 맀 0.45초 λ§ˆλ‹€ μž‘μ—…μ„ 주기적으둜 μ‹€ν–‰ν•©λ‹ˆλ‹€. λ§Œμ•½ ν• λ‹Ήλœ μž‘μ—…μ„ μ‹€ν–‰ν•˜λŠ”λ° 0.45초 μ΄μƒμ˜ μ‹œκ°„μ΄ ν•„μš”ν•  경우 ScheduledExecutorService λŠ” λ‹€μŒ μž‘μ—…μ„ μ‹œμž‘ν•˜κΈ° 전에 ν˜„μž¬ μž‘μ—…μ΄ μ™„λ£Œλ  λ•ŒκΉŒμ§€ λŒ€κΈ°ν•©λ‹ˆλ‹€.

executorService.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

λ§Œμ•½ μž‘μ—… 반볡 사이에 κ³ μ • 길이의 지연이 ν•„μš”ν•œ 경우 scheduleWithFixedDelay() λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€. μœ„ μ˜ˆμ‹œμ˜ 경우 ν˜„μž¬ μ‹€ν–‰μ˜ 끝과 λ‹€μŒ μ‹€ν–‰μ˜ μ‹œμž‘ μ‚¬μ΄μ˜ 0.15초의 쀑지λ₯Ό 보μž₯ν•©λ‹ˆλ‹€.

scheduleAtFixedRate() 및 scheduleWithFixedDelay() λ©”μ„œλ“œλŠ” ExecutorService κ°€ μ’…λ£Œλ˜κ±°λ‚˜ μ‹€ν–‰ 쀑 μ˜ˆμ™Έκ°€ λ°œμƒν•˜λŠ” 경우 μ’…λ£Œλ©λ‹ˆλ‹€.

Executors


Executors ν΄λž˜μŠ€λŠ” 미리 κ΅¬μ„±λœ Thread Pool μΈμŠ€ν„΄μŠ€λ₯Ό λ§Œλ“€κΈ° μœ„ν•œ μ—¬λŸ¬ λ©”μ„œλ“œκ°€ ν¬ν•¨λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€. μ»€μŠ€ν…€ νŠœλ‹μ΄ ν•„μš”ν•˜μ§€ μ•Šλ‹€λ©΄ ν•΄λ‹Ή 클래슀λ₯Ό 톡해 Thread Pool 을 μƒμ„±ν•˜μ—¬ μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Executor 와 ExecutorService μΈν„°νŽ˜μ΄μŠ€λ₯Ό 톡해 λ‹€μ–‘ν•œ Thread Pool 을 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

newFixedThreadPool

newFixedThreadPool() 은 μ‘΄μž¬ν•˜λŠ” Thread 의 μˆ˜κ°€ 항상 같은 Thread Pool 을 μƒμ„±ν•©λ‹ˆλ‹€.

  • corePoolSize = 지정
  • maximumPoolSize = corePoolSize
  • keepAliveTime = 0
  • workQueue = LinkedBlockingQueue*<*Runnable*>*
@Test
void newFixedThreadPoolTest() {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    executor.submit(sleepThread());
    executor.submit(sleepThread());
    executor.submit(sleepThread());
 
    assertAll(() -> {
        assertThat(executor.getPoolSize()).isEqualTo(2);
        assertThat(executor.getQueue().size()).isEqualTo(1);
    });
}

Executors.newFixedThreadPool(2) λ₯Ό 톡해 2개의 Thread λ₯Ό 가진 Thread Pool 을 μƒμ„±ν•œ λ’€ 1μ΄ˆκ°„ μ •μ§€ν•˜λŠ” μž‘μ—… 3개λ₯Ό Thread Pool 에 μ œμΆœν•˜λ©΄ 2개의 μž‘μ—…μ€ Thread Pool 에 μ‘΄μž¬ν•˜λŠ” 2개의 Thread 에 ν• λ‹Ήλ˜κ³  λ‚˜λ¨Έμ§€ μž‘μ—…μ€ νμ—μ„œ λŒ€κΈ°ν•˜κ²Œ λ©λ‹ˆλ‹€.

newCachedThreadPool

newCachedThreadPool() 은 ν•„μš”ν•  λ•Œ λ§ˆλ‹€ Thread λ₯Ό μƒμ„±ν•˜λŠ” Thread Pool μž…λ‹ˆλ‹€.

  • corePoolSize = 0
  • maximumPoolSize = Integer.MAX
  • keepAliveTime = 60s
  • workQueue = SynchronousQueue
@Test
void newCachedThreadPoolTest() {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    executor.submit(sleepThread());
    executor.submit(sleepThread());
    executor.submit(sleepThread());
 
    assertAll(() -> {
        assertThat(executor.getPoolSize()).isEqualTo(3);
        assertThat(executor.getQueue().size()).isEqualTo(0);
    });
}

이런 섀정값은 Thread Pool 이 제좜된 μž‘μ—…μ„ μˆ˜μš©ν•˜κΈ° μœ„ν•΄ μ œν•œμ—†μ΄ 컀질 수 μžˆμŒμ„ μ˜λ―Έν•©λ‹ˆλ‹€. ν•˜μ§€λ§Œ Thread κ°€ 더 이상 ν•„μš”ν•˜μ§€ μ•Šλ‹€λ©΄ 60초의 μ‹œκ°„ 이후 λͺ¨λ‘ μ œκ±°λ©λ‹ˆλ‹€.

newCachedThreadPool() 은 μž‘μ—…μ˜ μ‚½μž…κ³Ό μ œκ±°κ°€ ν•œ 쌍으둜 μ΄λ£¨μ–΄μ§€λŠ” SynchronousQueue μΈμŠ€ν„΄μŠ€λ₯Ό λ‚΄λΆ€ 큐둜 μ‚¬μš©ν•˜κ³  있기 λ•Œλ¬Έμ— μ‹€μ§ˆμ μΈ 큐의 ν¬κΈ°λŠ” 항상 0μž…λ‹ˆλ‹€.

주둜 λ‹¨μ‹œκ°„ μž‘μ—…μ΄ λ§Žμ€ μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ— μ‚¬μš©λ©λ‹ˆλ‹€.

newSingleThreadExecutor

newSingleThreadExecutor() λŠ” 단일 Thread λ₯Ό μ‚¬μš©ν•˜λŠ” Thread Pool 을 μƒμ„±ν•©λ‹ˆλ‹€.

  • corePoolSize = 1
  • maximumPoolSize = 1
  • keepAliveTime = 0
  • workQueue = LinkedBlockingQueue*<*Runnable*>*
@Test
void newSingleThreadExecutorTest() {
    AtomicInteger counter = new AtomicInteger();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> counter.set(1));
    executor.submit(() -> counter.compareAndSet(1, 2));
}

이 ThreadPoolExecutor λŠ” 순차적으둜 μˆ˜ν–‰λ˜λ©° λΆˆλ³€ 래퍼 클래슀둜 포μž₯λ˜μ–΄ 있기 λ•Œλ¬Έμ— 생성 ν›„ μž¬κ΅¬μ„±μ΄ λΆˆκ°€λŠ₯ν•©λ‹ˆλ‹€. λ•Œλ¬Έμ— ThreadPoolExecutor 클래슀둜 μΊμŠ€νŒ…ν•  수 μ—†μŠ΅λ‹ˆλ‹€.

주둜 이벀트 루프λ₯Ό λ§Œλ“œλŠ” 데 μ‚¬μš©λ©λ‹ˆλ‹€.

newScheduledThreadPool

newScheduledThreadPool() 은 μŠ€μΌ€μ€„λ§μ„ μœ„ν•΄ DelayedWorkQueue λ₯Ό μ‚¬μš©ν•©λ‹ˆλ‹€.

  • corePoolSize = 지정
  • maximumPoolSize = Integer.MAX
  • keepAliveTime = 10ms
  • workQueue = DelayedWorkQueue
@Test
void scheduleTest() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
    executor.schedule(() -> {
        log.info("Hello World");
    }, 500, TimeUnit.MILLISECONDS);
}

μœ„ μ˜ˆμ‹œλŠ” Thread Pool 에 μžˆλŠ” Thread μ—μ„œ 0.5 초 뒀에 Hello World λ₯Ό 좜λ ₯ν•©λ‹ˆλ‹€.

@Test
void scheduleAtFixedRateTest() throws InterruptedException {
    CountDownLatch lock = new CountDownLatch(3);
 
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
    ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
        log.info("Hello World");
        lock.countDown();
    }, 500, 100, TimeUnit.MILLISECONDS);
 
    lock.await(1000, TimeUnit.MILLISECONDS);
    future.cancel(true);
}

μœ„ μ½”λ“œλŠ” 0.5초 λŒ€κΈ° ν›„ 맀 0.1초 λ§ˆλ‹€ Hello World λ₯Ό 좜λ ₯ν•˜λŠ” μŠ€μΌ€μ€„λ§ μž…λ‹ˆλ‹€.

References