Search

Java - 실행자(Executor)

글감
Java
작성자
작성 일자
2024/02/08 06:43
상태
완료
공개여부
공개
Date
생성자
작업자

Runnable과 Callable

Runnable 인터페이스
@FunctionalInterface public interface Runnable { public abstract void run(); }
Java
복사
Runnable 인터페이스는 위와 같이 아무 값도 반환하지 않고, 내부에서 Exception을 발생시키지도 않는다.
public class RunnableExample { static class MyRunnable implements Runnable { @Override public void run() { System.out.println("Runnable at " + LocalTime.now()); } } public static void main(String[] args) { MyRunnable runnable = new MyRunnable(); Thread thread = new Thread(runnable); thread.start(); } }
Java
복사
이런 Runnable을 통해 직접 메서드를 Thread로 직접 호출하여 사용하는 방식은 다음과 같은 한계가 있다.
저수준의 API(스레드 생성)에 의존
값의 반환이 불가능
스레드 생성과 종료 시에 오버헤드 발생
스레드 관리 어려움
이런 한계를 해결하기 위해 Java 5에서 Callable과 Future가 새로 추가되었다.
Callable 인터페이스
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
Java
복사
Runnable에서는 결과를 반환할 수 없기 때문에, 공용 메모리나 파이프 같이 번거로운 작업을 통해 반환값을 가져와야 했다. 이러한 한계를 해결하기 위해 제네릭을 통해 결과를 받을 수 있고 Exception을 발생 시킬 수 있는 Callable 인터페이스가 추가되었다.
public class CallableExample { static class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "Callable at " + LocalTime.now(); } } public static void main(String[] args) { MyCallable callable = new MyCallable(); FutureTask futureTask = new FutureTask(callable); Thread thread = new Thread(futureTask); thread.start(); System.out.prinln(futureTask.get()); // 결과 반환값 기다리기 } }
Java
복사
Callable 인터페이스의 구현인 작업(Task)은 가용 스레드가 없는 경우 실행이 미뤄져 작업이 오래걸릴 수 있다. 때문에 실행결과를 바로 받지 못하고 미래의 어느 시점에 얻을 수 있는데, 이렇게 미래에 완료된 Callable의 반환값을 얻을 수 있게 구현된 것이 Future 인터페이스이다.
Future 인터페이스
위 예시에서는 Runnable과 Future 인터페이스의 구현체인 FutureTask를 사용하였는데, Future는 비동기 작업을 통해 미래의 실행 결과를 얻거나, 현재 비동기 작업의 상태를 확인하고, 작업을 기다릴 수 있도록 메서드를 제공한다.
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Java
복사
Futuer 클래스는 다음과 같은 한계점들을 가지고 있다.
외부에서 해당 작업을 완료 시킬 수 없고, get의 타임아웃 설정으로만 완료 가능
Blocking을 통해서만 이후의 결과를 처리할 수 있음(get)
여러 Future를 조합할 수 없고, 예외를 처리할 수 없음
이런 Future의 한계를 해결하여 Java 8에서 CompletableFuture가 등장하게 되었다.
CompletableFuture 클래스
CompleatbleFuture 클래스는 Future의 단점을 극복한 여러 메서드들을 제공한다.
비동기 작업 실행
CompletableFuture<Void> runAsync(Runnable runnable)
반환값이 없는 경우 비동기로 작업을 실행시킨다.
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
반환값이 있는 경우 비동기로 작업을 실행시킨다.
작업 콜백
<U> CompletableFutre<U> thenApply(Function<? super T, ? extends U> fn)
함수형 인터페이스를 작업 콜백으로 매개변수에 받아, 작업 결과를 매개변수화 메서드로 수행 후 결과를 반환한다.
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
Consumer를 작업 콜백으로 매개변수에 받아, 작업 결과를 Consumer로 소비한다.
CompetableFuture<Void> thenRun(Runnable action)
Runnable을 작업 콜백으로 매개변수에 받아, 작업이 끝나면 Runnable을 수행한다.
예외 처리
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
함수형 인터페이스를 매개변수로 받아, 예외가 발생하면 발생한 예외를 받아서 매개변수화 메서드로 처리한다.
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
BiFunction을 매개변수로 받아, 예외가 발생한 경우와 발생하지 않은 경우에 대해 모두 처리할 수 있다.

Thread 클래스

Thread 클래스는 자바에서 스레드 생성을 위해 구현되어있는 클래스로, 아래의 여러 메서드들을 지원한다.
sleep
현재 스레드 잠시 멈추는 메서드로, 자원을 놓지않고 제어권을 넘기기 때문에 데드락 발생 위험이 있다.
interrupt
다른 스레드를 깨워서 intrrupedException을 발생시키는 메서드로, interrupt가 발생한 스레드는 예외를 catch하여 다른 작업을 수행할 수 있다.
join
다른 스레드의 작업이 끝날 때까지 대기하도록 하는 메서드로, 스레드의 순서를 제어하는데 사용할 수 있다.
@Test void threadStart() { Thread thread = new MyThread(); thread.start(); System.out.println("Hello: " + Thread.currentThread().getName()); } static class MyThread extends Thread { @Override public void run() { System.out.println("Thread: " + Thread.currentThread().getName()); } }
Java
복사
위 예시 코드에서 알 수 있듯이, run()을 호출하는 것이 아니라 thread.start()를 호출하는 것임에 주의하자.
start()를 호출하면 해당 스레드의 state를 확인해 스레드가 실행 가능한 상태인지(New state인지) 검사하고, 스레드를 스레드 그룹(ThreadGroup 클래스)에 추가 후 JVM에서 해당 스레드를 실행시킨다.
while (true) { request = acceptRequest(); Runnable requestHandler = new RequestHandler(request); new Thread(requestHandler).start(); }
JavaScript
복사
위와 같이 Thread 클래스를 사용하는 코드의 문제점은 몇 가지 성능상의 문제를 가지고 있다.
많은 요청이 들어오는 경우 스레드 생성 및 종료에 따른 오버헤드가 발생한다.
스레드 생성 개수에 제한이 없기 때문에, 요청이 많이 들어오면 OutOfMemoryError가 발생할 수 있다.
많은 수의 스레드가 실행되고 있는 경우, 스레드 스케줄링에 의한 오버헤드가 발생한다.
이런 문제점들 때문에, 동시에 다수의 요청을 처리해야하는 어플리케이션에서는 스레드 풀을 사용하여 동시에 실행될 수 있는 스레드 개수를 제한하는 것이 일반적이다.
자바 5부터 Executor 인터페이스를 제공하고, 스레드 풀과 큐 등 다양한 Executor 구현체를 제공하여 위의 Thread 클래스의 문제점을 해소할 수 있다.

Executor 인터페이스

package java.utill.concurrent.Executor public interface Executor { void execute(Runnable command); }
JavaScript
복사
매번 새로운 스레드를 만드는 것이 비효율적이기 때문에, Executor는 스레드 풀(Thread Pool)의 구현을 위해 도입된 인터페이스이다.
Executor는 execute() 메서드로 등록된 작업(Runnable 인스턴스)을 실행하기 위한 인터페이스이면서, 인터페이스 분리 원칙(ISP, Interface Segregation Principle)에 맞게 작업 등록과 작업 실행 중 등록된 작업의 실행 책임만 가진다.
예를 들면, 스레드 풀을 구현한 Executor 구현체는 전달받은 작업을 큐에 넣은 뒤 가용한 스레드가 존재할 경우, 해당 스레드에 작업을 실행하도로록 구현될 것이다.
@Test void executorRun() { final Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName()); Executor executor = new RunExecutor(); executor.execute(runnable); } static class RunExecutor implements Executor { @Override public void execute(final Runnable command) { command.run(); } }
Java
복사

