Executor을 이용한 Thread관리
- Executor관련 주요 API class diagram
Executor interface
- package : java.util.concurrent.Executor
- Executor는 굉장히 단순한 인터페이스로 보이지만 아주 다양한 여러 가지 종류의 작업 실행 정책을 지원하는 유연하면서도 강력한 비동기적 작업 실행 프레임워크의 근간을 이루는 인터페이스입니다.
- Executor는 작업등록(task submission) ,작업실행(task Execution)을 분리하는 표준적인 방법이며, 각 작업은 Runnable의 형태로 정의합니다.
- Executor인터페이스를 구현한 클래스는 작업의 라이프 사이클을 관리하는 기능도 갖고 있고, 몇가지 통계값을 뽑아내거나 애플리케이션에서 작업 실행 과정을 관리하고 모니터링하기 위한 기능도 갖고 있습니다.
- Executor의 구조는 프로듀서-컨슈머 패턴에 기반하고 있으며, 작업을 생성해 등록하는 클래스가 프로듀서(처리해야할 작업을 생성하는 주체)가 되고, 작업을 실제로 실행하는 스레드가 컨슈머(생성된 작업을 처리하는 주체)가 되는 모양을 갖추고 있습니다.
- 일반적으로 프로듀서-컨슈머 패턴을 애플리케이션에 적용해 구현할 수 있는 가장 쉬운 방법이 바로 Executor 프레임워크을 사용하는 방법입니다.
- Executor 인터페이스의 구현체는 execute() 메소드로 전달받은 작업(Runnable 인스턴스)을 알맞게 실행하게 됩니다.
예를 들어, 쓰레드 풀을 구현한 Executor 구현체는 전달받은 작업을 큐에 넣은 뒤 가용한 쓰레드가 존재할 경우, 해당 쓰레드에 작업을 실행하도록 구현될 것입니다.…
Executor executor = …;
// Executor 구현체 생성
Runnable stdPot =
new
ProcessOutputThread(process.getInputStream());
executor.execute(stdPot);
…
- 위의 코드와 같이 Executor.execute() 메소드에 생성한 작업을 전달합니다. 작업을 실제로 실행하는 책임은 Executor에 있으며, 작업을 생성하는 코드에서는 작업이 어떻게 실행되는 지의 여부는 알 필요가 없습니다.
즉, Executor를 사용함으로써 작업을 생성하는 코드와 작업을 실행하는 메커니즘 사이의 커플링을 제거하였습니다.
- 위의 코드와 같이 Executor.execute() 메소드에 생성한 작업을 전달합니다. 작업을 실제로 실행하는 책임은 Executor에 있으며, 작업을 생성하는 코드에서는 작업이 어떻게 실행되는 지의 여부는 알 필요가 없습니다.
Executors Class
- package : java.util.concurrent.Executors
- java.util.concurrent.Executors 클래스는 JAVA5부터 기본적으로 제공하는 Executor구현체를 구현할 수 있는 메소드를 제공하는 유틸리티입니다.
예를 들어, 쓰레드 풀을 구현한 Executor 구현체를 구하고 싶다면 다음과 같은 코드를 사용하면 됩니다....
Executor executor = Executors.newFixedThreadPool(ThreadCount);
// ThreadCount : Thread수
...
- Executor 인스턴스를 구할수 있게 제공되는 ThreadPool Method
- ExecutorService newFixedThreadPool(int threadCount) // threadCount: Thread수
- 처리할 작업이 등록되면 그에 따라 실제 작업할 Thread를 하나씩 생성합니다.
- 항상 일정한 Thread 개수를 유지합니다.
- Thread가 유휴상태이더라도 제거하지 않고 유지합니다.
- 생성할 수 있는 Thread의 최대 개수는 제한되어 있습니다.
- 제한된 개수까지 Thread를 생성하고 나면 더이상 생성하지 않고 Thread 수를 유지합니다.
- 작업도중 비정상적으로 Thread가 종료하는 경우에는 Thread를 추가로 생성하며, threadCount 개수보다 1개가 더 생길 수도 있습니다.
- 실제 생성되는 객체는 ThreadPoolExecutor 객체입니다.
- ExecutorService newCachedThreadPool()
- CachedThreadPool은 현재 pool에 갖고 있는 Thread의 수가 처리할 작업의 수보다 많아서 쉬는 Thread가 많이 발생할 때 쉬는 Thread를 종료시켜 훨씬 유연하게 대응 할 수 있으며, 처리할 작업의 수가 많이 지면 필요한 만큼 Thread를 새로 생성합니다.
- 일정 시간(60초)동안 사용하지 않는(idle) Thread는 종료됩니다.
- Thread의 수에 제한을 두지 않습니다.
- 필요없는 Thread를 제거하므로 서버 리소스(memory)는 적게 사용하지만, Thread 생성과 삭제를 반복하므로 작업 부하가 불규칙적인 경우 비효율적입니다.
- 실제 생성되는 객체는 ThreadPoolExecutor 객체입니다.
- ExecutorService newSingleThreadExecutor()
- 단일 Thread로 동작하는 Executor 로서 작업을 처리하는 Thread가 단 하나 뿐입니다.
- 만약 작업중에 Exeception 이 발생해 비정상적으로 종료되면 새로운 Thread를 하나 생성해 나머지 작업을 실행합니다.
- 등록된 작업은 설정된 큐에서 지정하는 순서(FIFO,LIFO,우선순위)에 따라 반드시 순차적으로 처리됩니다.
- ScheduledExecutorService newScheduledThreadPool(int threadCount) // threadCount: Thread 수
- 지정한 개수만큼 쓰레드가 유지되는 스케줄 가능한 Thread pool을 생성합니다.
- 일정시간 이후에 실행하거나 주기적으로 작업을 실행할 수 있으며, Thread의 수가 고정되어 있는 형태의 Executor.Timer 클래스의 기능과 유사합니다.
- 실제 생성되는 객체는 ScheduledThreadPoolExecutor 객체입니다.
- ScheduledExecutorService newSingleThreadScheduledExecutor()
- newScheduledThreadPool와 유사합니다.
- 하나의 Thread만 사용하는 ScheduledExecutorService를 생성합니다.
- ExecutorService newFixedThreadPool(int threadCount) // threadCount: Thread수
ExecutorService interface
- package : java.util.concurrent.ExecutorService
- 서비스를 실행하는 동작주기와 관련해 Ececutor를 상속받은 ExecutorService 인터페이스에는 동작주기를 관리할 수 있는 여러가지 메소드가 추가되어 있습니다.
Executor의 라이프사이클을 관리할 수 있는 기능을 정의하고 있으며 Runnable 뿐만 아니라 Callable을 작업으로 사용할 수 있는 메소드가 추가로 제공되고 있습니다. - 라이프사이클 관련 메소드
- void shutdown():
셧다운 합니다. 이미 Executor에 제공된 작업은 실행되지만, 새로운 작업은 수용하지 않습니다. - List<Runnable> shutdownNow():
현재 실행중인 모든 작업을 중지시키고, 대기중인 작업을 멈추고, 현재 실행되기 위해 대기중인 작업 목록을 리턴합니다. - boolean isShutdown():
Executor가 셧다운 되었는 지의 여부를 확인합니다. - boolean isTerminated():
셧다운 실행 후 모든 작업이 종료되었는 지의 여부를 확인합니다. - boolean awaitTermination(long timeout, TimeUnit unit):
셧다운을 실행한 뒤, 지정한 시간 동안 모든 작업이 종료될 때 까지 대기합니다. 지정한 시간 이내에서 실행중인 모든 작업이 종료되면 true를 리턴하고, 여전히 실행중인 작업이 남아 있다면 false를 리턴합니다. - Sample Code
...
finally
{
executorService.shutdown();
// 안전한 종료(shutdownNow()를 사용하는 경우는 강제종료)
try
{
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
// ExecutorService가 종료상태로 들어갈 때까지 기다리고자 할때 사용
}
catch
(InterruptedException e) {
log.error(
" ExecutorService awaitTermination Error : "
+ e.getMessage(), e);
}
...
}
...
- 보통 shutdown() 메소드 실행 후 바로 awaitTermination()을 실행하면 Executor를 직접 종료시키는것과 비슷한 효과를 얻을 수 있습니다.
- void shutdown():
- 작업 수행과 관련해서 추가적으로 메소드
- <T> Future<T> submit(Callable<T> task)
결과값을 리턴하는 작업을 추가합니다. - Future<?> submit(Runnable task)
결과값이 없는 작업을 추가합니다. - <T> Future<T> submit(Runnable task, T result)
새로운 작업을 추가합니다. result는 작업이 성공적으로 수행될 때 사용될 리턴 값을 의미합니다. - <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
주어진 작업을 모두 실행합니다. 각 실행 결과값을 구할 수 있는 Future의 List를 리턴합니다. - <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
앞서 invokeAll()과 동일합니다. 지정한 시간 동안 완료되지 못한 작업은 취소되는 차이점이 있습니다. - <T> T invokeAny(Collection<? extends Callable<T>> tasks)
작업울 수행하고, 작업 결과 중 성공적으로 완료된 것의 결과를 리턴합니다. 정상적으로 수행된 결과가 발생하거나 예외가 발생하는 경우 나머지 완료되지 않은 작업은 취소됩니다. - <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
invokeAny()와 동일합니다. 지정한 시간 동안만 대기한다는 차이점이 있습니다.
- <T> Future<T> submit(Callable<T> task)
ScheduledThreadPoolExecutor Class
- ScheduledThreadPoolExecutor 클래스는 ScheduledExecutorService 인터페이스을 구현한 클래스로서 스케줄링 기능을 제공합니다.
ScheduledThreadPoolExecutor 클래스는 또한 쓰레드 풀을 구현하고 있는 ThreadPoolExecutor 클래스를 상속받고 있기 때문에 쓰레드 풀 기능도 함께 제공합니다. - 지연작업과 주기적 작업 마다 여러 개의 스레드를 할당해 작업을 실행하느라 각자의 실행 예정 시각을 벗어나는 일이 없도록 조절해 줍니다.
- TimerTask의 단점으로 동작하던 도중에 예상치 못 한 Exception을 던져버리를 경우 예측하지 못한 상태로 넘어갈 수 있다는 점에서 Timer 스레드는 예외 상황을 전혀 처리하지 않기 때문에 스레드 자체가 멈춰 버릴 가능성도 있습니다.
- 특별한 스케줄 방법을 지원하는 스케줄링 서비스를 구현해야 할 필요가 있다면, BlockingQueue를 구현하면서 ScheduledThreadPoolExcutor와 비슷한 기능을 제공하는 DalayQueue클래스를 사용해 보는 것이 좋을것 같습니다.
- 참조 : http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html
Callable & Future
Callable
- package : java.util.concurrent.Callable
- Runnable.run() 메소드는 결과 값을 리턴하지 않기 때문에, run() 메소드의 실행 결과를 구하기 위해서는 공용 메모리나 파이프와 같은 것들을 사용해서 결과 값을 받아야만 했습니다.
이런 Runnable 인터페이스의 단점을 없애기 위해 추가된 것이 바로 Callable 인터페이스입니다. - Callable 인터페이스의 call() 메소드는 결과 값을 리턴하도록 되어 있습니다. 또한, 자바 5부터 추가된 generic을 사용하여 어떤 타입이든 리턴 값으로 사용할 수 있도록 하였습니다.
- Sample Code
public
class
ProcessOutputThread
implements
Callable<StringBuffer> {
...
/**
* @return
* @see java.util.concurrent.Callable#call()
*/
public
StringBuffer call() {
...
return
result;
}
...
}
...
ExecutorService executorService = Executors.newFixedThreadPool(
10
);
ProcessOutputThread stdPot =
new
ProcessOutputThread(process.getInputStream());
Future<StringBuffer> stdFuture = executorService.submit(stdPot);
StringBuffer stdSb = stdFuture.get();
...
- ExecutorService.submit() 메소드는 전달받은 Callable 객체를 내부 메커니즘에 따라 지정한 때에 실행됩니다.
위 경우 ProcessOutputThread 객체는 큐에 저장되었다가 가용한 쓰레드가 생길 때 ProcessOutputThread .call() 메소드가 실행될 것입니다.
- ExecutorService.submit() 메소드는 전달받은 Callable 객체를 내부 메커니즘에 따라 지정한 때에 실행됩니다.
Future
- package : java.util.concurrent.Future
- Callable.call() 메소드가 ExecutorService.submit() 메소드에 전달될 때 곧 바로 실행되는 것이 아니기 때문에 리턴값을 바로 구할 수 없습니다.
이 리턴값은 미래의 어느 시점에 구할 수 있는데, 이렇게 미래에 실행되는 Callable의 수행 결과 값을 구할 때 사용되는 것이 Future입니다. - Future 인터페이스는 다음과 같은 메소드를 제공하고 있습니다.
- V get()
Callable 등 작업의 실행이 완료될 때 까지 블록킹 되며, 완료되면 그 결과값을 리턴합니다. - V get(long timeout, TimeUnit unit)
지정한 시간 동안 작업의 실행 결과를 기다린다. 지정한 시간 내에 수행이 완료되면 그 결과값을 리턴한다. 대기 시간이 초과되면 TimeoutException을 발생시킵니다. - boolean cancel(boolean mayInterruptIfRunning)
작업을 취소합니다. - boolean isCancelled()
작업이 정상적으로 완료되기 이전에 취소되었을 경우 true를 리턴합니다. - boolean isDone()
작업이 완료되었다면 true를 리턴합니다.
- V get()
'Programing > Java' 카테고리의 다른 글
Java의 System.out을 파일 출력으로 변경하는 방법 (0) | 2015.03.30 |
---|---|
[자바8 람다의 힘] 2장 컬렉션의 사용 (0) | 2015.03.26 |
Java에서 정규표현식은 이렇게 사용하자 (0) | 2015.03.12 |
json <-> java 변환 라이브러리 (GSON, Jackson) (0) | 2015.03.12 |
Lock VS ReentrantLock (0) | 2015.03.06 |