비동기 프로그래밍
비동기 프로그래밍
프로그래밍을 하면서 비동기(Async), 동기(Sync), 블로킹(Blocking), 논 블로킹(Non-Blocking) 방식의 프로그래밍이라는 언어를 많이 들어보게 됩니다. 저도 회사에서 비동기 방식으로 코드를 작성한적이 있었는데 아무래도 기능을 빠르게 만들다보니 제대로 공부를 하지 않고 작성한 경우가 대부분이라서 오늘은 비동기, 동기, 블로킹, 논 블로킹이 무엇인지 살펴보고 간단하게 예제 코드를 작성하였습니다.
제가 생각하는 동기, 비동기는 메서드를 제공해주는 객체 입장에서 보는 관점이라고 생각하고, 블로킹, 논 블로킹은 메서드를 호출하는 곳 즉 클라이언트 관점이라고 생각합니다. 물론 여러 블로그 포스팅을 보면 아직까지 정확한 정의가 개발자분들마다 다르다고 생각합니다.
블로킹(Blocking), 논 블로킹(Non-Blocking)
동기,비동기는 메서드를 제공하는 곳의 입장이라면, 블로킹, 논 블로킹은 메서드를 호출하는 곳, 즉 클라이언트에서의 입장입니다.
만약 데이터를 조회하는 메서드를 제공하는 객체를 클라이언트에서 호출한다고 하면 블로킹 방식으로 프로그래밍을 구현하면 데이터 조회 메서드를 호출하는 순간 클라이언트의 코드 흐름에 대한 제어권이 데이터를 조회하는 부분으로 넘어가게 됩니다. 그러면 클라이언트는 데이터 조회 결과를 받기전 까지 아무것도 할 수 없는 블로킹 상태에 빠지게 됩니다.
반대로, 논 블로킹 방식은 데이터 조회 메서드를 호출 후 제어권을 넘기지 않고 다른 작업을 수행할 수 있는 프로그래밍 방식이라고 생각하면 됩니다.
이제 동기, 비동기, 블로킹, 논 블로킹을 좀 더 이해하기 위해 간단한 예제 코드로 설명하겠습니다.
커피 이름으로 커피 가격을 조회하는 Repository 객체를 생성하였습니다. H2 DB를 이용해서 조회를 하려고 했지만 아무래도 가볍게 설명하기 위해서는 Map 객체를 사용하는게 좋다고 판단이 되었습니다.
커피 도메인 클래스를 만들어서 해당 인스턴스를 생성하여 Map 객체에서 관리하도록 하기 위해서 만들었습니다.
public class Coffee {
private String name;
private int price;
@Builder
public Coffee(String name, int price) {
this.name = name;
this.price = price;
}
public static Coffee makeCoffee(String name, int price) {
return Coffee.builder()
.name(name)
.price(price)
.build();
}
}
@Repository
public class CoffeeRepository {
private Map<String, Coffee> coffeeMap = new HashMap<>();
@PostConstruct
public void init() {
coffeeMap.put("latte", Coffee.makeCoffee("latte", 3500));
coffeeMap.put("mocha", Coffee.makeCoffee("mocha", 4000));
coffeeMap.put("americano", Coffee.makeCoffee("americano", 2000));
}
public int getPriceByName(String name) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return coffeeMap.get(name).getPrice();
}
}
위의 코드를 살펴보면, 스프링 부트 환경에서 작성하였습니다. 데이터를 제공해주는 Repository 클래스의 getPriceByName() 메서드를 정의하여 커피의 이름을 파라미터로 받아서 커피의 가격을 리턴해주는 메서드입니다.
단, 해당 메서드는 1초의 지연 시간을 주기로 하였습니다. 클라이언트는 커피의 가격을 조회하기 위해서 최소 1초가 걸릴 것입니다.
이제 이 Repository 객체를 의존하는 서비스 인터페이스를 아래와 같이 정의하였습니다.
public interface CoffeeUseService {
int getPrice(String name); // 동기
CompletableFuture<Integer> getPriceAsync(String name); // 비동기
CompletableFuture<Integer> getDiscountPriceAsync(Integer price); // 비동기
}
getPrice()는 동기 메서드이고, Async가 붙은 나머지 메서드 두개는 비동기 방식의 메서드입니다. 기능을 제공하는 곳에서 동기, 비동기에 대한 개념을 포함하고 있습니다. 블로킹으로 할지, 논 블로킹으로 할지 선택은 기능을 제공하는 클래스에서 결정되는게 아니라, 해당 메서드를 호출 하는곳, 즉 클라이언트에서 선택할 것입니다.
동기(Sync) 방식
CoffeeUseService 인터페이스의 구현체를 작성하였습니다. 첫번째 메서드인 getPrice()는 동기 방식으로 데이터를 제공합니다. 즉, 클라이언트에서 제어권을 받아서 데이터 처리 연산이 완료되어야 반환하는 방식입니다.
@Slf4j
@RequiredArgsConstructor
@Service
public class CoffeeUseServiceImpl implements CoffeeUseService {
private final CoffeeRepository coffeeRepository;
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Override
public int getPrice(String name) {
log.info("동기 호출 방식으로 가격 조회 시작");
return coffeeRepository.getPriceByName(name);
}
}
테스트 코드를 작성하여 검증하였습니다. CoffeeTest라는 테스트 클래스를 생성하고, @SpringBootTest 어노테이션을 사용해서 테스트에 필요한 빈들을 주입하여 사용하도록 했습니다. 참고로 @SpringBootTest 어노테이션을 사용할 경우 스프링 컨테이너에 생성된 모든 Bean들을 가져오기 때문에 간단한 테스트를 할 경우에는 필요한 Bean들만 가져와서 사용하는 것을 권고드립니다. 저는 귀찮아서 @SpringBootTest 어노테이션을 사용했습니다.
@SpringBootTest
public class CoffeeTest {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
CoffeeUseService coffeeUseService;
@Test
public void 가격_조회_동기_블로킹_호출_테스트() throws Exception {
//given
int expectedPrice = 2000;
//when
int resultPrice = coffeeUseService.getPrice("americano");
logger.info("최종 가격 전달: [{}]", resultPrice);
//then
assertThat(resultPrice).isEqualTo(expectedPrice);
}
}
실행 결과
테스트가 성공적으로 수행되었습니다. 1초라는 지연시간이 걸렸습니다. 만약 두 번 수행하면 동기호출이기 때문에 2초가 넘게 걸릴 것입니다.
비동기(Async) 메서드, 논 블로킹 + 블로킹 혼합
이번에는 Async(비동기) 메서드를 구현하겠습니다. 여기서는 CompleteFuture를 사용합니다. 이 객체는 비동기 방식의 메서드를 호출하여 리턴값을 받고자 할때 자주 사용하는 객체라고 생각하면 됩니다. 사실 저도 ComplteFuture 클래스를 최근 들어서 써봤기 때문에 Doc은 아래 블로그 포스팅에 잘나와있습니다.
참조: https://gunju-ko.github.io/java/2018/07/05/Future.html
비동기 메서드에서는 새로운 쓰레드를 생성해서 Repository를 통해서 데이터를 조회합니다. 최종 데이터 연산이 끝나지 않아도 일단 return future를 실행해서 먼저 껍데기만 반환하게 됩니다.
@Override
public CompletableFuture<Integer> getPriceAsync(String name) {
log.info("비동기 호출 방식으로 가격 조회 시작");
CompletableFuture<Integer> future = new CompletableFuture<>();
new Thread(() -> {
log.info("새로운 쓰레드로 작업 시작");
Integer price = coffeeRepository.getPriceByName(name);
future.complete(price);
}).start();
return future;
}
getPriceByName() 메서드에서 1초의 지연시간을 임의로 주었지만, 해당 데이터는 무작정 기다리지 않고, 다른 작업을 병행할 수 있습니다. 아래와 같이 테스트 코드를 통해 검증하였습니다.
@Test
public void 가격_조회_비동기_블록킹_호출_테스트() throws Exception {
//given
int expectedPrice = 3500;
//when
// 비동기 메소드 호출 후 껍데기 반환
CompletableFuture<Integer> future = coffeeUseService.getPriceAsync("latte");
logger.info("아직 최종 데이터를 전달 받지는 않았지만, 다른 작업 수행 가능");
int resultPrice = future.join(); // 블로킹
logger.info("최종 가격 전달 받음: [{}]", resultPrice);
//then
assertThat(resultPrice).isEqualTo(expectedPrice);
}
CompletableFuture로 리턴을 받았지만, 최종 데이터를 조회하기 전까지 다른 작업을 병핼할 수 있습니다. 즉, 제어권을 넘겨주지 않고 다른 작업을 할 수 있습니다.
실행 결과
하지만, 최종 데이터를 조회하기 위해서는 CompletableFuture의 join 또는 get 메서드를 사용해야 합니다. 일단 get과 join은 예외처리를 하는 방식이 조금 다릅니다. 이 정도 차이만 있고, join이나 get을 수행하는 시점에서는 데이터를 조회할 때까지 블로킹 됩니다. 데이터가 계산이 안되었다면 될때까지 기다렸다가 결과를 전달받습니다.
결국 동기든 비동기는 결과 값을 받기 위해서는 로직이 다 돌아야되기 때문이죠.
메서드를 제공하는 곳에서는 CompletableFuture를 반환하고, 메서드를 사용하는 곳, 즉 클라이언트에서는 논 블로킹과 블로킹이 혼합되어 있는 상황입니다. 어떻게 보면 완전한 논블로킹 프로그래밍은 아닙니다.
비동기(Async)를 더 깔끔하게 수정하기
getPriceAsync 메서드를 좀 더 깔끔하게 수정해보겠습니다. 참고로, 테스트 코드는 수정이 되지 않습니다. 즉 메서드를 리팩토링한 이후에도 테스트 코드는 수정없이 정상적으로 통과해야 합니다. CompletableFuture에서는 몇개의 유용한 팩토리 메서드를 제공하는데, 그 중에서 supplyAsync와 runAsync 메서드를 살펴보겠습니다. supplyhAsync 메서드는 supplier라는 함수적 인터페이스를 파라미터로 받습니다. 람다에 대해서 알고 있다면 쉽게 이해할 수 있는 내용입니다. 반면에 runAsync 메서드는 Runnable 함수적 인터페이스를 파라미터로 받고 있습니다.
전자인 supplyAsync는 파라미터는 없지만 리턴 값이 존재하고, Runnable은 파라미터, 리턴 모두 없는 함수적 인터페이스입니다. 먼저 supplyAsync 팩토리 메서드를 사용해서 아래와 같이 코드를 수정하였습니다.
@Override
public CompletableFuture<Integer> getPriceAsync(String name) {
log.info("비동기 호출 방식으로 가격 조회 시작");
return CompletableFuture.supplyAsync(() -> {
log.info("Thread CurrentName: [{}]", Thread.currentThread().getName());
log.info("supplyAsync");
return coffeeRepository.getPriceByName(name);
});
}
Thread CurrentName: [ForkJoinPool.commonPool-worker-9]
위와 같이 supplyAsync 메서드로 수행하는 로직은 ForkJoinPool의 commonPool을 사용하는 것을 확인할 수 있습니다. 사실, 일반적으로 commonPool을 사용하는 방법은 바람직하지 않습니다. 그래서, 좀 더 수정을 하였습니다. supplyAsync를 실행할 때 Executor를 파라미터로 추가하면, Common Pool에서 동작하지 않고 별도의 쓰레드 풀에서 동작할 것입니다. 함수를 제공하는 코드를 아래와 같이 수정하였습니다.
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Override
public CompletableFuture<Integer> getPriceAsync(String name) {
log.info("비동기 호출 방식으로 가격 조회 시작");
return CompletableFuture.supplyAsync(() -> {
log.info("Thread CurrentName: [{}]", Thread.currentThread().getName());
log.info("supplyAsync");
return coffeeRepository.getPriceByName(name);
}, executor);
}
테스트 코드를 수행하면, commonPool을 사용하지 않고, 별도로 정의한 쓰레드 풀을 사용합니다.
Thread CurrentName: [pool-1-thread-1]
콜백함수를 사용하여 Non-Blocking 프로그래밍 구현 에제를 살펴보겠습니다.
이제, 논 블로킹(Non-Blocking)을 위해서 코드를 더 수정하겠습니다. 비동기 메서드는 수정하지 않고, 클라이언트 코드를 수정해야 합니다.
블로킹, 논 블로킹은 메서드를 사용하는 곳, 즉 클라이언트에서의 입장입니다.
Non-Blocking 구현: thenAccept, thenApply
위 코드는, CompletableFuture의 get, join 메서드를 사용하는데, 해당 메서드를 호출하는 순간 블로킹 현상이 발생합니다. 논 블로킹으로 개선하기 위해서는 콜백 함수를 구현해야 하는데, CompletableFuture는 thenAccept()와 thenApply() 메서드를 제공합니다. thenAccept() 메서드는 CompletableFuture
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
먼저, thenAccept() 메서드를 사용해서 테스트 코드를 작성해보겠습니다. 일단 getPriceAsync()는 CompletableFuture
@Test
public void 가격_조회_비동기_호출_콜백_테스트() throws Exception {
//given
int expectedPrice = 4000;
//when
CompletableFuture<Void> future = coffeeUseService
.getPriceAsync("mocha")
.thenAccept(p -> {
logger.info("콜백, 가격은: [{}]", p + "원, 하지만 데이터를 반환하지 않습니다." );
assertThat(p).isEqualTo(expectedPrice);
});
//then
logger.info("아직 최종 데이터를 전달 받지는 않았지만, 다른 작업 수행 가능, 논 블로킹");
assertThat(future.join()).isEqualTo(expectedPrice);
}
위에서 작성했던 테스트 코드처럼 get이나 join 메서드를 사용해서 최종 연산이 된 데이터를 조회할 필요가 없습니다. CompletableFuture 객체에서 알아서 최종 연산이 되면 콜백 함수를 실행해주기 때문입니다. 단, 해당 코드는 테스트 코드이기 때문에 제일 하단에 future.join() 메서드를 실행해서 블로킹 코드를 추가하였습니다. 실제 서비스 코드에서 해당 코드는 필요없지만, 테스트 코드이기 때문에 추가하였는데, 해당 코드가 없다면 thenAccept() 메소드가 수행하기 전에 테스트는 통과해버릴 것입니다. 그 이유는 테스트 코드는 Main 쓰레드에서 동작하게 되고, thenAccept 콜백 메서드가 수행하기도 전에 Main 쓰레드는 종료되기 때문입니다. Non-Blocking 코드이기 때문에 결과가 오는 것을 기다리지 않고 게속 코드가 동작이 되는데, 테스트 코드 특성상 Main 쓰레드가 종료되기 때문에, Main 쓰레드를 종료시키지 않기 위해서 임의로 작성한 코드입니다.
thenAccept는 CompletableFuture
@Test
public void 가격_조회_비동기_호출_콜백_테스트() throws Exception {
//given
int expectedPrice = 4000;
//when
CompletableFuture<Void> future = coffeeUseService
.getPriceAsync("mocha")
.thenApply(p -> {
logger.info("같은 쓰레드로 동작");
return p + 100;
})
.thenAccept(p -> {
logger.info("콜백, 가격은: [{}]", p + "원, 하지만 데이터를 반환하지 않습니다." );
assertThat(p).isEqualTo(expectedPrice);
});
//then
logger.info("아직 최종 데이터를 전달 받지는 않았지만, 다른 작업 수행 가능, 논 블로킹");
assertThat(future.join()).isEqualTo(expectedPrice);
}
실행 결과
참고로, thenApply와 thenAccept 메서드를 별도의 쓰레드로 동작하고 싶다면, thenApplyAsync와 thenAcceptAsync 메서드를 사용하면 됩니다.
thenCombine 메서드
thenCombine() 메서드는 CompletableFuture를 2개 실행해서 결과를 조합할 때 사용합니다.thenCombine()는 병렬 실행을 해서 조합하는데, 순차적으로 실행하지 않습니다. 커피의 가격을 조회하는 기능은 1초의 지연시간이 있습니다. 만약 순차적으로 조회하면 1+1이 되기 때문에 총 2초가 걸릴 것입니다. 그래서 동시에 두가지 조회를 같이 수행한 다음에 결과를 조합할 것이고, 그러면 2개를 조회하는데 1초가 걸리도록 프로그램을 작성할 것입니다. 이것이 바로 병렬 프로그래밍입니다.
@Test
public void thenCombine_테스트() throws Exception {
//given
Integer expectedPrice = 7500;
//when
CompletableFuture<Integer> futureA = coffeeUseService.getPriceAsync("latte");
CompletableFuture<Integer> futureB = coffeeUseService.getPriceAsync("mocha");
Integer resultPrice = futureA.thenCombine(futureB,Integer::sum).join();
//then
assertThat(resultPrice).isEqualTo(expectedPrice);
}
실행 결과
커피 이름 중, 라떼와 모카를 조회하는데 총 1초 정도 걸렸습니다. 즉 2초가 걸리지 않았습니다. 두 작업은 별도의 쓰레드 풀에서 동작하고, thenCombine 메서드에 의해서 조합이 됩니다. 여기서 쓰레드 풀의 쓰레드 개수를 1로 주게되면 쓰레드가 한개이기 때문에 병렬로 수행하지 못하고 하나의 쓰레드를 사용합니다. 따라서 1초가 아니고 2초가 걸리게 됩니다.
thenCompose 메소드
thenCompose() 메서드는 바로 위에서 설명한 thenCombine와는 다르게 CompletableFuture를 순차적으로 실행합니다. 가격을 조회하는 기능이 있고, 조회된 가격에서 할인을 하는 기능을 별도로 조회하는 기능을 구현해보겠습니다.
https://github.com/HomoEfficio/dev-tips/blob/master/Java-Spring%20Thread%20Programming%20%EA%B0%84%EB%8B%A8%20%EC%A0%95%EB%A6%AC.md
@Override
public CompletableFuture<Integer> getDiscountPriceAsync(Integer price) {
return CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return (int)(price * 0.9);
}, threadPoolTaskExecutor);
}
- 가격 조회
- 조회된 가격에 할인율 적용이라는 기능을 순차적으로 수행해야합니다.
@Test
public void thenCompose_테스트() throws Exception {
//given
Integer expectedPrice = (int)(3500 * 0.9);
//when
CompletableFuture<Integer> futureA = coffeeUseService.getPriceAsync("latte");
Integer resultPrice = futureA.thenCompose(result ->
coffeeUseService.getDiscountPriceAsync(result)).join();
//then
assertThat(resultPrice).isEqualTo(expectedPrice);
}