스칼라와 앞에서 논의한 피네이글 프레임워크를 사용해 간단한 분산 검색 엔진을 만들고자 한다.

설계 목표: 전체 그림

넓게 보면, 우리의 설계 목표에는 추상화 (내부 구조를 몰라도 만들어진 시스템을 사용할 수 있어야 한다), 모듈화 (시스템을 이해하기 쉽고 대치하기 쉬운 더 작은 덩어리들로 나눈다), 그리고 확장성 (쉽게 시스템의 용량을 확대할 수 있어야 한다)이 포함된다.

여기서 설명할 시스템은 세 부분으로 되어있다. (1) 클라이언트 는 요청을 (2) 서버 에 보낸다. _서버_는 그에 대해 응답한다. (3) 전송 메카니즘이 있어서 이런 통신을 감싸고 있다. 보통 클라이언트와 서버는 서로 다른 기계에 존재하며 특정 포트 로 네트워크를 통해 통신한다. 하지만 이 예제에서 서버와 클라이언트는 같은 기계에 존재하한다(하지만, 통신은 여전히 특정 포트를 사용한다). 예제에서 클라이언트와 서버는 모두 스칼라로 작성되어 있으며, 전송은 쓰리프트 를 사용해 이루어진다. 이 자습서의 목적은 성능 향상을 위해 확장 가능한 서버와 클라이언트를 보여주는 것이다.

기본 시작 프로젝트 살펴보기

우선, 뼈대 프로젝트(“Searchbird”)를 scala-bootstrapper 를 이용해 만들자. 이 명령어는 메모리 내장 키-값 저장소를 외부로 노출시키는 간단한 피네이글 기반 스칼라 서비스를 만든다. 이를 사용해 값을 검색하도록 확장할 것이다. 그리고 다시 이를 여러 프로세스들이 제공하는 여러 메모리 저장소에서 검색 가능 하도록 확장할 것이다.

$ mkdir searchbird ; cd searchbird
$ scala-bootstrapper searchbird
writing build.sbt
writing config/development.scala
writing config/production.scala
writing config/staging.scala
writing config/test.scala
writing console
writing Gemfile
writing project/plugins.sbt
writing README.md
writing sbt
writing src/main/scala/com/twitter/searchbird/SearchbirdConsoleClient.scala
writing src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala
writing src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala
writing src/main/scala/com/twitter/searchbird/Main.scala
writing src/main/thrift/searchbird.thrift
writing src/scripts/searchbird.sh
writing src/scripts/config.sh
writing src/scripts/devel.sh
writing src/scripts/server.sh
writing src/scripts/service.sh
writing src/test/scala/com/twitter/searchbird/AbstractSpec.scala
writing src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala
writing TUTORIAL.md

우선 scala-bootstrapper 가 만든 기본 프로젝트를 살펴보자. 이 프로젝트는 템플릿 역할을 하도록 만들어져 있다. 결국 대부분은 변경해야 할 것이다. 하지만, 사용하기 편한 뼈대 역할을 할 수 있다. 간단한 (하지만 완전한) 키-값 저장소가 정의되어 있다. 설정과 쓰리프트 인터페이스, 로그 기능 모두 포함되어 있다.

코드를 보기 전에 서버와 클라이언트를 실행하고 동작하는 모습을 보자. 다음은 우리가 만들고 있는 것이다.

검색조 구현. 리비전 1

그리고 여기 우리 서비스가 노출하고 있는 인터페이스가 있다. 검색조 서비스는 쓰리프트 서비스이기도 하다(다른 대부분의 우리 서비스도 마찬가지이다). 외부 인터페이스는 쓰리프트 IDL로 정의되어 있다.

src/main/thrift/searchbird.thrift
service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)

  void put(1: string key, 2: string value)
}

이 부부은 아주 단순하다. 서비스 SearchbirdService 는 2개의 RPC 메소드 getput 을 노출한다. 이 둘은 간단한 키-값 저장소의 인터페이스를 구성한다.

