Runnable과 Callable
Runnable 인터페이스
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Java
복사
•
Runnable 인터페이스는 위와 같이 아무 값도 반환하지 않고, 내부에서 Exception을 발생시키지도 않는다.
public class RunnableExample {
static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable at " + LocalTime.now());
}
}
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
thread.start();
}
}
Java
복사
•
이런 Runnable을 통해 직접 메서드를 Thread로 직접 호출하여 사용하는 방식은 다음과 같은 한계가 있다.
◦
저수준의 API(스레드 생성)에 의존
◦
값의 반환이 불가능
◦
스레드 생성과 종료 시에 오버헤드 발생
◦
스레드 관리 어려움
•
이런 한계를 해결하기 위해 Java 5에서 Callable과 Future가 새로 추가되었다.
Callable 인터페이스
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Java
복사
•
Runnable에서는 결과를 반환할 수 없기 때문에, 공용 메모리나 파이프 같이 번거로운 작업을 통해 반환값을 가져와야 했다. 이러한 한계를 해결하기 위해 제네릭을 통해 결과를 받을 수 있고 Exception을 발생 시킬 수 있는 Callable 인터페이스가 추가되었다.
public class CallableExample {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "Callable at " + LocalTime.now();
}
}
public static void main(String[] args) {
MyCallable callable = new MyCallable();
FutureTask futureTask = new FutureTask(callable);
Thread thread = new Thread(futureTask);
thread.start();
System.out.prinln(futureTask.get()); // 결과 반환값 기다리기
}
}
Java
복사
•
Callable 인터페이스의 구현인 작업(Task)은 가용 스레드가 없는 경우 실행이 미뤄져 작업이 오래걸릴 수 있다. 때문에 실행결과를 바로 받지 못하고 미래의 어느 시점에 얻을 수 있는데, 이렇게 미래에 완료된 Callable의 반환값을 얻을 수 있게 구현된 것이 Future 인터페이스이다.
Future 인터페이스
•
위 예시에서는 Runnable과 Future 인터페이스의 구현체인 FutureTask를 사용하였는데, Future는 비동기 작업을 통해 미래의 실행 결과를 얻거나, 현재 비동기 작업의 상태를 확인하고, 작업을 기다릴 수 있도록 메서드를 제공한다.
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Java
복사
•
Futuer 클래스는 다음과 같은 한계점들을 가지고 있다.
◦
외부에서 해당 작업을 완료 시킬 수 없고, get의 타임아웃 설정으로만 완료 가능
◦
Blocking을 통해서만 이후의 결과를 처리할 수 있음(get)
◦
여러 Future를 조합할 수 없고, 예외를 처리할 수 없음
•
이런 Future의 한계를 해결하여 Java 8에서 CompletableFuture가 등장하게 되었다.
CompletableFuture 클래스
•
CompleatbleFuture 클래스는 Future의 단점을 극복한 여러 메서드들을 제공한다.
•
비동기 작업 실행
◦
CompletableFuture<Void> runAsync(Runnable runnable)
반환값이 없는 경우 비동기로 작업을 실행시킨다.
◦
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
반환값이 있는 경우 비동기로 작업을 실행시킨다.
•
작업 콜백
◦
<U> CompletableFutre<U> thenApply(Function<? super T, ? extends U> fn)
함수형 인터페이스를 작업 콜백으로 매개변수에 받아, 작업 결과를 매개변수화 메서드로 수행 후 결과를 반환한다.
◦
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
Consumer를 작업 콜백으로 매개변수에 받아, 작업 결과를 Consumer로 소비한다.
◦
CompetableFuture<Void> thenRun(Runnable action)
Runnable을 작업 콜백으로 매개변수에 받아, 작업이 끝나면 Runnable을 수행한다.
•
예외 처리
◦
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
함수형 인터페이스를 매개변수로 받아, 예외가 발생하면 발생한 예외를 받아서 매개변수화 메서드로 처리한다.
◦
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
BiFunction을 매개변수로 받아, 예외가 발생한 경우와 발생하지 않은 경우에 대해 모두 처리할 수 있다.
Thread 클래스
•
Thread 클래스는 자바에서 스레드 생성을 위해 구현되어있는 클래스로, 아래의 여러 메서드들을 지원한다.
◦
sleep
현재 스레드 잠시 멈추는 메서드로, 자원을 놓지않고 제어권을 넘기기 때문에 데드락 발생 위험이 있다.
◦
interrupt
다른 스레드를 깨워서 intrrupedException을 발생시키는 메서드로, interrupt가 발생한 스레드는 예외를 catch하여 다른 작업을 수행할 수 있다.
◦
join
다른 스레드의 작업이 끝날 때까지 대기하도록 하는 메서드로, 스레드의 순서를 제어하는데 사용할 수 있다.
@Test
void threadStart() {
Thread thread = new MyThread();
thread.start();
System.out.println("Hello: " + Thread.currentThread().getName());
}
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName());
}
}
Java
복사
•
위 예시 코드에서 알 수 있듯이, run()을 호출하는 것이 아니라 thread.start()를 호출하는 것임에 주의하자.
•
start()를 호출하면 해당 스레드의 state를 확인해 스레드가 실행 가능한 상태인지(New state인지) 검사하고, 스레드를 스레드 그룹(ThreadGroup 클래스)에 추가 후 JVM에서 해당 스레드를 실행시킨다.
while (true) {
request = acceptRequest();
Runnable requestHandler = new RequestHandler(request);
new Thread(requestHandler).start();
}
JavaScript
복사
•
위와 같이 Thread 클래스를 사용하는 코드의 문제점은 몇 가지 성능상의 문제를 가지고 있다.
◦
많은 요청이 들어오는 경우 스레드 생성 및 종료에 따른 오버헤드가 발생한다.
◦
스레드 생성 개수에 제한이 없기 때문에, 요청이 많이 들어오면 OutOfMemoryError가 발생할 수 있다.
◦
많은 수의 스레드가 실행되고 있는 경우, 스레드 스케줄링에 의한 오버헤드가 발생한다.
•
이런 문제점들 때문에, 동시에 다수의 요청을 처리해야하는 어플리케이션에서는 스레드 풀을 사용하여 동시에 실행될 수 있는 스레드 개수를 제한하는 것이 일반적이다.
•
자바 5부터 Executor 인터페이스를 제공하고, 스레드 풀과 큐 등 다양한 Executor 구현체를 제공하여 위의 Thread 클래스의 문제점을 해소할 수 있다.
Executor 인터페이스
package java.utill.concurrent.Executor
public interface Executor {
void execute(Runnable command);
}
JavaScript
복사
•
매번 새로운 스레드를 만드는 것이 비효율적이기 때문에, Executor는 스레드 풀(Thread Pool)의 구현을 위해 도입된 인터페이스이다.
•
Executor는 execute() 메서드로 등록된 작업(Runnable 인스턴스)을 실행하기 위한 인터페이스이면서, 인터페이스 분리 원칙(ISP, Interface Segregation Principle)에 맞게 작업 등록과 작업 실행 중 등록된 작업의 실행 책임만 가진다.
•
예를 들면, 스레드 풀을 구현한 Executor 구현체는 전달받은 작업을 큐에 넣은 뒤 가용한 스레드가 존재할 경우, 해당 스레드에 작업을 실행하도로록 구현될 것이다.
@Test
void executorRun() {
final Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());
Executor executor = new RunExecutor();
executor.execute(runnable);
}
static class RunExecutor implements Executor {
@Override
public void execute(final Runnable command) {
command.run();
}
}
Java
복사
ExecutorService 인터페이스
•
ExecutorService 인터페이스는 작업 등록(Runnable, Callable)을 위한 인터페이스로, Executor의 라이프 사이클을 관리할 수 있는 기능을 정의하는 인터페이스이다.
•
ExecutorService는 Executor를 상속받기 때문에 작업 실행에 대한 책임도 갖고, 거기에 더해 작업 등록에 대한 책임도 갖는다.
•
ExecutorService 인터페이스의 라이프 사이클 관리 메서드
◦
void shutdown() : 셧다운 한다. 이미 실행자에 제공된 작업은 실행되지만, 새로운 작업은 수용하지 않는다.
◦
List<Runnable> shutdownNow() : 현재 실행 중인 모든 작업을 중지시키고 대기 중인 작업을 멈춘다. 현재 실행되기 위해 대기 중인 작업 목록을 반환한다.
◦
boolean isShutdown() : Executor가 셧다운 되었는지 여부를 확인한다.
◦
boolean isTerminated() : 셧다운 실행 후 모든 작업이 종료되었는지 여부를 확인한다.
◦
boolean awaitTermination(long timeout, TimeUnit unit) : 셧다운을 실행한 뒤 지정한 시간동안 모든 작업이 종료되는 것을 기다린다. 지정 시간 이내에 모든 작업이 종료되면 true를 반환하고, 아니라면 false를 반환한다.
@Test
void shutdown() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable runnable = () -> System.out.println("Thread: " + Thread.currentThread().getName());
executorService.execute(runnable);
// shutdown 호출
executorService.shutdown();
// shutdown 호출 이후에는 새로운 작업들을 받을 수 없음, 에러 발생
RejectedExecutionException result = assertThrows(RejectedExecutionException.class, () -> executorService.execute(runnable));
assertThat(result).isInstanceOf(RejectedExecutionException.class);
}
Java
복사
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("아직 처리중인 작업 존재");
System.out.println("작업 강제 종료 실행");
executor.shutdownNow();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("여전히 종료하지 않은 작업 존재");
}
}
} catch (InterruptedException e1) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("서버 셧다운 완료");
Java
복사
•
ExecutorService 인터페이스의 비동기 작업 메서드
◦
<T> Future<T> submit(Callable<T> task)
결과값을 리턴하는 작업을 추가한다.
◦
Future<?> submit(Runnable task)
결과값이 없는 작업을 추가한다.
◦
<T> Future<T> submit(Runnable task, T result)
새로운 작업을 추가한다. result는 작업이 성공적으로 수행될 때 사용될 리턴 값을 의미한다.
◦
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
주어진 작업을 모두 실행한다. 각 실행 결과 얻을 수 있는 Future의 List를 리턴한다.
◦
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
invokeAll()과 동일하지만, 지정한 시간 동안 완료되지 못한 작업은 취소된다.
◦
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
주어진 작업들 중 가장 빨리 끝난 작업 결과만을 Future로 반환한다. 나머지 완료되지 않은 작업은 취소된다.
◦
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
invokeAny()와 동일하지만, 지정한 시간 동안만 대기한다.
@Test
void invokeAll() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Instant start = Instant.now();
Callable<String> hello = () -> {
Thread.sleep(1000L);
final String result = "Hello";
System.out.println("result = " + result);
return result;
};
Callable<String> mang = () -> {
Thread.sleep(4000L);
final String result = "Java";
System.out.println("result = " + result);
return result;
};
Callable<String> kyu = () -> {
Thread.sleep(2000L);
final String result = "kyu";
System.out.println("result = " + result);
return result;
};
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, mang, kyu));
for(Future<String> f : futures) {
System.out.println(f.get());
}
// String result = executorService.invokeAny(Arrays.asList(hello, mang, kyu));
// System.out.println(result);
System.out.println("time = " + Duration.between(start, Instant.now()).getSeconds());
executorService.shutdown();
}
Java
복사
ScheduledExecutorService 인터페이스
•
ScheduledExecutorService 인터페이스는 ExecutorService 인터페이스를 상속받고, 거기에 더해 특정 시간 이후에 혹은 주기적으로 작업을 실행시키는 메서드를 포함한다.
◦
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
특정 시간(delay) 이후에 작업(Runnable)을 실행시킨다.
◦
<V> ScheduledFuture<V> schedule(Callable<V> command, long delay, TimeUnit unit)
특정 시간(delay) 이후에 작업(Callable)을 실행시킨다.
◦
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period TimeUnit unit)
특정 시간(delay) 이후 작업을 처음 실행시키고, 이후 주기적으로(period) 작업을 실행시킨다.
작업 실행 주기는 실행시간 기준으로 반복한다.(작업 1초에 2초 주기라면, 0초 실행 → 2초 실행 → 4초 실행 → …)
◦
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
특정 시간(delay) 이후 작업을 처음 실행시키고, 작업이 완료된 후 특정 시간(delay)이 지나면 작업을 실행시킨다.
작업이 완료 시간 + delay 후에 다음 작업이 실행된다.(작업 1초에 2초 주기라면, 0초 실행 → 1초 완료 → 3초 실행 → 4초 완료 → 6초 실행 → …)
•
ScheduledThreadPoolExecutor 클래스는 ScheduledExecutorService 인터페이스를 구현한 클래스로, 스케줄링 기능을 제공한다.
•
또한 ScheduledThreadPoolExecutor 클래스는 스레드 풀을 구현하고 있는 ThreadPollExecutor 클래스를 상속받고 있기 때문에, 스레드 풀을 관리하는 기능도 제공된다.
◦
ExecutorService newFixedThreadPool(int nThreads)
최대 지정한 개수 만큼의 스레드를 가질 수 있는 스레드 풀을 생성한다. 실제 생성되는 객체는 ThreadPoolExecutor 객체이다.
◦
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
지정한 개수만큼 스레드가 유지되는 스케줄 가능한 스레드 풀을 생성한다. 실제 생성되는 객체는 ScheduledThreadPoolExecutor 객체이다.
◦
ExecutorService newSingleThreadExecutor()
하나의 스레드만 사용하는 ExecutorService를 생성한다.
◦
ScheduledExecutorService newSingleThreadScheduledExecutor()
하나의 스레드만 사용하는 ScheduledExecutorService를 생성한다.
◦
ExecutorService newCachedThreadPool()
필요할 때 마다 스레드를 생성하는 스레드 풀을 생성한다. 이미 생성된 스레드의 경우 재사용된다. 실제 생성되는 객체는 ThreadPoolExecutor 객체이다.
Executor executor = Executors.newFixedThreadPool(THREADCOUNT);
while(true) {
request = acceptRequest();
Runnable requestHandler = new RequestHandler(request);
executor.execute(requestHandler);
}
Java
복사