피네이글 은 트위터의 RPC 시스템이다.
이 블로그 글 은 만들게 된 동기와 설계 원칙을, 피네이글 README 에는 더 자세한 설명이 있다. 피네이글은 튼튼한 클라이언트와 서버를 쉽게 만들려는 목적으로 쓰여졌다.
표준 스칼라에 포함되지 않는 다른 코드를 설명할 것이다. REPL을 사용해 학습을 하려 한다면, 트위터의 피네이글과 피네이글이 의존하는 여러 모듈에 대을 적재한 스칼라 REPL을 시작하는 방법이 궁금할 것이다.
피네이글 소스 코드가 필요하다.
소스가 finagle
이라는 디렉터리에 있다면, 다음과 같이 콘솔을 시작할 수 있다.
$ cd finagle $ git checkout master $ ./sbt "project finagle-http" console ...build output... scala>
피네이글은 com.twitter.util.Future
1 을 사용해 지연된 동작을 표시한다. Future는 아직 사용할 수 없는 값에 대한 핸들이다. 피네이글에서 비 동기식인 API의 반환값은 Future로 되어 있다. 동기식 API는 결과가 반환될 때까지 기다리게 되지만, 비동기의 경우에는 그렇지 않다. 예를 들어 인터넷상의 어떤 서비스에 HTTP 요청을 하면 0.5초가 지나야 값을 받을 수 있다고 하자. 0.5초나 프로그램이 블록되는 것을 바라지는 않을 것이다. 이런 “느린” API는 요청을 받으면 바로 Future를 반환하고, 나중에 값이 실제 구해지면 그때 이를 “채워넣을” 수 있다.
val myFuture = MySlowService(request) // returns right away ...do other things... val serviceResult = Await.result(myFuture) // blocks until service "fills in" myFuture
실제 상황에서는 요청을 보내고 바로 몇 문장 아래에서 myFuture.get
를 호출하지는 않는다. Future에는 콜백을 등록하는 메소드가 있다. 값이 사용 가능해지면 이 콜백이 호출된다.
다른 비동기 API를 써 본 사람이라면 바로 위의 "콜백"이라는 문장을 본 순간 움츠러들었을지도 모르겠다. 그런 사람은 "콜백"이란 단어를 어떤 함수가 실제 등록되는 시점에서 멀리 떨어진 다른 곳에 숨어있어서 거의 프로그램의 흐름 판독이 불가능했던 코드를 떠올렸으리라. 하지만 스칼라가 제공하는 1등 함수를 사용하면 Future를 사용하면서도 흐름을 파악하기 좋은 코드를 만들 수 있다. 간단한 처리 함수를 API 호출 시점에 바로 만들어서 넘길 수 있다.
요청을 디스패치하고 결과를 "처리"하는 아래 예를 보면, 코드가 한데 모여 있음을 볼 수 있다.
val future = dispatch(req) // returns immediately, but future is "empty" future onSuccess { reply => // when the future gets "filled", use its value println(reply) }
REPL에서 Future를 가지고 장난을 쳐 볼 수 있다. 실제 코드에서의 사용법을 배우기에 좋은 방법은 아니다. 하지만, API를 이해하는 데는 도움이 된다. REPL을 사용하면 Promise를 쉽게 사용할 수 있다. Promise는 추상 Future 클래스의 구체적인 하위 클래스이다. Promise를 사용해 아직 값이 없는 Future를 만들 수 있다.
scala> import com.twitter.util.{Await, Future,Promise} import com.twitter.util.{Await, Future, Promise} scala> val f6 = Future.value(6) // create already-resolved future f6: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@c63a8af scala> Await.result(f6) res0: Int = 6 scala> val fex = Future.exception(new Exception) // create resolved sad future fex: com.twitter.util.Future[Nothing] = com.twitter.util.ConstFuture@38ddab20 scala> Await.result(fex) java.lang.Exception ... stack trace ... scala> val pr7 = new Promise[Int] // create unresolved future pr7: com.twitter.util.Promise[Int] = Promise@1994943491(...) scala> Await.result(pr7) ...console hangs, waiting for future to resolve... Ctrl-C Execution interrupted by signal. scala> pr7.setValue(7) scala> Await.result(pr7) res1: Int = 7 scala>
실제 코드에서 Future를 사용할 때는 보통 get
을 호출하지 않는다. 대신 콜백 함수를 사용한다. get
은 REPL에서 이것저것 두드려볼 때나 손쉽게 쓸 수 있는 것이다.
Future에도 컬렉션 API의 콤비네이터와와 유사한 콤비네이터가 있다(예: map, flatMap).
기억을 되살려 보자. 컬렉션의 콤비네이터를 사용하면 “정수의 리스트와 정수를 제곱하는 squre함수가 있다. 그 함수를 내 정수 리스트에 적용해서 제곱된 값의 리스트를 구하자” 같은 표현이 가능하다. 아주 깔끔하다. 콤비네이터 함수와 다른 함수들을 함께 조합하면 새로운 함수를 정의하는 효과를 얻는다. Future 콤비네이터로는 “나는 미래에 정수가 될 Future가 있고, squire가 있다. 이 함수를 Future에 적용해서 이 미래에 정수가 될 잠재적 정수의 제곱을 구하자”라고 할 수 있다.
만약 비동기 API를 정의한다면, 요청 값이 API에 들어오고, API는 Future로 둘러싸인 응답을 돌려줄 것이다. 따라서 입력과 함수를 Future로 바꿔주는 콤비네이터가 있다면 아주 유용할 것이다. 이를 사용하면 비동기 API를 다른 동기식 API를 기반으로 정의할 수 있기 때문이다.
가장 중요한 Future
콤비네이터는 flatMap
2 이다.
def Future[A].flatMap[B](f: A => Future[B]): Future[B]
flatMap
은 두 Future를 순서대로 연결한다. 즉, 한 Future와 비동기식 함수를 받아서 다른 Future를 반환한다. flatMap 메소드의 시그니쳐가 말하는 바 대로이다.
앞의 Future가 성공해서 값을 반환하면, 함수 f
는 다음 Future
를 제공한다.
flatMap
은 입력 Future가 성공적으로 완료된 경우에 자동으로 f
를 호출한다. 이 호출의 결과는 새로운 Future
이며, 이는 두 Future(입력 Future와 비동기 함수)가 모두 성공적으로 완료된 경우에만 완료된다.
두 Future
중 하나라도 실패하면 flatMap의 결과값으로 나온 Future
도 또한 실패할 것이다.
묵시적으로 오류를 넘기는 것을 통해 의미상 중요한 경우에만 오류를 처리할 수 있다. flatMap
은 이런 의미를 가지는 콤비네이터를 정의할 때 표준적으로 사용하는 이름이다.
Future가 있고, 그 결과에 비동기 API를 젹용할 생각이면 flatMap을 써라. 예를 들어 Future[User]가 있고 어떤 사용자 계정이 사용중지되었는지를 표시하는 Future[Boolean]이 필요하다 하자. 어떤 사용자가 사용정지상태인지를 파악하는 isBanned
API가 있을 것이다. 그런데, 이 함수가 비동기적이라 하자. 이럴 때 flatMap을 쓸 수 있다.
scala> import com.twitter.util.{Await, Future,Promise} import com.twitter.util.{Await, Future, Promise} scala> class User(n: String) { val name = n } defined class User scala> def isBanned(u: User) = { Future.value(false) } isBanned: (u: User)com.twitter.util.Future[Boolean] scala> val pru = new Promise[User] pru: com.twitter.util.Promise[User] = Promise@897588993(...) scala> val futBan = pru flatMap isBanned // apply isBanned to future futBan: com.twitter.util.Future[Boolean] = Promise@1733189548(...) scala> Await.result(futBan) ...REPL hangs, futBan not resolved yet... Ctrl-C Execution interrupted by signal. scala> pru.setValue(new User("prudence")) scala> Await.result(futBan) res45: Boolean = false scala>
이와 비슷하게 동기식 함수를 Future에 적용하려면 map을 사용하면 된다.
예를 들어 Future[RawCredentials]가 있고 Future[Credentials]이 필요하다 치자. 동기식 함수 normalize
가 RawCredentials를 Credentials로 바꿔준다면 map
을 쓸 수 있다.
scala> class RawCredentials(u: String, pw: String) { | val username = u | val password = pw | } defined class RawCredentials scala> class Credentials(u: String, pw: String) { | val username = u | val password = pw | } defined class Credentials scala> def normalize(raw: RawCredentials) = { | new Credentials(raw.username.toLowerCase(), raw.password) | } normalize: (raw: RawCredentials)Credentials scala> val praw = new Promise[RawCredentials] praw: com.twitter.util.Promise[RawCredentials] = Promise@1341283926(...) scala> val fcred = praw map normalize // apply normalize to future fcred: com.twitter.util.Future[Credentials] = Promise@1309582018(...) scala> Await.result(fcred) ...REPL hangs, fcred doesn't have a value yet... Ctrl-C Execution interrupted by signal. scala> praw.setValue(new RawCredentials("Florence", "nightingale")) scala> Await.result(fcred).username res48: String = florence scala>
스칼라에는 flatMap을 호출하는 것을 문법적으로 간편하게 해주는 for
컴프리헨션이 있다.
로그인 요청을 비동기 API를 통해 인증하고 해당 사용자가 사용 정지 중인지를 비동기 API를 통해 검사한다 하자. 컴프리헨션을 사용하면 다음과 같이 쓸 수 있다.
scala> def authenticate(req: LoginRequest) = { | // TODO: we should check the password | Future.value(new User(req.username)) | } authenticate: (req: LoginRequest)com.twitter.util.Future[User] scala> val f = for { | u <- authenticate(request) | b <- isBanned(u) | } yield (u, b) f: com.twitter.util.Future[(User, Boolean)] = Promise@35785606(...) scala>
위 코드에서 f: Future[(User, Boolean)]
를 사용자 객체와 사용자가 정지중인지를 표시하는 부울 값을 사용해 만든다. 여기서 순차적 합성이 어떻게 이루어졌나를 살펴 보라. isBanned
는 authenticate
의 출력을 인자로 받는다.
둘 이상의 서비스에서 동시에 데이터를 가져올 때도 있다. 예를 들어 컨텐츠와 광고를 보여주는 웹 서비스를 만든다고 하자. 컨텐츠는 한 서비스에서 가져오고 광고는 다른 곳에서 가져올 것이다. 이럴때 어떻게 두 서비스를 모두 기다리도록 해야 할까? 직접 코드를 작성한다면 약간 어려운 작업이 될 것이다. 하지만, 이럴 때 동시성 콤비네이터를 쓸 수 있다.
Future
에는 동시성 콤비네이터가 몇가지 포함되어 있다. 이들은 여러 Future
를 하나의 Future
로 묶는데, 각각이 약간 다른 방식을 취한다. 이를 활용하면 여러 Future를 하나의 Future로 묶을 수 있기 때문에 좋다.
object Future { … def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] def join(fs: Seq[Future[_]]): Future[Unit] def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])] }
collect
는 같은 타입 Future[A]
열을 받아서 A 열(Seq[A])을 반환하는 Future
를 만든다. 이 결과 Future는 모든 기반 Future들이 완료되거나 그 중 하나가 실패한 경우 완료된다. 반환되는 열의 순서는 인자로 넘긴 Future들의 순서와 같다.
scala> val f2 = Future.value(2) f2: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@13ecdec0 scala> val f3 = Future.value(3) f3: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@263bb672 scala> val f23 = Future.collect(Seq(f2, f3)) f23: com.twitter.util.Future[Seq[Int]] = Promise@635209178(...) scala> val f5 = f23 map (_.sum) f5: com.twitter.util.Future[Int] = Promise@1954478838(...) scala> Await.result(f5) res9: Int = 5
join
는 여러 타입의 Future
로 이루어진 열을 받아서 Future[Unit]
를 만들어낸다. 이 Future는 모든 내부 Future가 종료되거나, 그 중 하나에서 오류가 발생하면 끝난다. 이는 여러 연산이 모드 끝나는 시점을 알아야 할 때 유용하다. 앞의 컨텐츠/광고 예제에 이를 활용할 수 있을 것이다.
scala> val ready = Future.join(Seq(f2, f3)) ready: com.twitter.util.Future[Unit] = Promise@699347471(...) scala> Await.result(ready) // doesn't ret value, but I know my futures are done scala>
select
는 주어진 Future
중 어느 하나가 종료되면 바로 종료되는 Future
를 반환한다. 이 Future
는 결과값과 함께 아직 종료되지 않은 나머지 Future를 담고 있는 열을 반환한다. (나머지 Future에 대해서는 아무 짓도 하지 않고 돌려준다. 따라서 응답을 계속 기다리려면 기다릴 수도 있고, 응답을 무시하려면 그렇게 할 수도 있다.)
scala> val pr7 = new Promise[Int] // unresolved future pr7: com.twitter.util.Promise[Int] = Promise@1608532943(...) scala> val sel = Future.select(Seq(f2, pr7)) // select from 2 futs, one resolved sel: com.twitter.util.Future[...] = Promise@1003382737(...) scala> val(complete, stragglers) = Await.result(sel) complete: com.twitter.util.Try[Int] = Return(2) stragglers: Seq[...] = List(...) scala> Await.result(complete) res110: Int = 2 scala> Await.result(stragglers(0)) // our list of not-yet-finished futures has one item ...Await.result() hangs the REPL because this straggling future is not finished... Ctrl-C Execution interrupted by signal. scala> pr7.setValue(7) scala> Await.result(stragglers(0)) res113: Int = 7 scala>
이런 콤비네이터들은 네트워크 서비스에 있어 전형적인 연산을 표현한다. 다음의 (가상의) 코드는 사용자 대신 요청을 뒷단에 디스패치하면서 동시에 비율 제한(지역 비율 제한 캐시를 유지하기 위해 사용함)을 수행한다.
// 사용자가 비율 제한에 걸렸는지를 본다. 이 과정은 시간이 오래 걸릴 수 있다. // 원격 서버에게 사용자의 제한 여부를 물어봐야 하기 때문이다 def isRateLimited(u: User): Future[Boolean] = { ... } // 이 구현을 들어내고 다른 더 제약이 많은 정책을 구현하도록 할 수 있는지 한번 생각해 보라 // 캐시를 검사해 사용자가 비율 제한에 걸렸는지를 본다. 캐시는 단순한 맵으로 되어 있다. // 따라서 값을 바로 반환한다. 하지만, 혹시 나중에 더 느린 캐시 구현을 사용할 수도 있으므로 // 어쨌든 Future를 반환하도록 한다 def isLimitedByCache(u: User): Future[Boolean] = Future.value(limitCache(u)) // 캐시를 업데이트한다 def setIsLimitedInCache(user: User, v: Boolean) { limitCache(user) = v } // 트윗 타임라인을 가져온다. 단, 사용자가 비율 제한에 걸려있지 않다면. // (만약 비율제한에 걸려있다면 예외를 발생시킨다) def getTimeline(cred: Credentials): Future[Timeline] = isLimitedByCache(cred.user) flatMap { case true => Future.exception(new Exception("rate limited")) case false => // 인증을 한 다음에, 타임라인을 가져온다. // 비동기 API를 순차적으로 결합하는 것이므로 flatMap을 쓴다. val timeline = auth(cred) flatMap(getTimeline) val limited = isRateLimited(cred.user) onSuccess( setIsLimitedInCache(cred.user, _)) // 'join'으로 타입이 다른 Future들을 동시에 엮는다. // 'flatMap'은 각 Future간의 순서를 지정해준다. timeline join limited flatMap { case (_, true) => Future.exception(new Exception("rate limited")) case (timeline, _) => Future.value(timeline) } } }
이 가상의 예제는 순차와 동시 합성을 함께 사용한다. 비율 제약이 걸려 예외가 발생하는 경우를 빼고는 아무 오류 처리가 없다는 점을 확인해 보라. 사용된 Future중 어느 하나가 실패한다면, 그 사실은 반환되는 Future
에 자동으로 전달된다.
콤비네이터를 Future와 사용하는 방법을 보긴 했지만, 예제가 더 있으면 좋겠다고 생각했을 것이다. 인터넷을 단순화한 모형을 생각해보자. 그 모형에는 HTML 페이지와 이미지가 존재한다. 페이지에는 이미지나 다른 페이지에 대한 링크가 들어 있다. 페이지나 이미지를 가져올 수 있지만 비동기적인 API를 활용해야만 한다. 이 API에서 “가져올 수 있는” 것을 자원(Resource)이라 부른다.
import com.twitter.util.{Try,Future,Promise} // 가져올 수 있는 것들 trait Resource { def imageLinks(): Seq[String] def links(): Seq[String] } // HTML 페이지는 다른 Img나 다른 Page에 대한 링크를 포함할 수 있다. class HTMLPage(val i: Seq[String], val l: Seq[String]) extends Resource { def imageLinks() = i def links = l } // IMG에는 다른 자원을 링크하는 부분은 없다. class Img() extends Resource { def imageLinks() = Seq() def links() = Seq() } // profile.html 안에는 gallery.html과 portrait.jpg에 대한 링크가 있다. val profile = new HTMLPage(Seq("portrait.jpg"), Seq("gallery.html")) val portrait = new Img // gallery.html은 profile.html과 두 이미지를 링크한다. val gallery = new HTMLPage(Seq("kitten.jpg", "puppy.jpg"), Seq("profile.html")) val kitten = new Img val puppy = new Img val internet = Map( "profile.html" -> profile, "gallery.html" -> gallery, "portrait.jpg" -> portrait, "kitten.jpg" -> kitten, "puppy.jpg" -> puppy ) // fetch(url)을 호출하면 인터넷 모델에서 자원을 가져온다. // 반환된 Future에는 예외나 Resource가 들어가게 될 것이다. def fetch(url: String) = { new Promise(Try(internet(url))) }
순차 합성
어떤 페이지 주소가 주어지면 그 페이지의 첫번째 이미지를 가져오고 싶다고 하자. 예를 들어 사람들이 관심 페이지를 올리는 사이트를 만든다면 이런 기능이 필요할 것이다. 다른 사용자가 어떤 링크를 따라가 봐야 할지 결정할 때 도움이 되도록 썸네일 이미지를 표시해 주고 싶어서이다.
콤비네이터를 모르는 사람이라도 썸네일을 가져오는 함수를 만드는 것은 얼마든지 가능하다.
def getThumbnail(url: String): Future[Resource]={ val returnVal = new Promise[Resource] fetch(url) onSuccess { page => // callback for successful page fetch fetch(page.imageLinks()(0)) onSuccess { p => // callback for successful img fetch returnVal.setValue(p) } onFailure { exc => // callback for failed img fetch returnVal.setException(exc) } } onFailure { exc => // callback for failed page fetch returnVal.setException(exc) } returnVal }
이 함수도 물론 잘 작동한다. 대부분의 코드는 Future를 벗겨내서 이를 다른 Future에 넣는 과정으로 되어 있다.
페이지를 가져와서 그 페이지에서 이미지를 얻고 싶다. A를 하고 그 후 B를 해야 한다고 말한다면, 이는 순차 합성이라는 의미이다. B가 비동기적인 경우 flatMap을 쓰면 된다.
def getThumbnail(url: String): Future[Resource] = fetch(url) flatMap { page => fetch(page.imageLinks()(0)) }
…동시 합성도 함께 사용
어떤 페이지의 첫 이미지를 가져오는 것은 좋다. 하지만 모든 이미지를 가져와서 사용자가 그 중 원하는 것을 고르게 한다면 더 좋을 것이다. 각 이미지를 차례로 가져오도록 for
루프를 돌릴 수도 있다. 하지만 시간이 오래 걸린다. “병렬적으로” 여러 이미지를 받으면 더 좋을 것이다. "병렬적으로"라는 말은 동시 합성을 의미한다. 따라서 Future.collect를 사용해 이미지를 모두 가져오게 할 수 있다.
def getThumbnails(url:String): Future[Seq[Resource]] = fetch(url) flatMap { page => Future.collect( page.imageLinks map { u => fetch(u) } ) }
수긍이 되는가? 좋다. page.imageLinks map { u => fetch(u) }
부분이 마음에 걸릴지도 모르겠다. 여기서는 map
뒤의 무명 함수가 반환하는 것은 Future이다. 다음번에 오는 것이 비동기적이면 flatMap을 쓰라고 하지 않았나? 하지만, map
의 앞에 있는게 뭔지 자세히 보라. 그것은 Future가 아니고 컬렉션이다. 컬렉션의 map은 컬렉션을 반환한다. 그리고 나서, Future.collect를 사용해 Future의 컬렉션을 하나의 Future로 합쳤다.
동시성+재귀호출
어떤 페이지이 있는 이미지를 가져오는 대신, 그 페이지가 링크하고 있는 다른 페이지들을 가져오고 싶다고 하자. 이를 재귀적으로 반복하면 간단한 웹 크롤러가 된다.
// Return def crawl(url: String): Future[Seq[Resource]] = fetch(url) flatMap { page => Future.collect( page.links map { u => crawl(u) } ) map { pps => pps.flatten } } crawl("profile.html") ...hangs REPL, infinite loop... Ctrl-C Execution interrupted by signal. scala> // She's gone rogue, captain! Have to take her out! // Calling Thread.stop on runaway Thread[Thread-93,5,main] with offending code: // scala> crawl("profile.html")
실용적 측면에서 볼 때 이 크롤러는 그리 유용하지 않다. 크롤링을 멈출 시점도 지정하지 않았고, 바로 직전에 가져왔던 자원도 기꺼이 반복적으로 가져오게 된다.
피네이글 Service
는 RPC를 처리하는 서비스를 표현한다. 서비스는 요청을 받아 응답 해준다. 서비스는 요청과 응답 타입에 따라 정의되는 Req => Future[Rep]
타입의 함수이다.
abstract class Service[-Req, +Rep] extends (Req => Future[Rep])
클라이언트와 서버를 Service를 기반으로 정의한다.
피네이글 클라이언트는 네트워크에서 서비스를 "수입"한다. 개념상 피네이글 클라이언트는 두 부분으로 구성된다.
Req
을 디스패치하고 Future[Rep]
를 처리한다.api.twitter.com
의 80포트로 HTTP요청을 보내는 등의 정보를 말한다.마찬가지로 피네이글 서버는 네트워크로 서비스를 "수출"한다. 서버도 두 부분으로 이루어진다.
Req
을 받아서 Future[Rep]
를 반환한다.이렇게 하면 서비스 "로직"을 네트워크에서 정보가 흐르는 방법에 대한 설정과 분리할 수 있다.
피네이글 "필터"에 대해서도 설명하자. 필터는 서비스와 서비스 사이에 있으면서, 거쳐가는 데이터를 변경해준다. 필터와 서비스는 쉽게 결합 가능하다. 예를 들어 비율 제한 필터와 트윗을 제공하는 서비스가 있다면, 이를 함께 엮어서 비율이 제한된 트윗 제공 서비스를 만들 수 있다.
피네이글 클라이언트는 서비스를 "수입"한다. 또한 네트워크를 통해 정보를 보내는 방법에 대한 설정도 포함한다. 다음은 간단한 HTTP 클라이언트의 예이다.
import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpRequest, HttpResponse, HttpVersion, HttpMethod} import com.twitter.finagle.Service import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.http.Http // 걱정하지 말라. 나중에 마법 클래스 "ClientBuilder"에 대해 다룰 것이다 val client: Service[HttpRequest, HttpResponse] = ClientBuilder() .codec(Http()) .hosts("twitter.com:80") // 호스트가 >1 라면, 클라이언트가 간단한 부하 균등화를 수행한다. .hostConnectionLimit(1) .build() val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/") val f = client(req) // Client야 요청을 보내렴! // 응답을 처리한다 f onSuccess { res => println("got response", res) } onFailure { exc => println("failed :-(", exc) }
서버는 Service의 측면에서 설계되며. 네트워크에서 어떻게 요청을 “들을 수” 있는지에 대한 설정도 포함하고 있다. 간단한 HTTP 서버는 다음과 같다.
import com.twitter.finagle.Service import com.twitter.finagle.http.Http import com.twitter.util.Future import org.jboss.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion, HttpResponseStatus, HttpRequest, HttpResponse} import java.net.{SocketAddress, InetSocketAddress} import com.twitter.finagle.builder.{Server, ServerBuilder} import com.twitter.finagle.builder.ServerBuilder // Define our service: OK response for root, 404 for other paths val rootService = new Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = { val r = request.getUri match { case "/" => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) case _ => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND) } Future.value(r) } } // Serve our service on a port val address: SocketAddress = new InetSocketAddress(10000) val server: Server = ServerBuilder() .codec(Http()) .bindTo(address) .name("HttpServer") .build(rootService)
이 예제에서는 사용하지 않지만, 필수적으로 지정해야 하는 `name`은 프로파일링과 디버깅시 유용하다.
필터는 서비스를 변환한다. 필터는 일반적인 서비스기능을 제공한다. 예를 들어 비율 제한을 지원해야 하는 서비스가 여럿 있을 수도 있다. 이럴때 비율을 제한하는 필터를 하나 작성해 모든 서비스에 적용할 수 있다. 또한 서비스를 여러 별개의 단계로 나누어 구성할 때도 필터를 유용하게 쓸 수 있다.
간단한 프락시를 다음과 같이 만들 수 있을 것이다.
class MyService(client: Service[..]) extends Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = { client(rewriteReq(request)) map { res => rewriteRes(res) } } }
여기서 rewriteReq
와 rewriteRes
는 프로토콜 변환을 제공할 수 있다. 예를 들어,
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn] extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])
위의 타입은 다음과 같이 다이어그램으로 표시하면 더 보기 좋다.
((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut]) (* Service *) [ReqIn -> (ReqOut -> RepIn) -> RepOut]
아래 필터는 타임아웃 기능을 제공한다.
class TimeoutFilter[Req, Rep]( timeout: Duration, exception: RequestTimeoutException, timer: Timer) extends Filter[Req, Rep, Req, Rep] { def this(timeout: Duration, timer: Timer) = this(timeout, new IndividualRequestTimeoutException(timeout), timer) def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = { val res = service(request) res.within(timer, timeout) rescue { case _: java.util.concurrent.TimeoutException => res.cancel() Trace.record(TimeoutFilter.TimeoutAnnotation) Future.exception(exception) } } }
이 예는 (인증 서비스를 활용해) 인증을 수행하는 방법을 보여준다. Service[AuthHttpReq, HttpRep]
를 Service[HttpReq, HttpRep]
로 변환하기 위해 사용할 수 있다.
class RequireAuthentication(authService: AuthService) extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] { def apply( req: HttpReq, service: Service[AuthHttpReq, HttpRep] ) = { authService.auth(req) flatMap { case AuthResult(AuthResultCode.OK, Some(passport), _) => service(AuthHttpReq(req, passport)) case ar: AuthResult => Future.exception( new RequestUnauthenticated(ar.resultCode)) } } }
필터를 이런 식으로 사용하면 좋은 점이 몇가지 있다. “인증 로직”을 한군데에 분리해 둘 수 있게 된다. 인증된 요청에 대해 타입을 별도로 가져가면 프로그램의 보안성을 더 쉽게 파악할 수 있다.
필터는 andThen
를 사용해 서로 합성할 수 있다. Service
를 andThen
에 인자로 주면 (필터가 적용된) Service
가 된다(설명을 위해 타입을 일일이 적어두었다).
val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep] val serviceRequiringAuth: Service[AuthHttpReq, HttpRep] val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] = authFilter andThen timeoutFilter val authenticatedTimedOutService: Service[HttpReq, HttpRep] = authenticateAndTimedOut andThen serviceRequiringAuth
빌드는 모두를 한데 묶어준다. ClientBuilder
에 인자를 몇가지 지정해 주면 Service
가 나오고, ServerBuilder
에 Service
인스턴스를 받아서 외부에서 들어오는 요청을 그 서비스에 전달해준다. Service
의 유형을 결정하기 위해서 Codec
을 지정해야만 한다. 코덱은 기반이 되는 프로토콜 구현을 제공한다(예. HTTP, thrift, memcached). 두 빌더 모두 많은 매개변수를 받을 수 있기는 하만, 꼭 필요한 것은 그리 많지 않다.
아래는 ClientBuilder
를 호출하는 예이다(설명을 위해 타입을 일일이 적어두었다).
val client: Service[HttpRequest, HttpResponse] = ClientBuilder() .codec(Http) .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003") .hostConnectionLimit(1) .tcpConnectTimeout(1.second) .retries(2) .reportTo(new OstrichStatsReceiver) .build()
위 코드는 세 호스트로 부하를 균등화하는 클라이언트를 만든다. 클라이언트는 각 호스트에 연결을 최대 하나만 열고, 실패가 두번 발생하면 포기한다. 상태는 ostrich 에 전달된다. hosts
나 cluster
중 하나와 codec
, 그리고 hostConnectionLimit
는 필수적으로 지정해야만 한다(정적으로 이 요소들이 지정되었는지 여부가 검사된다).
비슷한 방식으로 ServerBuilder
를 사용해 서비스가 들어오는 요청을 “들을” 수 있도록 할 수 있다.
val service = new MyService(...) // 피네이글 서비스의 인스턴스이다. var filter = new MyFilter(...) // 필터도 있을 수 있다. var filteredServce = filter andThen service val server = ServerBuilder() .bindTo(new InetSocketAddress(port)) .codec(ThriftServerFramedCodec()) .name("my filtered service") // .hostConnectionMaxLifeTime(5.minutes) // .readTimeout(2.minutes) .build(filteredService)
이렇게 하면 port 상에, service로 요청을 전달하는 Thrift 서버를 만들게 된다. hostConnectionMaxLifeTime
부분의 주석을 제거하면 각 연결이 최대 5분간만 살아있도록 제한하게 된다.
readTimeout
의 주석을 제거하면, 요청이 2분 이내에 들어와야만 처리하게 된다. 필수적인 ServerBuilder
옵션은 name
, bindTo
, codec
이다.
피네이글은 서비스가 부드럽게 동작하도록 자동으로 쓰레드를 관리한다. 어느 한 서비스가 블록되면 모든 피네이글 쓰레드가 블록된다.
apply
나 get
)을 호출한다면, Future Pool을 사용해 그 블록되는 코드를 감싸라. 이렇게 하면 블록킹 연산이 자체 쓰레드 풀 안에서 실행되고, Future를 통해 완료(또는 실패)시점을 알 수 있게 된다. 또한 이 Future는 다른 Future와 함성할 수 있다.1 경고. 다른 “Future” 클래스도 존재한다. com.twitter.util.Future
을 scala.concurrent.Future
나 java.util.concurrent.Future
와 혼동하지 말라!
2 flatMap
이 모나드의 바인드(bind) 연산이다. 타입 시스템이나 카테고리 이론을 공부하는 독자라면 이해하리라 본다.