이제 기본 서비스를 실행하고, 그 서비스에 접속하는 클라이언트를 실행하자. 그리고 위의 인터페이스를 통해 서비스를 탐험해 보자. 창을 두개 열어서 하나는 서버, 하나는 클라이언트를 실행할 것이다.

첫번째 창에서 sbt를 대화식으로 시작하라 (./sbt 를 명령행1에서 실행한다). 그리고 sbt로 프로젝트를 빌드해 실행하자. 그러면 Main.scalamain 루틴이 실행된다.

$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala

설정 파일(development.scala)은 새 서비스를 활성화하고 로컬 기계의 포트 9999에 이를 노출한다. 클라이언트는 9999번 포트에 연결해 서비스와 통신할 수 있다.

이제 제공된 console 셀 스크립트를 활용해 클라이언트를 실행하자. 이 스크립트는 @SearchbirdConsoleClient@의 인스턴스를 (SearchbirdConsoleClient.scala로부터) 만든다. 이 스크립트를 다른 창에서 실행하라.

$ ./console 127.0.0.1 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.

finagle-client>

클라이언트 객체 @client@는 이제 9999 포트에 접속했으며, 앞에서 같은 포트에 실행했던 서버와 통신이 가능하다. 이제 요청을 몇 개 보내 보자.

scala> client.put("marius", "Marius Eriksen")
res0: ...

scala> client.put("stevej", "Steve Jenson")
res1: ...

scala> client.get("marius")
res2: com.twitter.util.Future[String] = ...

scala> Await.result(client.get("marius"))
res3: String = Marius Eriksen

(마지막 줄의 두번째 Await.result() 호출은 client.get()의 결과인 Future 타입의 객체를 값이 준비될 때까지 블록시킨다.)

서버는 런타임 통계 정보도 제공한다(설정 파일에는 9000번 포트에서 이를 찾도록 되어 있다). 이 통계는 개별 서버를 관찰하거나, 전체 서비스의 통계를 내기 위해 통합될 수 있다(이를 위해 기계가 읽을 수 있는 JSON 인터페이스도 제공한다). 세번째 창을 열어서 이 통계를 확인해 보자.

$ curl localhost:9900/stats.txt
counters:
  Searchbird/connects: 1
  Searchbird/received_bytes: 264
  Searchbird/requests: 3
  Searchbird/sent_bytes: 128
  Searchbird/success: 3
  jvm_gc_ConcurrentMarkSweep_cycles: 1
  jvm_gc_ConcurrentMarkSweep_msec: 15
  jvm_gc_ParNew_cycles: 24
  jvm_gc_ParNew_msec: 191
  jvm_gc_cycles: 25
  jvm_gc_msec: 206
gauges:
  Searchbird/connections: 1
  Searchbird/pending: 0
  jvm_fd_count: 135
  jvm_fd_limit: 10240
  jvm_heap_committed: 85000192
  jvm_heap_max: 530186240
  jvm_heap_used: 54778640
  jvm_nonheap_committed: 89657344
  jvm_nonheap_max: 136314880
  jvm_nonheap_used: 66238144
  jvm_num_cpus: 4
  jvm_post_gc_CMS_Old_Gen_used: 36490088
  jvm_post_gc_CMS_Perm_Gen_used: 54718880
  jvm_post_gc_Par_Eden_Space_used: 0
  jvm_post_gc_Par_Survivor_Space_used: 1315280
  jvm_post_gc_used: 92524248
  jvm_start_time: 1345072684280
  jvm_thread_count: 16
  jvm_thread_daemon_count: 7
  jvm_thread_peak_count: 16
  jvm_uptime: 1671792
labels:
metrics:
  Searchbird/handletime_us: (average=9598, count=4, maximum=19138, minimum=637, p25=637, p50=4265, p75=14175, p90=19138, p95=19138, p99=19138, p999=19138, p9999=19138, sum=38393)
  Searchbird/request_latency_ms: (average=4, count=3, maximum=9, minimum=0, p25=0, p50=5, p75=9, p90=9, p95=9, p99=9, p999=9, p9999=9, sum=14)

서비스 통계와 더불어 일반적인 JVM 상태 정보도 제공된다. 이런 정보도 유용할 때가 있다.