ExecutorService 인터페이스

ExecutorService 인터페이스는 작업 등록(Runnable, Callable)을 위한 인터페이스로, Executor의 라이프 사이클을 관리할 수 있는 기능을 정의하는 인터페이스이다.
ExecutorService는 Executor를 상속받기 때문에 작업 실행에 대한 책임도 갖고, 거기에 더해 작업 등록에 대한 책임도 갖는다.
ExecutorService 인터페이스의 라이프 사이클 관리 메서드
void shutdown() : 셧다운 한다. 이미 실행자에 제공된 작업은 실행되지만, 새로운 작업은 수용하지 않는다.
List<Runnable> shutdownNow() : 현재 실행 중인 모든 작업을 중지시키고 대기 중인 작업을 멈춘다. 현재 실행되기 위해 대기 중인 작업 목록을 반환한다.
boolean isShutdown() : Executor가 셧다운 되었는지 여부를 확인한다.
boolean isTerminated() : 셧다운 실행 후 모든 작업이 종료되었는지 여부를 확인한다.
boolean awaitTermination(long timeout, TimeUnit unit) : 셧다운을 실행한 뒤 지정한 시간동안 모든 작업이 종료되는 것을 기다린다. 지정 시간 이내에 모든 작업이 종료되면 true를 반환하고, 아니라면 false를 반환한다.
@Test void shutdown() { ExecutorService executorService = Executors.newFixedThreadPool(10); Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName()); executorService.execute(runnable); // shutdown 호출 executorService.shutdown(); // shutdown 호출 이후에는 새로운 작업들을 받을 수 없음, 에러 발생 RejectedExecutionException result = assertThrows(RejectedExecutionException.class, () -> executorService.execute(runnable)); assertThat(result).isInstanceOf(RejectedExecutionException.class); }
Java
복사
executor.shutdown(); try { if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { System.out.println("아직 처리중인 작업 존재"); System.out.println("작업 강제 종료 실행"); executor.shutdownNow(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { System.out.println("여전히 종료하지 않은 작업 존재"); } } } catch (InterruptedException e1) { executor.shutdownNow(); Thread.currentThread().interrupt(); } System.out.println("서버 셧다운 완료");
Java
복사
ExecutorService 인터페이스의 비동기 작업 메서드
<T> Future<T> submit(Callable<T> task)
결과값을 리턴하는 작업을 추가한다.
Future<?> submit(Runnable task)
결과값이 없는 작업을 추가한다.
<T> Future<T> submit(Runnable task, T result)
새로운 작업을 추가한다. result는 작업이 성공적으로 수행될 때 사용될 리턴 값을 의미한다.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
주어진 작업을 모두 실행한다. 각 실행 결과 얻을 수 있는 Future의 List를 리턴한다.
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
invokeAll()과 동일하지만, 지정한 시간 동안 완료되지 못한 작업은 취소된다.
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
주어진 작업들 중 가장 빨리 끝난 작업 결과만을 Future로 반환한다. 나머지 완료되지 않은 작업은 취소된다.
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
invokeAny()와 동일하지만, 지정한 시간 동안만 대기한다.
@Test void invokeAll() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(10); Instant start = Instant.now(); Callable<String> hello = () -> { Thread.sleep(1000L); final String result = "Hello"; System.out.println("result = " + result); return result; }; Callable<String> mang = () -> { Thread.sleep(4000L); final String result = "Java"; System.out.println("result = " + result); return result; }; Callable<String> kyu = () -> { Thread.sleep(2000L); final String result = "kyu"; System.out.println("result = " + result); return result; }; List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, mang, kyu)); for(Future<String> f : futures) { System.out.println(f.get()); } // String result = executorService.invokeAny(Arrays.asList(hello, mang, kyu)); // System.out.println(result); System.out.println("time = " + Duration.between(start, Instant.now()).getSeconds()); executorService.shutdown(); }
Java
복사

