ch16. CompletableFuture: 안정적 비동기 프로그래밍

    1. Future의 단순 활용

    • Future 인터페이스
      • 미래의 어느 시점에 결과를 얻을 수 있음
      • 비동기 계산을 모델링 하는데 사용할 수 있고, 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공함
      • 시간이 걸리는 작업을 Future 내부로 설정하면 Future에서 계산을 하는 동안 호출한 스레드에서는 다른 작업을 할 수 있음
      • Future를 사용하기 위해서는 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음 ExecutorService에 제출해야 함
    package org.example;
    
    import java.util.concurrent.*;
    
    public class ex16_1 {
        public static void main(String[] args) {
            ExecutorService es = Executors.newCachedThreadPool();
            Future<Double> future = es.submit(new Callable<Double>() {
                @Override
                public Double call() throws Exception {
                    return doSomeLongComputation();
                }
            });
    
            doSomethingElse();
            try{
                Double result = future.get(1, TimeUnit.SECONDS);
                System.out.println(result);
                future.cancel(true);
            } catch(ExecutionException e){
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
            es.shutdown();
    
        }
    
        static double doSomeLongComputation(){
            double sum = 0;
            for(int i=1; i<=100; i++){
                sum += i;
            }
            return sum;
        }
        static void doSomethingElse(){
            StringBuilder sb = new StringBuilder();
            for(int i=1; i<=100; i++){
                sb.append(i).append(" do Something Else\n");
            }
            System.out.println(sb);
        }
    }
    • Future의 get 메서드로 결과를 가져올 수 있는데, get 메서드를 호출할 때 이미 결과가 준비되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면 작업이 완료될 때까지 호출한 스레드를 block시킴

     

    2. 비동기 API 구현

    • 최저가격 검색 애플리케이션을 구현함
    package org.example.ex_2;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class main {
        public static void main(String[] args) {
            shop shop = new shop("BestShop");
            Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
    
            // 제품 가격을 계산하는 동안 다른 일 하기
            doSomethingElse();
    
            try {
            	// 계산이 완료되었을 경우, 정상적인 price 값 리턴
                // 계산이 완료되지 않았을 경우, 계산이 완료될때까지 호출한 스레드가 블락됨
                double price = futurePrice.get();
                System.out.printf("Price is %.2f%n", price);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        public static void doSomethingElse(){
            StringBuilder sb = new StringBuilder();
            for(int i=1; i<=10; i++){
                sb.append(i).append(" do Something Else\n");
            }
            System.out.println(sb);
        }
    }
    package org.example.ex_2;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    
    public class shop {
    
        private String name;
        shop(String name){
            this.name = name;
        }
    
        public Future<Double> getPriceAsync(String product){
            CompletableFuture<Double> futurePrice = new CompletableFuture<>();
            new Thread(()->{ // 다른 스레드에서 비동기적으로 계산수행
                double price = calculatePrice(product);
                futurePrice.complete(price);} // 오래 걸리는 계산이 완료되면 Future에 값을 설정함
            ).start();
            return futurePrice; // 계산 결과를 기다리지 않고 Future를 반환함
        }
    
        public static void delay(){
            try{
              Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    
        private double calculatePrice(String product){
            delay();
            Random random = new Random();
            return random.nextDouble() * product.charAt(0) + product.charAt(1);
        }
    }

     

    2.2 에러 처리 방법

    • 가격을 계산하는 동안 에러가 발생한다면?
      • 예외가 발생하면 해당 스레드에만 영향을 미침
      • 즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬임
      • 결과적으로 get 메서드가 반환될 때까지 영원히 기다리게 될 수 있음
    • 해결) 타임아웃값을 받는 get 메서드의 오버로드 버전을 만들어 문제를 해결하기
      • completeExceptionally 메서드를 이용해서 CompletableFuture 내부에서 발생한 예외를 클라이언트에게 전달
    public Future<Double> getPriceAsync(String product){
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
            new Thread(()->{ // 다른 스레드에서 비동기적으로 계산수행
                try{
                    double price = calculatePrice(product);
                    futurePrice.complete(price); // 계산이 정상적으로 종료되면 가격정보를 저장한 채로 Future를 종료
                } catch (Exception e){
                    futurePrice.completeExceptionally(e); // 문제가 생기면 에러를 포함시키고 Future를 종료함
                }
            }
        ).start();
        return futurePrice; // 계산 결과를 기다리지 않고 Future를 반환함
    }
    • calculatePrice 메서드에서 RuntimeException을 던지게 함

     

     

    팩토리 메서드 supplyAsync로 CompletableFuture 만들기

    • 더 간단한 방식으로 CompletableFuture를 만들어보자. 위의 getPriceAsync 메서드를 다음처럼 구현할 수 있다!
    public Future<Double> getSimplePriceAsync(String product){
            return CompletableFuture.supplyAsync(()->calculatePrice(product));
    }
    • supplyAsync 메서드는 Supplier를 인수로 받고, CompletableFuture를 반환함
      • CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성함
      • Executor를 선택적으로 전달 가능

     

     

    3. 비블록 코드 만들기

    • API를 제어할 권한이 없는 상황이며, 모든 API는 동기 방식의 블록 메서드라고 가정
    • 블록 메서드를 사용할 수 밖에 없는 상황에서 비동기적으로 여러 상점에 질의하는 방법, 즉 한 요청의 응답을 기다리며 블록하는 상황을 피해 최저가격 검색 애플리케이션의 성능을 높일 수 있는 방법을 살펴보자.
    package org.example.ex_3;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    
    public class non_block_main {
    
        static List<shop> shops = Arrays.asList(new shop("BestPrice"),
                new shop("LetsSaveBig"),
                new shop("MyFavoriteShop"),
                new shop("BuyItAll"));
    
        public static void main(String[] args){
            long start = System.nanoTime();
            System.out.println(findPrices("myPhone27S"));
            long duration = (System.nanoTime()-start) / 1_000_000;
            System.out.println("Done in " + duration + "ms");
        }
    
        public static List<String> findPrices(String product){
            return shops.stream().map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product
            ))).collect(Collectors.toList());
        }
    }

    위의 코드를 어떻게 개선시킬 수 있을까?

     

     

    3.1 병렬 스트림으로 요청 병렬화하기

    public static List<String> findParallelPrices(String product){
            return shops.parallelStream().map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product
            ))).collect(Collectors.toList());
    }

    성공!

     

     

    3.2 CompletableFuture로 비동기 호출 구현하기

    CompletableFuture 기능을 활용해서 findPrices 메서드의 동기호출을 비동기호출로 바꿔보자

    public static List<String> findAsyncPrices(String product){
        List<CompletableFuture<String>> priceFutures = shops.stream().map(shop->CompletableFuture.supplyAsync(
                    ()-> shop.getName() + " price is " + shop.getPrice(product)
            )).collect(Collectors.toList());
            
        return priceFutures.stream()
                    .map(CompletableFuture::join) // 모든 비동기 동작이 끝나길 기다림
                    .collect(Collectors.toList()); 
    }

    • 두 개의 stream 파이프라인으로 처리함
      • stream은 collect할 때 모든 연산을 합쳐셔 계산함 (lazy) 
      • 하나의 stream 파이프라인을 사용할 경우 계산이 동기적, 순차적으로 이루어짐
    • sync-blocking 방식의 구현에 비해서는 빠르지만, 병렬 스트림을 이용할 때보다 느림

     

     

    3.3 더 확장성이 좋은 해결 방법

    • 코드를 실행하는 컴퓨터가 네 개의 스레드를 병렬로 실행할 수 있는 기기라고 가정
      • 다섯 번째 상점이 추가된다면?
      • 내가 쓰는 컴퓨터는 8개의 스레드를 지원함 그래서 9개로 돌려봄
    • 병렬 스트림

    • Async

    • 미세하게 Async 버전이 빠름
      • 결과적으로 비슷하지만 CompletableFuture는 다양한 Executor를 지정할 수 있다는 장점이 존재함
      • Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있음

     

     

    3.4 커스텀 Executor 사용하기

    • 스레드 풀에서 관리하는 스레드 수를 어떻게 결정?
      • Java Concurrency in Practice
      • N_threads = N_cpu * U_cpu * (1 + W/C)
        • N_threads: thread 수
        • N_cpu: Runtime.getRuntime().availableProcessors()가 반환하는 코어 수
        • U_cpu: 0과 1 사이 값을 갖는 CPU 활용 비율
        • W: Waiting time, C: computing time
    • 하지만 상점 수보다 스레드 수가 많으면 낭비임
    • 그리고 스레드 수가 너무 많으면 서버가 크래시될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는 100 이하로 설정하는 것이 바람직함
    private final Executor ex = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                new ThreadFactory(){
                    public Thread newThread(Runnable r){
                        Thread t = new Thread(r);
                        t.setDaemon(true); // 프로그램 종료를 방해하지 않는 데몬스레드 사용
                        return t;
                    }
    });
    public static List<String> findAsyncPrices(String product){
            List<CompletableFuture<String>> priceFutures = shops.stream().map(shop->CompletableFuture.supplyAsync(
                    ()-> shop.getName() + " price is " + shop.getPrice(product)
            ,ex)).collect(Collectors.toList());
    
            return priceFutures.stream()
                    .map(CompletableFuture::join) // 모든 비동기 동작이 끝나길 기다림
                    .collect(Collectors.toList());
    }

    두 배정도 빨라짐

    애플리케이션 특성에 맞는 Executor를 만들어 CompletableFuture를 활용하는 것이 바람직함

    CompletableFuture를사용하면 전체적인 계산이 블록되지 않도록 스레드 풀의 크기를 조절할 수 있음

    • I/O가 포함되지 않은 계산 중심의 동작: 스트림 인터페이스가 구현이 간단하며 효율적일 수 있음. 모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 스레드를 가질 필요가 없음
    • I/O를 기다리는 작업: CompletableFuture가 더 많은 유연성을 제공하며 대기시간/계산시간 비율에 적합한 스레드 수를 설정할 수 있음. 특히, 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지 예측하기 어려운 문제도 있음

     

     

    4. 비동기 작업 파이프라인 만들기

    할인코드 class

    package org.example.ex_4;
    
    public class Discount {
        public enum Code{
            NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
    
            private final int percentage;
    
            Code(int percentage){
                this.percentage = percentage;
            }
        }
    
        public static String applyDiscount(Quote quote){
            return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
        }
    
        private static double apply(double price, Code code){
            delay();
            return (price * (100 - code.percentage)/100);
        }
    
        public static void delay(){
            try{
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

     

    쿼리 class

     

    package org.example.ex_4;
    
    public class Quote {
        private final String shopName;
        private final double price;
        private final Discount.Code discountCode;
    
        public Quote(String shopName, double price, Discount.Code code){
            this.shopName = shopName;
            this.price = price;
            this.discountCode = code;
        }
    
        public static Quote parse(String s){
            String[] split = s.split(":");
            String shopName = split[0];
            double price = Double.parseDouble(split[1]);
            Discount.Code discountCode = Discount.Code.valueOf(split[2]);
            return new Quote(shopName, price, discountCode);
        }
    
        public String getShopName(){
            return shopName;
        }
    
        public double getPrice(){
            return price;
        }
    
        public Discount.Code getDiscountCode(){
            return discountCode;
        }
    }

     

    shop class

    public String getPrice(String product){
            double price = calculatePrice(product);
            Random random = new Random();
            Discount.Code code = Discount.Code.values()[
                random.nextInt(Discount.Code.values().length)];
            return String.format("%s:%.2f:%s", name, price, code);
    }

     

     

    Main class

    public static List<String> findPrices(String product){
            return shops.stream().map(shop -> shop.getPrice(product))
                    .map(Quote::parse)
                    .map(Discount::applyDiscount)
                    .collect(Collectors.toList());
    }

    결과

    • 1개의 상점당 2초씩, 총 9개의 상점
      • 상점에서 상품의 가격을 찾아오는데 각 상점당 1초씩
      • 할인코드를 적용하는데 1초씩

     

     

    4.3 동기 작업과 비동기 작업 조합하기

    public static List<String> findAsyncPrices(String product){
    
            List<CompletableFuture<String>> priceFutures = shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(()->shop.getPrice(product), ex)) // 작업이 끝났을 때 해당 상점에서 반환하는 문자열 정보를 포함
                    .map(future -> future.thenApply(Quote::parse)) // 문자열을 Quote로 변환
                    .map(future -> future.thenCompose(quote->CompletableFuture.supplyAsync(()->Discount.applyDiscount(quote), ex))) // 할인율 적용
                    .collect(Collectors.toList());
            return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
    • thenApply: 동기적 mapping function을 적용할 때 사용
    • thenCompose:  비동기적 mapping function을 적용할 때 사용
      • thenComposeAsync: 별도의 스레드에서 조합 동작을 수행

    결과

     

     

    4.4 독립 CompletableFuture와 비독립 CompletableFuture 합치기

    • 위의 예제에서는 첫 번째 CompletableFuture에 thenCompose 메서드를 실행해서 실행 결과를 두 번째 CompletableFuture로 전달함
    • 그렇다면 독립적으로 실행된 두 개의 CompletableFuture를 합치려면 어떻게 해야할까?
      • thenCombine 사용: BiFunction을 두 번째 인수로 받음
        • BiFunction: 두 개의 CompletableFuture를 어떻게 합칠지 정의
    • 예제) 온라인 상점에서 유로 가격 정보를 제공하는데, 고객에게는 항상 달러 가격을 보여줘야함
      • task1) 주어진 상품의 가격을 상점에 요청
      • task2) 원격 환율 교환 서비스를 이용해서 유로와 달러의 현재 환율을 비동기적으로 요청
      • 가격 * 환율해서 최종 결과값을 구할 수 있음
    Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(()->shop.getPrice(product))
    									.thenCombine(
                                            CompletableFuture.supplyAsync(
                                                ()->exchangeService.getRate(Money.EUR, Money.USD)),
                                                (price, rate)->price*rate)
                                         );

    • 두번째 예제) 환율을 가져오는 것에 timeout을 걸고, timeout이 발생할 시 기본값을 사용하자
    Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(()->shop.getPrice(product))
    									.thenCombine(
                                            CompletableFuture.supplyAsync(
                                                ()->exchangeService.getRate(Money.EUR, Money.USD))
                                                .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
                                                (price, rate)->price*rate)
                                         ).orTimeout(3, TimeUnit.SECONDS);

     

     

    5. CompletableFuture의 종료에 대응하는 방법

    • 모든 상점에서 가격 정보를 제공할때 까지 기다리지 않고 각 상점에서 가격 정보를 제공할 때마다 즉시 보여줄 수 있는 최저가격 검색 어플리케이션을 만들어보자

     

    5.1 최저가격 검색 어플리케이션 리팩터링

    1초에서 임의지연으로 변경

    public static void delay(){
            Random random = new Random();
            int delay = 500 + random.nextInt(2000);
            try{
              Thread.sleep(delay);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    }

     

     

    각 상점에서 가격 정보를 제공할때마다 출력하기 위해 List가 아닌 Stream 사용

    public static Stream<CompletableFuture<String>> findPricesStream(String product){
            return shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(()->shop.getPrice(product), ex))
                    .map(future -> future.thenApply(Quote::parse))
                    .map(future -> future.thenCompose(quote->CompletableFuture.supplyAsync(()->Discount.applyDiscount(quote), ex)));
    }

     

    응답이 가장 느린 상점까지 기다리고 싶을 때

        public static void main(String[] args){
            long start = System.nanoTime();
            CompletableFuture[] futures = findPricesStream("myPhone27S").map(f->f.thenAccept(
                    s->System.out.println(s + " done in " + ((System.nanoTime()-start) / 1_000_000) + "ms")
                    ))
                    .toArray(size -> new CompletableFuture[size]);
            CompletableFuture.allOf(futures).join();
            long duration = (System.nanoTime()-start) / 1_000_000;
            System.out.println("Done in " + duration + "ms");
        }
    • 팩토리 메서드 allOf: CompletableFuture 배열을 입력받아 CompletableFuture<Void>를 반환함
      • 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료됨
      • allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 스트림의 CompletableFuture의 실행 완료를 기다릴 수 있음

    댓글