…/config/SearchbirdServiceConfig.scala

설정은 우리가 만들고 싶은 T 를 만드는 apply: RuntimeEnvironment => T 메소드를 제공하는 스칼라 트레잇이다. 이런 의미에서 설정은 "팩토리"이기도 하다. 실행시 설정 파일은 스크립트처럼 해석되고 (이때 스칼라 컴파일러를 라이브러리로 사용한다), 앞에서 말한 설정 객체를 생성해 낼 것이다. RuntimeEnvironment 에게 여러 런타임 매개변수(명령행 플래그, JVM 버전, 빌드 타임 스탬프 등)를 질의할 수 있다.

SearchbirdServiceConfig 클래스는 이런 설정 클래스의 구현이다. SearchbirdServiceConfig 는 설정 매개변수와 기본 값을 지정한다. (피네이글은 일반적 추적 시스템을 제공하지만, 이 글에서는 다룰 필요가 없어 넘어간다. 집킨 은 분산 트레이싱 시스템으로 피네이글의 추적 시스템과 같은 것에서 나온 여러 추적 내용을 수집/통합한다.)

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var tracerFactory: Tracer.Factory = NullTracer.factory

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this)
}

여기서 우리는 SearchbirdService.ThriftServer 를 만들고 싶다. 이는 쓰리프트 코드 생성자2가 만들어주는 서버 타입이다.

…/Main.scala

"run"을 sbt 콘솔에서 입력하면 @main@을 호출한다. @main@은 서버를 설정하고 서버 인스턴스를 만든다. 설정 정보(development.scala에 지정되거나 “run” 명령의 인자로 넘길 수 있음)를 읽고, SearchbirdService.ThriftServer를 만들고, 시작한다. RuntimeEnvironment.loadRuntimeConfig 는 설정을 평가(컴파일 및 실행)하고, 그 설정의 apply 메소드에 자기 자신을 넘긴다.

object Main {
  private val log = Logger.get(getClass)

  def main(args: Array[String]) {
    val runtime = RuntimeEnvironment(this, args)
    val server = runtime.loadRuntimeConfig[SearchbirdService.ThriftServer]
    try {
      log.info("Starting SearchbirdService")
      server.start()
    } catch {
      case e: Exception =>
        log.error(e, "Failed starting SearchbirdService, exiting")
        ServiceTracker.shutdown()
        System.exit(1)
    }
  }
}
…/SearchbirdServiceImpl.scala

이 부분이 서비스의 핵심이다. SearchbirdService.ThriftServer 를 확장해 우리가 필요한 구현을 추가한다. SearchbirdService.ThriftServer 는 쓰리프트 코드 생성기에 의해 만들어졌다는 점을 기억하라. 코드 생성기는 쓰리프트 메소드마다 스칼라 메소드를 만들어준다. 우리 예제에서 이렇게 만들어진 인터페이스는 다음과 같다.

trait SearchbirdService {
  def put(key: String, value: String): Future[Void]
  def get(key: String): Future[String]
}

Future[Value] 라는 형태의 값이 Value 대신 쓰이는 이유는 계산이 지연될 수 있기 때문이다(피네이글의 문서 에서 Future 에 대해 더 자세히 다룬다). 이번 강좌에서는 Future 가 있으면 그 값을 나중에 get() 을 사용해 얻을 수 있다는 것만 알면 충분하다.

scala-bootstrapper 가 제공하는 키-값 저장소의 기본 구현은 단순하다. database 데이터 구조를 가지고 있고, getput 호출은 이 데이터 구조를 읽거나 쓴다.

class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdService.ThriftServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort
  override val tracerFactory = config.tracerFactory

  val database = new mutable.HashMap[String, String]()

  def get(key: String) = {
    database.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)
    database(key) = value
    Future.Unit
  }

  def shutdown() = {
    super.shutdown(0.seconds)
  }
}

결과적으로 스칼라 HashMap 에 간단한 쓰리프트 인터페이스를 제공하는 셈이다.

간단한 검색 엔진

