본문 바로가기
JAVA/SPRING

Asynchronous, Multi-thread 개발하기

by F.E.D 2018. 6. 4.

Asynchronous, Multi-thread 개발하기

스프링 프로젝트에서 Asynchronous를 사용하기 위해 AsyncConfigurer를 사용하고
ThreadPoolTaskExecutor를 사용해서 Executor를 다중생성하고 ExceptionHandler를 연결합니다.
또한 이러한 @Async 어노테이션을 활용하면 여러 Thread를 생성하고 필요한 Executor와 연결하여 사용할 수 있습니다.


그 전에 쓰레드에 대한 예를 한번 생각해보면 좋을 것 같습니다.

쓰레드가 생성될 때 컴퓨터 내부적으로 운영체제(OS)가 요청을 받아들여 메모리공간을 확보해주고 
그 메모리를 쓰레드에게 할당해줍니다. 
쓰레드는 동일한 메모리영역에서 생성되고 관리되지만, 생성/수거에 드는 비용을 무시할 수는 없습니다.

그래서 쓰레드 풀이라는 쓰레드를 모아놓은 풀장을 미리 만들어놓는 개념입니다.

그렇게 생성된 쓰레드풀의 동작원리는 사용자로부터 들어온 요청을 작업큐에 넣고 쓰레드풀은 작업큐에 들어온 일감을 미리 생성해놓은 쓰레드들에게 일감을 할당합니다. 일을 다 처리하고 나면 쓰레드들은 다시 어플리케이션에 결과값을 리턴합니다. 

쓰레드풀을 사용하는 이유는 프로그램 성능저하 방지와 다수의 사용자 요청을 처리하기 위함입니다.
하지만 이러한 쓰레드풀을 사용할 때 사용자가 너무 많은 쓰레드를 생성하는 등으로 대응하면 노는 쓰레드가 생깁니다. 

이러한 일을 방지하기 위해서 JAVA 5.0 에서는 필요없는 쓰레드를 만들지 않는 방법이 생겼습니다.

newScheduledThreadPool(int corePoolSize)

1분에 한번씩 임무를 수행 시키기 위한 쓰레드풀
-> 1초에 한 개씩 DB에 값이 들어갈 때, 1분 평균을 구해서 다른 테이블에 집어 넣을 때 필요.

newFixedThreadPool(int nThreads)

풀장에 쓰레드를 고정적으로 몇개를 예비로할지 정하는 방식 제공
-> 항상 비슷하게 일이 많으면 이렇게 설정해두면 좋습니다.
초기 쓰레드 개수는 0개 ,
코어 쓰레드 수와 최대 쓰레드 수는 매개변수 nThreads 값으로 지정,
이 쓰레드 풀은 쓰레드 개수보다 작업 개수가 많으면 마찬가지로 쓰레드를 새로 생성하여 작업을 처리합니다.
만약 일 없이 놀고 있어도 쓰레드를 제거하지 않고 내버려둡니다.

newCachedThreadPool()

유기적으로 쓰레드의 숫자가 증가하고 감소하는 쓰레드 풀
초기쓰레드 수, 코어쓰레드 수 0개 
최대 쓰레드 수는 integer 데이터타입이 가질 수 있는 최대 값(Integer.MAX_VALUE)
쓰레드 개수보다 작업 개수가 많으면 새로운 쓰레드를 생성하여 작업을 처리합니다.
만약 일 없이 60초동안 아무일을 하지않으면 쓰레드를 종료시키고 쓰레드풀에서 제거합니다.


쓰레드 - ForkJoin (JAVA 7.0)의 분할정복알고리즘

쓰레드는 각각의 잡으로 분할해서 일을 수행하도록 시킬 수도 있습니다. 그 때 노는 쓰레드가 발생할 수 있습니다.
이럴 때는 ForkJoin으로 부모에게 잡을 던져서 그 각각의 일감을 수행할 때 유휴 쓰레드가 기존의 다른 쓰레드의 잡을 훔쳐와서 일을 할 수 있습니다. 이럴 때 각각 세부적으로 각각의 쓰레드가 골고루 일을 할 수 있게 그리고 이 일감의 종료하는 시간도 비슷하게 끝나게 됩니다. 
말로는 매우 좋아보이는데 꼭 그렇지만도 않습니다.