ScheduledExecutorService 인터페이스

ScheduledExecutorService 인터페이스는 ExecutorService 인터페이스를 상속받고, 거기에 더해 특정 시간 이후에 혹은 주기적으로 작업을 실행시키는 메서드를 포함한다.
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
특정 시간(delay) 이후에 작업(Runnable)을 실행시킨다.
<V> ScheduledFuture<V> schedule(Callable<V> command, long delay, TimeUnit unit)
특정 시간(delay) 이후에 작업(Callable)을 실행시킨다.
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period TimeUnit unit)
특정 시간(delay) 이후 작업을 처음 실행시키고, 이후 주기적으로(period) 작업을 실행시킨다.
작업 실행 주기는 실행시간 기준으로 반복한다.(작업 1초에 2초 주기라면, 0초 실행 → 2초 실행 → 4초 실행 → …)
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
특정 시간(delay) 이후 작업을 처음 실행시키고, 작업이 완료된 후 특정 시간(delay)이 지나면 작업을 실행시킨다.
작업이 완료 시간 + delay 후에 다음 작업이 실행된다.(작업 1초에 2초 주기라면, 0초 실행 → 1초 완료 → 3초 실행 → 4초 완료 → 6초 실행 → …)
ScheduledThreadPoolExecutor 클래스는 ScheduledExecutorService 인터페이스를 구현한 클래스로, 스케줄링 기능을 제공한다.
또한 ScheduledThreadPoolExecutor 클래스는 스레드 풀을 구현하고 있는 ThreadPollExecutor 클래스를 상속받고 있기 때문에, 스레드 풀을 관리하는 기능도 제공된다.
ExecutorService newFixedThreadPool(int nThreads)
최대 지정한 개수 만큼의 스레드를 가질 수 있는 스레드 풀을 생성한다. 실제 생성되는 객체는 ThreadPoolExecutor 객체이다.
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 지정한 개수만큼 스레드가 유지되는 스케줄 가능한 스레드 풀을 생성한다. 실제 생성되는 객체는 ScheduledThreadPoolExecutor 객체이다.
ExecutorService newSingleThreadExecutor()
하나의 스레드만 사용하는 ExecutorService를 생성한다.
ScheduledExecutorService newSingleThreadScheduledExecutor()
하나의 스레드만 사용하는 ScheduledExecutorService를 생성한다.
ExecutorService newCachedThreadPool()
필요할 때 마다 스레드를 생성하는 스레드 풀을 생성한다. 이미 생성된 스레드의 경우 재사용된다. 실제 생성되는 객체는 ThreadPoolExecutor 객체이다.
Executor executor = Executors.newFixedThreadPool(THREADCOUNT); while(true) { request = acceptRequest(); Runnable requestHandler = new RequestHandler(request); executor.execute(requestHandler); }
Java
복사