이제 우리 예제를 확장해 간단한 검색 엔진을 만들자. 그 검색엔진을 더 확장해서 기계 한 대에 넣을 수 없을 정도로 큰 검색어 목록도 처리할 수 있도록 분산 검색 엔진을 만들 것이다.

단순화하기 위해 현재의 쓰리프트 서비스를 가능한 적게 고쳐서 검색 연산을 지원하게 하자. 사용 모델은 put 으로 문서를 검색 엔진에 넣는다. 이때 각 문서는 토큰(단어)을 여럿 포함한다. 검색시에는 여러 토큰으로 구성된 문자열을 사용하여 모든 토큰이 포함된 문서를 반환할 것이다. 기본 구조는 앞의 예제와 같지만, 추가로 search 호출을 사용한다.

검색조 구현. 리비전 2

이런 검색엔진을 구현하려면 아래와 같이 파일 둘을 변경해야 한다.

src/main/thrift/searchbird.thrift
service SearchbirdService {
  string get(1: string key) throws(1: SearchbirdException ex)

  void put(1: string key, 2: string value)

  list<string> search(1: string query)
}

현재의 해시테이블을 검색하는 search 메소드를 추가했다. 이 메소드는 질의와 일치하는 키의 목록을 반환한다. 구현은 상당히 단순하다.

…/SearchbirdServiceImpl.scala

변경이 주로 이루어진 파일이 바로 이 파일이다.

현재의 database 해시맵은 키와 문서를 연결하는 정방향 색인(forward index)을 저장한다. 이 맵의 이름을 @forward@로 바꾸고, 역방향 색인(토큰을 해당 토큰을 포함한 문서의 집합으로 연결하는 색인) 역할을 하는 두번째 맵을 @reverse@라는 이름으로 추가하자. 따라서 SearchbirdServiceImpl.scala에서 database 정의는 다음과 같이 바뀐다.

val forward = new mutable.HashMap[String, String]
  with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
  with mutable.SynchronizedMap[String, Set[String]]

get 호출에서 @database@를 @forward@로 바꾼다. 대신 get 의 나머지 부분은 그대로 둔다(해당 함수는 정방향 검색만을 수행한다). 하지만 @put@은 변경해야 한다. 문서가 추가될 때마다, 문서에서 토큰을 추출해 각 토큰마다 해당 문서의 키를 추가해야 하기 때문이다. put 호출을 다음 코드로 바꾸자. 이제 검색 토큰이 주어지면 reverse 맵을 사용해 문서를 찾을 수 있다.

def put(key: String, value: String) = {
  log.debug("put %s", key)

  forward(key) = value

  // serialize updaters
  synchronized {
    value.split(" ").toSet foreach { token =>
      val current = reverse.getOrElse(token, Set())
      reverse(token) = current + key
    }
  }

  Future.Unit
}

(@HashMap@이 쓰레드 안전하긴 하지만) 특정 맵 원소에 대한 읽고-변경하고-쓰기 연산의 원자성을 보장하기 위해 오직 한번에 한 쓰레드만 reverse 맵을 업데이트 할 수 있음을 기억하라. (지금 코드는 너무 보수적이다. 개별 읽고-변경하고-쓰기 연산에 락을 거는 대신 전체 맵에 락을 걸고 있다.) 또한 데이터 구조로 @Set@을 사용하고 있음을 기억하라. 집합을 사용하면 같은 토큰이 문서에 두번 이상 나타나도 foreach 루프에서 한번만 처리되도록 할 수 있다.

이 구현에는 여전히 문제점이 있다. 어떤 키를 새 문서로 덮어쓰더라도 기존의 문서를 기반으로 만들어졌던 역 색인 정보는 제거되지 않는다. 이는 연습문제로 독자들에게 남겨둔다.

이제 검색 엔진의 핵심부분인 search 메소드를 보자. 질의가 들어오면 토큰을 분리하고, 토큰마다 매치되는 문서 목록을 구한 다음, 각각의 교집합을 구한다. 따라서 질의에 있는 모든 토큰이 포함된 문서의 목록이 구해진다. 스칼라로 이를 표현하는 것은 쉽다. 다음을 SearchbirdServiceImpl 클래스에 추가하라.