천만개의 랜덤한 숫자(1~100사이)가 있습니다.
이 숫자들 중에서 10보다 작은 수가 몇개나 되는지 세는 코딩을 합니다.
CPU는 4개입니다.

4가지 방법이 있습니다.

1. 쓰레드 하나로 천만번을 순회하며 숫자를 셉니다(6초)
2. ForkJoinPool을 사용해서 리프가 100개일 때까지 분할(fork)해서 각각의 수치를 위로 합쳐서(join) 계산합니다. 쓰레드 4개를 골고루 사용합니다.(2.5초)
3. ThreadPoolExecutor로 쓰레드를 4개 만든 후에 각각 천만개/4로 나뉘어진 영역에 대해 순회하면서 숫자를 계산해서 합칩니다.(2초) - > ForkJoinPool은 각각의 일감을 분할하는 만큼 쓸모없는 객체가 생성되기 때문에 시간이 조금 더 걸립니다.
4. 쓰레드 4개가 거의 동일한 일을 하면 ForkJoinPool이 별로이지만 하나의 쓰레드가 엄청 오래걸리는 일을 하고 나머지 3개의 유휴 쓰레드의 시간이 많다는 전제로 하면 ForkJoinPool이 낫습니다.


그럼 이러한 지식들을 바탕으로 본격적으로 비동기에 관한 포스팅을 시작하겠습니다.

1. JAVA 비동기

먼저 Java 비동기 방식의 처리 부분을 보겠습니다.
메시지를 저장하는 메서드입니다.

1
2
3
4
5
6
7
8
9
10
11
12
public class ThreadService { 
    public void thread_method(final String message) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                // do
            }            
        }).start();
    }
 
}
cs

method가 실행되면 새로운 thread를 만들고 thread에서 메시지를 저장하도록 처리합니다.
이 방법은 thread를 관리할 수 없습니다. 따라서 위험한 방식이라고 할 수 있습니다.
동시에 1000개의 호출이 이뤄지면 1000개의 thread가 생기는 것입니다.
thread 관리를 위해서는 JDK 1.5부터 제공하는 java.util.concurrent.ExecutorService를 사용하면 됩니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ThreadService {
 
    ExecutorService executorService = Executors.newFixedThreadPool(10);
 
    public void thread_method(final String message) throws Exception{
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                // do 
            }            
        });
    }
 
}
cs

ExecutorService를 사용하지만 method는 수정되어야 합니다.
비동기방식으로 처리하고 싶은 method마다 반복적으로 수정작업을 반드시 해야합니다.


2. Spring @Async

@Async with SimpleAsyncTaskExecutor


스프링을 사용한 @Async 어노테이션을 이용해서 위의 Java 처럼 메서드마다 반복적으로 thread를 관리 할 필요 없이 간단하게 처리할 수 있습니다.
메소드 위에 @Async 어노테이션만 추가하면 method는 비동기로 처리됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@EnableAsync
@SpringBootApplication
public class Application {
    ...
}
 
 
public class ThreadService {
 
    @Async
    public void thread_method(String message) throws Exception {
        // do
    }
 
}
cs

하지만 thread 관리는 이 방법을 쓰더라도 제대로 되지않습니다.
왜냐하면 @Async 어노테이션의 기본설정은 SimpleAsyncTaskExecutor를 사용하도록 되어있기 때문입니다.
즉, 해당 method 마다 관리해야되는 것입니다. 

@Async with ThreadPoolTaskExecutor

Thread pool을 이용해서 thread를 관리할 수 있는 방법입니다.
SpringAsyncConfig 클래스를 추가하고 사용하는 방식입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.Executor;
 
@Configuration
@EnableAsync
public class SpringAsyncConfig {
 
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(3);
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.setThreadNamePrefix("Executor-");
        taskExecutor.initialize();
        return taskExecutor;
    }
 
}
cs

