1. Future의 단순 활용
- Future 인터페이스
- 미래의 어느 시점에 결과를 얻을 수 있음
- 비동기 계산을 모델링 하는데 사용할 수 있고, 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공함
- 시간이 걸리는 작업을 Future 내부로 설정하면 Future에서 계산을 하는 동안 호출한 스레드에서는 다른 작업을 할 수 있음
- Future를 사용하기 위해서는 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음 ExecutorService에 제출해야 함
package org.example;
import java.util.concurrent.*;
public class ex16_1 {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
Future<Double> future = es.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
return doSomeLongComputation();
}
});
doSomethingElse();
try{
Double result = future.get(1, TimeUnit.SECONDS);
System.out.println(result);
future.cancel(true);
} catch(ExecutionException e){
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
es.shutdown();
}
static double doSomeLongComputation(){
double sum = 0;
for(int i=1; i<=100; i++){
sum += i;
}
return sum;
}
static void doSomethingElse(){
StringBuilder sb = new StringBuilder();
for(int i=1; i<=100; i++){
sb.append(i).append(" do Something Else\n");
}
System.out.println(sb);
}
}
- Future의 get 메서드로 결과를 가져올 수 있는데, get 메서드를 호출할 때 이미 결과가 준비되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면 작업이 완료될 때까지 호출한 스레드를 block시킴
2. 비동기 API 구현
- 최저가격 검색 애플리케이션을 구현함
package org.example.ex_2;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class main {
public static void main(String[] args) {
shop shop = new shop("BestShop");
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
// 제품 가격을 계산하는 동안 다른 일 하기
doSomethingElse();
try {
// 계산이 완료되었을 경우, 정상적인 price 값 리턴
// 계산이 완료되지 않았을 경우, 계산이 완료될때까지 호출한 스레드가 블락됨
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void doSomethingElse(){
StringBuilder sb = new StringBuilder();
for(int i=1; i<=10; i++){
sb.append(i).append(" do Something Else\n");
}
System.out.println(sb);
}
}
package org.example.ex_2;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class shop {
private String name;
shop(String name){
this.name = name;
}
public Future<Double> getPriceAsync(String product){
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(()->{ // 다른 스레드에서 비동기적으로 계산수행
double price = calculatePrice(product);
futurePrice.complete(price);} // 오래 걸리는 계산이 완료되면 Future에 값을 설정함
).start();
return futurePrice; // 계산 결과를 기다리지 않고 Future를 반환함
}
public static void delay(){
try{
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private double calculatePrice(String product){
delay();
Random random = new Random();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
}
2.2 에러 처리 방법
- 가격을 계산하는 동안 에러가 발생한다면?
- 예외가 발생하면 해당 스레드에만 영향을 미침
- 즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬임
- 결과적으로 get 메서드가 반환될 때까지 영원히 기다리게 될 수 있음
- 해결) 타임아웃값을 받는 get 메서드의 오버로드 버전을 만들어 문제를 해결하기
- completeExceptionally 메서드를 이용해서 CompletableFuture 내부에서 발생한 예외를 클라이언트에게 전달
public Future<Double> getPriceAsync(String product){
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(()->{ // 다른 스레드에서 비동기적으로 계산수행
try{
double price = calculatePrice(product);
futurePrice.complete(price); // 계산이 정상적으로 종료되면 가격정보를 저장한 채로 Future를 종료
} catch (Exception e){
futurePrice.completeExceptionally(e); // 문제가 생기면 에러를 포함시키고 Future를 종료함
}
}
).start();
return futurePrice; // 계산 결과를 기다리지 않고 Future를 반환함
}
- calculatePrice 메서드에서 RuntimeException을 던지게 함
팩토리 메서드 supplyAsync로 CompletableFuture 만들기
- 더 간단한 방식으로 CompletableFuture를 만들어보자. 위의 getPriceAsync 메서드를 다음처럼 구현할 수 있다!
public Future<Double> getSimplePriceAsync(String product){
return CompletableFuture.supplyAsync(()->calculatePrice(product));
}
- supplyAsync 메서드는 Supplier를 인수로 받고, CompletableFuture를 반환함
- CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성함
- Executor를 선택적으로 전달 가능
3. 비블록 코드 만들기
- API를 제어할 권한이 없는 상황이며, 모든 API는 동기 방식의 블록 메서드라고 가정함
- 블록 메서드를 사용할 수 밖에 없는 상황에서 비동기적으로 여러 상점에 질의하는 방법, 즉 한 요청의 응답을 기다리며 블록하는 상황을 피해 최저가격 검색 애플리케이션의 성능을 높일 수 있는 방법을 살펴보자.
package org.example.ex_3;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class non_block_main {
static List<shop> shops = Arrays.asList(new shop("BestPrice"),
new shop("LetsSaveBig"),
new shop("MyFavoriteShop"),
new shop("BuyItAll"));
public static void main(String[] args){
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime()-start) / 1_000_000;
System.out.println("Done in " + duration + "ms");
}
public static List<String> findPrices(String product){
return shops.stream().map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product
))).collect(Collectors.toList());
}
}
위의 코드를 어떻게 개선시킬 수 있을까?
3.1 병렬 스트림으로 요청 병렬화하기
public static List<String> findParallelPrices(String product){
return shops.parallelStream().map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product
))).collect(Collectors.toList());
}
성공!
3.2 CompletableFuture로 비동기 호출 구현하기
CompletableFuture 기능을 활용해서 findPrices 메서드의 동기호출을 비동기호출로 바꿔보자
public static List<String> findAsyncPrices(String product){
List<CompletableFuture<String>> priceFutures = shops.stream().map(shop->CompletableFuture.supplyAsync(
()-> shop.getName() + " price is " + shop.getPrice(product)
)).collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join) // 모든 비동기 동작이 끝나길 기다림
.collect(Collectors.toList());
}
- 두 개의 stream 파이프라인으로 처리함
- stream은 collect할 때 모든 연산을 합쳐셔 계산함 (lazy)
- 하나의 stream 파이프라인을 사용할 경우 계산이 동기적, 순차적으로 이루어짐
- sync-blocking 방식의 구현에 비해서는 빠르지만, 병렬 스트림을 이용할 때보다 느림
3.3 더 확장성이 좋은 해결 방법
- 코드를 실행하는 컴퓨터가 네 개의 스레드를 병렬로 실행할 수 있는 기기라고 가정
- 다섯 번째 상점이 추가된다면?
- 내가 쓰는 컴퓨터는 8개의 스레드를 지원함 그래서 9개로 돌려봄
- 병렬 스트림
- Async
- 미세하게 Async 버전이 빠름
- 결과적으로 비슷하지만 CompletableFuture는 다양한 Executor를 지정할 수 있다는 장점이 존재함
- Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있음
3.4 커스텀 Executor 사용하기
- 스레드 풀에서 관리하는 스레드 수를 어떻게 결정?
- Java Concurrency in Practice
- N_threads = N_cpu * U_cpu * (1 + W/C)
- N_threads: thread 수
- N_cpu: Runtime.getRuntime().availableProcessors()가 반환하는 코어 수
- U_cpu: 0과 1 사이 값을 갖는 CPU 활용 비율
- W: Waiting time, C: computing time
- 하지만 상점 수보다 스레드 수가 많으면 낭비임
- 그리고 스레드 수가 너무 많으면 서버가 크래시될 수 있으므로 하나의 Executor에서 사용할 스레드의 최대 개수는 100 이하로 설정하는 것이 바람직함
private final Executor ex = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory(){
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setDaemon(true); // 프로그램 종료를 방해하지 않는 데몬스레드 사용
return t;
}
});
public static List<String> findAsyncPrices(String product){
List<CompletableFuture<String>> priceFutures = shops.stream().map(shop->CompletableFuture.supplyAsync(
()-> shop.getName() + " price is " + shop.getPrice(product)
,ex)).collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join) // 모든 비동기 동작이 끝나길 기다림
.collect(Collectors.toList());
}
두 배정도 빨라짐
애플리케이션 특성에 맞는 Executor를 만들어 CompletableFuture를 활용하는 것이 바람직함
CompletableFuture를사용하면 전체적인 계산이 블록되지 않도록 스레드 풀의 크기를 조절할 수 있음
- I/O가 포함되지 않은 계산 중심의 동작: 스트림 인터페이스가 구현이 간단하며 효율적일 수 있음. 모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 스레드를 가질 필요가 없음
- I/O를 기다리는 작업: CompletableFuture가 더 많은 유연성을 제공하며 대기시간/계산시간 비율에 적합한 스레드 수를 설정할 수 있음. 특히, 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지 예측하기 어려운 문제도 있음
4. 비동기 작업 파이프라인 만들기
할인코드 class
package org.example.ex_4;
public class Discount {
public enum Code{
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage){
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote){
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code){
delay();
return (price * (100 - code.percentage)/100);
}
public static void delay(){
try{
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
쿼리 class
package org.example.ex_4;
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code code){
this.shopName = shopName;
this.price = price;
this.discountCode = code;
}
public static Quote parse(String s){
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName(){
return shopName;
}
public double getPrice(){
return price;
}
public Discount.Code getDiscountCode(){
return discountCode;
}
}
shop class
public String getPrice(String product){
double price = calculatePrice(product);
Random random = new Random();
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
Main class
public static List<String> findPrices(String product){
return shops.stream().map(shop -> shop.getPrice(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}
- 1개의 상점당 2초씩, 총 9개의 상점
- 상점에서 상품의 가격을 찾아오는데 각 상점당 1초씩
- 할인코드를 적용하는데 1초씩
4.3 동기 작업과 비동기 작업 조합하기
public static List<String> findAsyncPrices(String product){
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(()->shop.getPrice(product), ex)) // 작업이 끝났을 때 해당 상점에서 반환하는 문자열 정보를 포함
.map(future -> future.thenApply(Quote::parse)) // 문자열을 Quote로 변환
.map(future -> future.thenCompose(quote->CompletableFuture.supplyAsync(()->Discount.applyDiscount(quote), ex))) // 할인율 적용
.collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
- thenApply: 동기적 mapping function을 적용할 때 사용
- thenCompose: 비동기적 mapping function을 적용할 때 사용
- thenComposeAsync: 별도의 스레드에서 조합 동작을 수행
4.4 독립 CompletableFuture와 비독립 CompletableFuture 합치기
- 위의 예제에서는 첫 번째 CompletableFuture에 thenCompose 메서드를 실행해서 실행 결과를 두 번째 CompletableFuture로 전달함
- 그렇다면 독립적으로 실행된 두 개의 CompletableFuture를 합치려면 어떻게 해야할까?
- thenCombine 사용: BiFunction을 두 번째 인수로 받음
- BiFunction: 두 개의 CompletableFuture를 어떻게 합칠지 정의
- thenCombine 사용: BiFunction을 두 번째 인수로 받음
- 예제) 온라인 상점에서 유로 가격 정보를 제공하는데, 고객에게는 항상 달러 가격을 보여줘야함
- task1) 주어진 상품의 가격을 상점에 요청
- task2) 원격 환율 교환 서비스를 이용해서 유로와 달러의 현재 환율을 비동기적으로 요청
- 가격 * 환율해서 최종 결과값을 구할 수 있음
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(()->shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(
()->exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate)->price*rate)
);
- 두번째 예제) 환율을 가져오는 것에 timeout을 걸고, timeout이 발생할 시 기본값을 사용하자
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(()->shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(
()->exchangeService.getRate(Money.EUR, Money.USD))
.completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
(price, rate)->price*rate)
).orTimeout(3, TimeUnit.SECONDS);
5. CompletableFuture의 종료에 대응하는 방법
- 모든 상점에서 가격 정보를 제공할때 까지 기다리지 않고 각 상점에서 가격 정보를 제공할 때마다 즉시 보여줄 수 있는 최저가격 검색 어플리케이션을 만들어보자
5.1 최저가격 검색 어플리케이션 리팩터링
1초에서 임의지연으로 변경
public static void delay(){
Random random = new Random();
int delay = 500 + random.nextInt(2000);
try{
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
각 상점에서 가격 정보를 제공할때마다 출력하기 위해 List가 아닌 Stream 사용
public static Stream<CompletableFuture<String>> findPricesStream(String product){
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(()->shop.getPrice(product), ex))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote->CompletableFuture.supplyAsync(()->Discount.applyDiscount(quote), ex)));
}
응답이 가장 느린 상점까지 기다리고 싶을 때
public static void main(String[] args){
long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream("myPhone27S").map(f->f.thenAccept(
s->System.out.println(s + " done in " + ((System.nanoTime()-start) / 1_000_000) + "ms")
))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
long duration = (System.nanoTime()-start) / 1_000_000;
System.out.println("Done in " + duration + "ms");
}
- 팩토리 메서드 allOf: CompletableFuture 배열을 입력받아 CompletableFuture<Void>를 반환함
- 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료됨
- allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 스트림의 CompletableFuture의 실행 완료를 기다릴 수 있음
'백엔드 > 모던자바인액션' 카테고리의 다른 글
ch17.2.3 Flow 라이브러리 예제 (0) | 2023.04.13 |
---|---|
ch17. 리액티브 프로그래밍 (0) | 2023.04.12 |
ch15. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 (0) | 2023.03.23 |
[ch4] 스트림 소개 (0) | 2022.09.19 |
[ch3] 람다 표현식 (2) (0) | 2022.09.03 |
댓글