def search(query: String) = Future.value {
  val tokens = query.split(" ")
  val hits = tokens map { token => reverse.getOrElse(token, Set()) }
  val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
  intersected.toList
}

이 길지 않은 코드에서도 알아둘만한 사항이 몇 가지 있다. hits 목록을 만들 때 키(token)가 없다면 @getOrElse@의 두번째 인자가 기본 값으로 제공된다(여기서는 빈 @Set@이다). 실제 교집합은 왼쪽 리듀스(left reduce)로 구한다. 특히 @reduceLeftOption@는 @hits@가 비어있는 경우 리듀스를 시도하지 않고 대신 @None@을 반환한다. 따라서 예외를 겪지 않고 기본 값을 제공할 수 있다. 실제 이는 다음과 동일하다.

def search(query: String) = Future.value {
  val tokens = query.split(" ")
  val hits = tokens map { token => reverse.getOrElse(token, Set()) }
  if (hits.isEmpty)
    Nil
  else
    hits reduceLeft { _ & _ } toList
}

어느쪽을 사용하던 취향의 문제이지만, 함수 언어 다운 것은 조건식을 가능한 삼가고 적절한 기본 값을 활용하는 것이다.

이제 콘솔에서 만든 검색 엔진을 실험해볼 수 있다. 서버를 다시 시작하자.

$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala

그리고 searchbird 디렉터리에서 클라이언트를 시작하자.

$ ./console 127.0.0.1 9999
...
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.

finagle-client>

다음 본 학교의 강의 목록을 콘솔에 붙여넣도록 하라.

client.put("basics", " values functions classes methods inheritance try catch finally expression oriented")
client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern")
client.put("collections", " lists maps functional combinators (map foreach filter zip")
client.put("pattern", " more functions! partialfunctions more pattern")
client.put("type", " basic types and type polymorphism type inference variance bounds")
client.put("advanced", " advanced types view bounds higher kinded types recursive types structural")
client.put("simple", " all about sbt the standard scala build")
client.put("more", " tour of the scala collections")
client.put("testing", " write tests with specs a bdd testing framework for")
client.put("concurrency", " runnable callable threads futures twitter")
client.put("java", " java interop using scala from")
client.put("searchbird", " building a distributed search engine using")

이제 검색을 해볼 수 있다. 설명에 검색어를 포함하고 있는 모든 강의의 키를 반환한다.

> Await.result(client.search("functions"))
res12: Seq[String] = ArrayBuffer(basics)

> Await.result(client.search("java"))
res13: Seq[String] = ArrayBuffer(java)

> Await.result(client.search("java scala"))
res14: Seq[String] = ArrayBuffer(java)

> Await.result(client.search("functional"))
res15: Seq[String] = ArrayBuffer(collections)

> Await.result(client.search("sbt"))
res16: Seq[String] = ArrayBuffer(simple)

> Await.result(client.search("types"))
res17: Seq[String] = ArrayBuffer(type, advanced)

호출이 @Future@를 반환하면 get()을 사용해 블록해야만 해당 future에서 값을 얻어올 수 있다는 사실을 잊지 마라. 또한 Future.collect를 사용해 여러 요청을 동시에 처리하고, 모든 요청이 성공할 때까지 기다릴 수 있다.

> import com.twitter.util.{Await, Future}
...
> Await.result(Future.collect(Seq(
    client.search("types"),
    client.search("sbt"),
    client.search("functional")
  )))
res18: Seq[Seq[String]] = ArrayBuffer(ArrayBuffer(type, advanced), ArrayBuffer(simple), ArrayBuffer(collections))

서비스를 분산시키기

기계가 하나 밖에 없다면, 앞의 간단한 메모리 검색 엔진이 검색할 수 있는 사전의 크기는 메모리 제한에 따를 수 밖에 없다. 이제 이 문제를 간단한 분산 정책(sharding scheme)을 사용해 서비스를 분산시켜 해결해 보자. 다음은 블록도이다.

분산 검색조 서비스

추상화

