package coordination
- Alphabetic
- Public
- Protected
Type Members
- case class Shard(id: Int, node: ZNode, permit: Permit) extends ShardPermit with Product with Serializable
- class ShardCoordinator extends AnyRef
A rudimentary shard/partition coordinator.
A rudimentary shard/partition coordinator. Provides ShardPermits by lowest available ID first (linear scan).
val shardCoordinator = new ShardCoordinator(zkClient, "/testing/twitter/service/something/shards", numShards) log.trace("Waiting for shard permit...") shardCoordinator.acquire flatMap { shard => log.trace("Working as shard %d", shard.id) { // inside some Future if (Hashing.consistentHash(item, numShards) == shard.id) action } ensure { shard.release } }
- sealed trait ShardPermit extends AnyRef
- class ZkAsyncSemaphore extends AnyRef
ZkAsyncSemaphore is a distributed semaphore with asynchronous execution.
ZkAsyncSemaphore is a distributed semaphore with asynchronous execution. Grabbing a permit constitutes a vote on the number of permits the semaphore can permit and returns a Future[Permit]. If consensus on the number of permits is lost, an exception is raised when acquiring a permit (so expect it).
Care must be taken to handle zookeeper client session expiry. A ZkAsyncSemaphore cannot be used after the zookeeper session has expired. Likewise, any permits acquired via the session must be considered invalid. Additionally, it is the client's responsibility to determine if a permit is still valid in the case that the zookeeper client becomes disconnected.
Attempts to clone AsyncSemaphore
Ex.
implicit val timer = new JavaTimer(true) val connector = NativeConnector("localhost:2181", 5.seconds, 10.minutes) val zk = ZkClient(connector).withRetryPolicy(RetryPolicy.Basic(3)) val path = "/testing/twitter/service/charm/shards" val sem = new ZkAsyncSemaphore(zk, path, 4) sem.acquire flatMap { permit => Future { ... } ensure { permit.release } } // handle { ... }
Value Members
- object ShardCoordinator
- object ZkAsyncSemaphore