setCorePoolSize : 기본 쓰레드 사이즈
setMaxPoolSize : 최대 쓰레드 사이즈
setQueueCapacity :  Max쓰레드가 동작하는 경우 대기하는 queue 사이즈

이미 SpringAsyncConfig 클래스에서 설정했기 때문에 SimpleAsyncTaskExecutor에서 사용 한 @EnableAsync는 제거하시면 됩니다.

1
2
3
4
5
6
7
8
public class ThreadService{
    
    @Async("threadPoolTaskExecutor")
    public void thread_method() throws Exception {
        // do 
    }
 
}
cs

훨씬 간결하게 바뀐 것 같습니다.
Thread Pool의 종류를 여러개 설정하고자한다면 SpringAsyncConfig 안에 bean을 여러개 만드시고
@Async를 설정시 원하는 bean의 이름을 설정하시면 됩니다.

@Async with Logging Exception

비동기로 처리되는 method에서 exception 발생시에 해당 thread만 죽는데 SpringAsyncCofing를 수정해서 logging 처리를 해보도록 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
 
@Configuration
@EnableAsync
public class SpringAsyncConfig {
 
    protected Logger logger = LoggerFactory.getLogger(getClass());
    protected Logger errorLogger = LoggerFactory.getLogger("error");
 
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(3);
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.setThreadNamePrefix("Executor-");
        taskExecutor.initialize();
        return new HandlingExecutor(taskExecutor); // HandlingExecutor로 wrapping 합니다.
    }
 
    public class HandlingExecutor implements AsyncTaskExecutor {
        private AsyncTaskExecutor executor;
 
        public HandlingExecutor(AsyncTaskExecutor executor) {
            this.executor = executor;
        }
 
        @Override
        public void execute(Runnable task) {
            executor.execute(task);
        }
 
        @Override
        public void execute(Runnable task, long startTimeout) {
            executor.execute(createWrappedRunnable(task), startTimeout);
        }
 
        @Override
        public Future<?> submit(Runnable task) {
            return executor.submit(createWrappedRunnable(task));
        }
 
        @Override
        public <T> Future<T> submit(final Callable<T> task) {
            return executor.submit(createCallable(task));
        }
 
        private <T> Callable<T> createCallable(final Callable<T> task) {
            return new Callable<T>() {
                @Override
                public T call() throws Exception {
                    try {
                        return task.call();
                    } catch (Exception ex) {
                        handle(ex);
                        throw ex;
                    }
                }
            };
        }
 
        private Runnable createWrappedRunnable(final Runnable task) {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (Exception ex) {
                        handle(ex);
                    }
                }
            };
        }
 
        private void handle(Exception ex) {
            errorLogger.info("Failed to execute task. : {}", ex.getMessage());
            errorLogger.error("Failed to execute task. ",ex);
        }
 
    }
 
}
cs

1
2
3
4
5
6
7
8
public class ThreadService{
 
    @Async("threadPoolTaskExecutor")
    public Future<String> thread_method(String message) throws Exception {
        // do 
        return new AsyncResult<String>("Success");
    }
}
cs

@Async의 장점

1. thread_method에 대한 수정없이 처리가 가능합니다.
2. 기존에는 동기/비동기에 따라서 thread_method의 내용이 달라집니다.
3. 동기/비동기에 대한 고민없이 메시지를 저장하는 과정에만 집중할 수 있습니다.

제약사항 또한 있습니다.
public method에만 사용해야됩니다. 같은 객체내의 method끼리 호출 시에 async method는 동작하지 않는 제약사항이 있습니다.
하지만 이러한 제약사항을 또 회피하는 방법이 있습니다.

다음시간에는 AOP와 이러한 제약사항을 회피하는 방법에 대해서 포스팅 해보도록 하겠습니다.




출처
http://dveamer.github.io/java/SpringAsync.html
http://cofs.tistory.com/319
http://limkydev.tistory.com/55
http://jdm.kr/blog/174
http://hamait.tistory.com/612



댓글