작업을 쉽게 하기 위해 우선 다른 추상 레이어(Index)를 도입해 색인 구현과 @SearchbirdService@를 분리할 것이다. 이렇게 리팩토링 하는 것은 간단하다. 우선 색인 파일을 도입하는 것을 시작점으로 삼자(searchbird/src/main/scala/com/twitter/searchbird/Index.scala라는 파일을 만들 것이다).

…/Index.scala
package com.twitter.searchbird

import scala.collection.mutable
import com.twitter.util._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec

trait Index {
  def get(key: String): Future[String]
  def put(key: String, value: String): Future[Unit]
  def search(key: String): Future[List[String]]
}

class ResidentIndex extends Index {
  val log = Logger.get(getClass)

  val forward = new mutable.HashMap[String, String]
    with mutable.SynchronizedMap[String, String]
  val reverse = new mutable.HashMap[String, Set[String]]
    with mutable.SynchronizedMap[String, Set[String]]

  def get(key: String) = {
    forward.get(key) match {
      case None =>
        log.debug("get %s: miss", key)
        Future.exception(SearchbirdException("No such key"))
      case Some(value) =>
        log.debug("get %s: hit", key)
        Future(value)
    }
  }

  def put(key: String, value: String) = {
    log.debug("put %s", key)

    forward(key) = value

    // admit only one updater.
    synchronized {
      (Set() ++ value.split(" ")) foreach { token =>
        val current = reverse.get(token) getOrElse Set()
        reverse(token) = current + key
      }
    }

    Future.Unit
  }

  def search(query: String) = Future.value {
    val tokens = query.split(" ")
    val hits = tokens map { token => reverse.getOrElse(token, Set()) }
    val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
    intersected.toList
  }
}

이제 우리가 개발한 쓰리프트 서비스를 간단한 디스패치 메카니즘으로 변경하자. 이제는 모든 Index 인스턴스에 대해 쓰리프트 인터페이스를 제공하게 될것이다. 이렇게 추상화 하는 것은 매우 유용하다. 왜냐하면 이렇게 하면 서비스 구현과 색인 구현을 분리할 구 있기 때문이다. 서비스는 더 이상 색인의 내부에 대해 알 필요가 없다. 색인의 위치는 로컬이나 원격 어느곳에나 있을 수 있고, 경우에 따라서는 여러 원격 색인을 조합한 것일 수도 있다. 하지만, 서비스가 이를 신경쓸 필요가 없다. 또한 색인을 변경하더라도 서비스를 변경할 필요는 없다.

기존 SearchbirdServiceImpl 클래스 정의를 다음의 (색인에 대한 정의를 포함하지 않아서 더 간단한) 정의로 바꾸자. 이제 서버를 초기화하기 위해서는 두번째 인자 @Index@가 필요하다는 사실에 유의하라.

…/SearchbirdServiceImpl.scala
class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer {
  val serverName = "Searchbird"
  val thriftPort = config.thriftPort

  def get(key: String) = index.get(key)
  def put(key: String, value: String) =
    index.put(key, value) flatMap { _ => Future.Unit }
  def search(query: String) = index.search(query)

  def shutdown() = {
    super.shutdown(0.seconds)
  }
}
…/config/SearchbirdServiceConfig.scala

@SearchbirdServiceConfig@의 apply 호출도 적절히 바꿔줘야 한다.

class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var tracerFactory: Tracer.Factory = NullTracer.factory

  def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)
}

간단한 분산 시스템을 설정하되 자식 노드에게 질의를 하는 것을 조정하기 위한 별도의 노드를 하나 만들 것이다. 이를 위해 두가지 유형의 @Index@를 추가할 팔요가 있다. 하나는 원격 색인을 표현하고, 나머지는 여러 다른 Index 인스턴스를 조합한 복합 색인을 표현한다. 두 Index 모두 인터페이스는 같다. 따라서 서버는 실제 연결하려는 색인이 어떤 종류인지를 구별할 필요가 없다.

…/Index.scala

Index.scala에 @CompositeIndex@를 정의하자:

class CompositeIndex(indices: Seq[Index]) extends Index {
  require(!indices.isEmpty)

