Scala

Java의 CompletableFuture, Scala의 Future와 Promise

partner_jun 2017. 3. 28. 23:04

대부분의 언어에는 Event-Driven으로 구현이 가능해지는 Future나 Promise가 존재한다. 그 중 Java와 Scala에서의 Future와 Promise를 적어보려 한다.


Scala에서의 Future[T]는 '언젠가 사용 가능해지는 타입 T의 값', Promise[T]는 '언젠가 얻게 될 타입 T의 값'이라고 할 수 있다. 그래서 Future는 read-only, Promise는 writable하다고 한다. '언젠가'에 대한 행동이므로 비동기적인 처리를 뜻하고, Future를 위한 쓰레드가 필요하다.



자바 5의 Future는 위에서 이야기한 Future와 같다. Callable과 함께 추가되었던 Future<T>는 '언젠가 가지게 되는 타입 T의 값'이다. 하지만 결과를 받아 값을 사용하기 위해서 블로킹 메소드 get()을 호출해야 한다. 그래서 Event-Driven의 '무슨 일이 일어났을 때 그 이후에 행동하는' 구현이 모호하다. (물론 쓰레드를 만들어 체크한다거나 하는 방법은 있다.)



System.out.println("GO");

ExecutorService ex = Executors.newFixedThreadPool(2);
Future<Integer> future = ex.submit(() -> { // 2초 후 200을 가지게 되는 값
TimeUnit.SECONDS.sleep(2L);
return 200;
});

try {
Integer result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("END");

/*
GO
<2초 후>
200
END
*/

get()은 블로킹 메소드다.


자바 8에서는 람다의 추가와 함께 CompletableFuture가 추가되었다. 자바의 CompletableFuture는 Scala의 Future와 비슷하다. 먼저 언젠가 얻게 될 값을 정의한 후, 그 값을 얻었을 때 하는 행동을 정의한다. 자바 람다식을 사용하는 then~ 메소드가 있어 그 메소드를 호출하면 된다. 



System.out.println("GO");

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> { // 2초 후 값을 가지게 된다.
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
cf.thenAccept(System.out::println);

System.out.println("END");

/*
GO
END
<2초 후>
200
*/

cf.join(); // 메인 스레드가 종료되지 않게 블로킹

CompletableFuture의 thenAccept메소드에  파라미터를 입력받아 void를 리턴하는 메소드 중 하나인 System.out.println 메소드를 전달했다.

then~메소드의 리턴 값은 CompletableFuture이므로 메소드 체이닝이 가능하다.



스칼라의 Future는 훨씬 간단하게 사용할 수 있다. 간단한 문법뿐만 아니라 ExecutorContext를 암시적으로 선언함으로써 더 짧은 코드를 만들 수 있기 때문이다.



println("GO")

val future = Future { // 2초 후 값을 가지게 된다.
TimeUnit.SECONDS.sleep(2L)
"Hello World"
}

future.onComplete{ // future가 값을 얻었을 때
case Success(x) => println(x)
case Failure(x) => println(s"Fail : $x")
}

println("END")

/*
GO
END
<2초 후>
Hello World

*/

Scala Future의 간단한 사용 예


*  The result becomes available once the asynchronous computation is completed.
*
* @tparam T the type of the result
* @param body the asynchronous computation
* @param executor the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] =

Future Object의 Apply는 두번째 파라미터로 ExecutionContext를 암시적으로 입력받는다.




Finagle 라이브러리의 Future는 join이라는 함수를 이용해 퓨쳐끼리 합성할 수 있다. 스칼라 기본 라이브러리는 reduceLeft나 foldLeft같은 고차함수로 합성할 수 있다.


val futures: List[Future[Int]] = (0 to 9).map{ v => Future{ // 2초 후 값을 가지게되는 Future의 리스트
TimeUnit.SECONDS.sleep(2L)
v
}}.toList
// Future.reduceLeft는 Future들을 합성할 수 있다.
val result1: Future[Int] = Future.reduceLeft[Int, Int](futures)(_ + _) // futures의 값들을 합친다.
result1.foreach{ v => println(v)
println(System.currentTimeMillis() - cur)
}
/*
45
6388 // 코어 수에 따라 달라짐.
*/


Future.sequence 함수를 이용해 List[Future[T]]를 Future[List[T]]로 바꿀 수도 있다.

val futureList: Future[List[Int]] = Future.sequence(futures)
futureList.onComplete{
case Success(x) => println(x)
case Failure(x) => println(Nil)
}
/*
List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
*/


블로킹하고 Future의 값을 기다려야 한다면 Await 객체를 사용한다. 결과 값만이 필요하면 result 메소드, Future객체가 필요하다면 ready 메소드를 사용하면 된다.


import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val future = Future {
TimeUnit.SECONDS.sleep(2L)
"Hello World"
}
println("GO")
val result: String = Await.result(future, 10.seconds)
// val ready: Future[String] = Await.ready(future, 10.seconds)
println(result)
println("END")
/*
GO
<2초 후>
Hello World
End
*/






스칼라의 Promise는 Promise 객체를 이용해 Future를 만들고, Future에 대한 행동을 정의해둔 후 값을 얻었을 때 Promise에 그 값을 전달하는 방식으로 사용한다. 


val promise = Promise[String]

val promiseFuture = promise.future // future 객체를 만들어 행동을 정의한다.
promiseFuture.onComplete{
case Success(x) => println(x)
case Failure(x) => println(s"Fail : $x")
}

println("GO")
promise.success("Hello")
println("END")

/*
GO
Hello
END
*/


Promise는 단독적으로 쓰기보다 다른 Promise나 Future와 연쇄적인 호출이 필요할때 사용하면 유용하다.


val future = Future { // 2초 후 값을 가지게 되는 future
TimeUnit.SECONDS.sleep(2L)
1
}

val promise = Promise[Int]
promise.future.onComplete { // promise의 동작 정의
case Success(x) => println(x)
case Failure(x) => println(s"Fail : $x")
}

promise.completeWith(future) // future가 완료되면 promise도 완료됨.
/*
<2초 후>
1
*/


Future 객체의 andThen 메소드로도 같은 효과를 낼 수 있다. Promise를 주체로 보는지, Future를 주체로 보는 지의 차이다.

future.andThen{ // future가 완료되면
case Success(x) => promise.success(x) // promise에 전달
case Failure(x) => println(s"Fail : $x")
}

andThen의 리턴은 Future 객체이므로 메소드 체이닝이 가능하다.



스칼라와 자바의 Future 둘 모두 기본적으로 ExecutorService의 쓰레드풀을 사용한다. 그래서 코어 수에 따라 의도하지 않은 결과가 나타날 수 있다. Future나 Promise는 재활용이 불가능하기 때문에 매번 정의해야 하고, 그런 코드가 나열된다면 Actor를 사용하는 것이 더 나은 선택이 될 수 있을 것 같다.