  def get(key: String) = {
    val queries = indices.map { idx =>
      idx.get(key) map { r => Some(r) } handle { case e => None }
    }

    Future.collect(queries) flatMap { results =>
      results.find { _.isDefined } map { _.get } match {
        case Some(v) => Future.value(v)
        case None => Future.exception(SearchbirdException("No such key"))
      }
    }
  }

  def put(key: String, value: String) =
    Future.exception(SearchbirdException("put() not supported by CompositeIndex"))

  def search(query: String) = {
    val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } }
    Future.collect(queries) map { results => (Set() ++ results.flatten) toList }
  }
}

복합 색인은 여러 내부 Index 인스턴스에 대해 동작한다. 이때 내부 Index 들이 어떻게 구현되었는지에 대해서는 신경쓰지 않는다는 사실이 중요하다. 그렇기 때문에 여러 질의 방식을 만들 때 유연성이 엄청나게 커질 수 있다. 분산 정책을 따로 정의하지는 않았다. 따라서 복합 색인은 put 연산을 지원하지 않는다. 대신 이를 직접 자식 노드에 전달한다. @get@은 모든 자식 노드에 질의를 던져서 가장 먼저 도착하는 성공적인 응답을 채택하는 방식으로 이루어진다. 만약 모든 자식들이 실패한다면 예외를 발생시킨다. 값이 없는 경우에는 예외를 통해 상황 전달이 이루어진다. 따라서 이를 @Future@에서 None 값으로 변환하는 방식으로 handle 한다. 실제 시스템에서라면 예외를 사용하는 것 보다 적절한 오류 코드를 사용하는 것이 나을 것이다. 예외는 쓰기 편하고 프로토타입을 만들 때는 빠르게 써먹을 수 있지만, 서로 조합할 때는 문제가 되기 쉽다. 실제 예외상황과 값이 없는 경우를 구별하기 위해서 예외 자체를 검사해야만 한다. 그러느니, 이런 구별을 반환되는 값의 타입에 직접 집어넣는 편이 더 좋은 방식이다.

@search@는 앞에서와 비슷한 방식으로 동작한다. 최초의 결과만을 선택하는 대신 모든 결과를 한데 모은다. 그 후 Set 을 만들어서 유일성을 보장한다.

RemoteIndex 는 원격 서버에 대한 Index 인터페이스를 제공한다.

class RemoteIndex(hosts: String) extends Index {
  val transport = ClientBuilder()
    .name("remoteIndex")
    .hosts(hosts)
    .codec(ThriftClientFramedCodec())
    .hostConnectionLimit(1)
    .timeout(500.milliseconds)
    .build()
  val client = new SearchbirdService.FinagledClient(transport)

  def get(key: String) = client.get(key)
  def put(key: String, value: String) = client.put(key, value) map { _ => () }
  def search(query: String) = client.search(query) map { _.toList }
}

위 코드는 몇가지 적절한 기본 값을 사용해 피네이글 쓰리프트 클라이언트를 만든다. 그리고 호출 프락시 역할을 수행한다. 이를 위해 타입을 적절히 변경하였다.

한 곳으로 모으기

이제 필요한 모든 조각을 마련했다. 설정을 약간 바꿔서 어떤 노드가 데이터 분산 노드인지 조정을 위해 구별된 노드인지 지정할 수 있게 만들 필요가 있다. 이를 위해 시스템의 분산 노드들을 지정할 수 있는 설정 아이템을 도입하여 구분할 것이다. 또한 @SearchbirdServiceImpl@의 인스턴스에 Index 인자를 추가해야 한다. 그리고 나서 명령행 인자(@Config@가 이 인자를 활용한다는 사실을 기억하라)를 사용해 서버 시작시 어떤 노드를 시작하는 지 지정할 수 있게 할 것이다.

…/config/SearchbirdServiceConfig.scala
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
  var thriftPort: Int = 9999
  var shards: Seq[String] = Seq()

  def apply(runtime: RuntimeEnvironment) = {
    val index = runtime.arguments.get("shard") match {
      case Some(arg) =>
        val which = arg.toInt
        if (which >= shards.size || which < 0)
          throw new Exception("invalid shard number %d".format(which))

        // override with the shard port
        val Array(_, port) = shards(which).split(":")
        thriftPort = port.toInt

        new ResidentIndex

      case None =>
        require(!shards.isEmpty)
        val remotes = shards map { new RemoteIndex(_) }
        new CompositeIndex(remotes)
    }

    new SearchbirdServiceImpl(this, index)
  }
}

이제 설정을 조정하자. “shards” 목록을 SearchbirdServiceConfig 인스턴스에서 초기화하게 하자(샤드 0번은 포트 9000, 1번은 9001 등의 순서로 접속하게 할 것이다).

config/development.scala
new SearchbirdServiceConfig {
  // Add your own config here
  shards = Seq(
    "localhost:9000",
    "localhost:9001",
    "localhost:9002"
  )
  ...

admin.httpPort 설정을 코맨트로 가리자(여러 서비스를 같은 기계에서 실행시키지는 않을 것이다. 그렇게 하면 같은 포트를 사용하려고 시도하게 되어 오류가 발생한다.):

  // admin.httpPort = 9900

서버에 인자를 주지 않고 실행하면 모든 샤드에 송신하는 특별한 노드가 시작된다. 샤드 정보를 인자로 주면 해당 샤드 번호에 대응하는 서버를 앞에 지정한 포트를 사용해 시작한다.

한번 실행해 보자. 3개의 서비스를 시작할 것이다. 1개는 특별 노드이고, 2개는 샤드이다. 우선 변경사항을 컴파일해야 한다.

$ ./sbt
> compile
...
> exit

그 후 서버를 시작하면 된다.

$ ./sbt 'run -f config/development.scala -D shard=0'
$ ./sbt 'run -f config/development.scala -D shard=1'
$ ./sbt 'run -f config/development.scala'

이 셋을 각각 다른 창에서 실행할 수도 있고, (한 창에서) 한 명령어씩 실행해서 ctrl-z를 눌러 작업을 일지중지시킨 후, @bg@를 사용해 백그라운드 작업으로 돌리는 방식으로 실행할 수도 있다.

이제 콘솔을 사용해 이 서비스를 사용해 보자. 우선 두 샤드 노드에 데이터를 집어넣어야 한다. 검색조 디렉터리에서 다음과 같이 실행하자.

$ ./console localhost 9000
...
> client.put("fromShardA", "a value from SHARD_A")
> client.put("hello", "world")
$ ./console localhost 9001
...
> client.put("fromShardB", "a value from SHARD_B")
> client.put("hello", "world again")

작업을 완료하고 나면 콘솔 세션을 중단해도 좋다. 이제 특별 노드(port 9999)에서 데이터베이스에 질의를 넣어 보자.

$ ./console localhost 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient localhost 9999
'client' is bound to your thrift client.

finagle-client> Await.result(client.get("hello"))
res0: String = world

finagle-client> Await.result(client.get("fromShardC"))
SearchbirdException(No such key)
...

finagle-client> Await.result(client.get("fromShardA"))
res2: String = a value from SHARD_A

finagle-client> Await.result(client.search("hello"))
res3: Seq[String] = ArrayBuffer()

finagle-client> Await.result(client.search("world"))
res4: Seq[String] = ArrayBuffer(hello)

finagle-client> Await.result(client.search("value"))
res5: Seq[String] = ArrayBuffer(fromShardA, fromShardB)

여러 데이터 추상화를 포함하는 이런 방식으로 설계하면 더 모둘화되고 확장 가능한 구현이 가능하다.

아마도 이 구현을 다음과 같이 더 향상시킬 수 있을 것이다:

1 로컬 ./sbt 스크립트를 실행하면, 우리가 사용하는 sbt 버전이 적절한 라이브러리를 모두 포함하고 있도록 보장해준다.

2 target/gen-scala/com/twitter/searchbird/SearchbirdService.scala 에 있다.

3 오스트리치(Ostrich)의 README 에서 더 자세한 정보를 찾아보라.