final class Pipe[A] extends Reader[A] with Writer[A]
A synchronous in-memory pipe that connects Reader and Writer in the sense that a reader's input is the output of a writer.
A pipe is structured as a smash of both interfaces, a Reader and a Writer, such that it can be passed directly to a consumer or a producer.
def consumer(r: Reader[Buf]): Future[Unit] = ???
def producer(w: Writer[Buf]): Future[Unit] = ???

val p = new Pipe[Buf]
consumer(p)
producer(p)
Reads and writes on the pipe are matched one to one and only one outstanding read or write is permitted in the current implementation (multiple pending writes or reads resolve into IllegalStateException while leaving the pipe healthy). That is, the write (its returned com.twitter.util.Future) is resolved when the read consumes the written data.
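The one-to-one matching described above can be sketched as follows. This is a minimal illustration, assuming util-core is on the classpath; the object and value names (MatchingDemo, pendingBeforeRead, and so on) are hypothetical and exist only for this example.

```scala
import com.twitter.io.Pipe
import com.twitter.util.{Await, Duration, Future, Throw}

// Hypothetical demo object; not part of the library.
object MatchingDemo {
  private val timeout = Duration.fromSeconds(5)

  val pipe = new Pipe[String]

  // A write on an idle pipe stays pending until a read consumes the element.
  val write: Future[Unit] = pipe.write("hello")
  val pendingBeforeRead: Boolean = !write.isDefined

  // The read picks up the pending element, which also resolves the write.
  val element: Option[String] = Await.result(pipe.read(), timeout)
  val resolvedAfterRead: Boolean = write.isDefined

  // Only one outstanding read is permitted: the second one resolves into
  // IllegalStateException while the first one stays pending.
  val firstRead: Future[Option[String]] = pipe.read()
  val secondRejected: Boolean =
    Await.result(pipe.read().liftToTry, timeout) match {
      case Throw(_: IllegalStateException) => true
      case _ => false
    }

  // The pipe is still healthy: a write satisfies the first pending read.
  pipe.write("world")
  val firstValue: Option[String] = Await.result(firstRead, timeout)
}
```

The timeouts passed to Await.result are only a safety net for the example; every future here is expected to be resolved by the time it is awaited.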
Here is, for example, a very typical write-loop that writes into a pipe-backed Writer:
def writeLoop(w: Writer[Buf], data: List[Buf]): Future[Unit] = data match {
  case h :: t => w.write(h).before(writeLoop(w, t))
  case Nil => w.close()
}
Reading from a pipe-backed Reader is no different from working with any other reader:
def readLoop(r: Reader[Buf], process: Buf => Future[Unit]): Future[Unit] =
  r.read().flatMap {
    case Some(chunk) => process(chunk).before(readLoop(r, process))
    case None => Future.Done
  }
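To see the two loops cooperate, they can be wired to the same pipe. The sketch below is only an illustration under a few assumptions: util-core is on the classpath, the chunks are UTF-8 text, and the processing step (collecting decoded strings into a buffer named seen) is hypothetical. Only the consumer side is awaited, since when the producer's close future resolves depends on how the pipe was constructed.

```scala
import com.twitter.io.{Buf, Pipe, Reader, Writer}
import com.twitter.util.{Await, Duration, Future}
import scala.collection.mutable.ListBuffer

// Hypothetical demo object; not part of the library.
object LoopDemo {
  def writeLoop(w: Writer[Buf], data: List[Buf]): Future[Unit] = data match {
    case h :: t => w.write(h).before(writeLoop(w, t))
    case Nil => w.close()
  }

  def readLoop(r: Reader[Buf], process: Buf => Future[Unit]): Future[Unit] =
    r.read().flatMap {
      case Some(chunk) => process(chunk).before(readLoop(r, process))
      case None => Future.Done
    }

  val pipe = new Pipe[Buf]
  val seen = ListBuffer.empty[String]

  // Start the consumer first: decode each chunk as UTF-8 and record it.
  val consumed: Future[Unit] = readLoop(pipe, { chunk =>
    Buf.Utf8.unapply(chunk).foreach(s => seen += s)
    Future.Done
  })

  // The producer writes three chunks and then closes the pipe.
  val produced: Future[Unit] =
    writeLoop(pipe, List(Buf.Utf8("a"), Buf.Utf8("b"), Buf.Utf8("c")))

  // The consumer finishes once it observes EOF.
  Await.result(consumed, Duration.fromSeconds(5))
}
```

Because each write resolves only after its element has been read, the consumer observes the chunks in the order they were written.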
Thread Safety
It is safe to call read, write, fail, discard, and close concurrently. The individual calls are synchronized on the given Pipe.
Closing or Failing Pipes
Besides expecting a write or a read, a pipe can be closed or failed. A writer can both close and fail the pipe, while a reader can only fail the pipe via discard.
The following behavior is expected with regard to reading from or writing into a closed or a failed pipe:
- Writing into a closed pipe yields IllegalStateException
- Reading from a closed pipe yields EOF (com.twitter.util.Future.None)
- Reading from or writing into a failed pipe yields the failure it was failed with
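The three cases listed above can be exercised directly. The following is a minimal sketch, assuming util-core is on the classpath; the demo object and its value names are hypothetical. The closed pipe is obtained by closing it while a read is pending, which resolves that read into EOF.

```scala
import com.twitter.io.Pipe
import com.twitter.util.{Await, Duration, Try}

// Hypothetical demo object; not part of the library.
object TerminalStatesDemo {
  private val timeout = Duration.fromSeconds(5)

  // Closing a pipe that has a pending read resolves that read into EOF.
  val closed = new Pipe[Int]
  private val pendingRead = closed.read()
  closed.close()
  val eofForPendingRead: Option[Int] = Await.result(pendingRead, timeout)

  // Reading from the closed pipe yields EOF; writing into it fails.
  val readAfterClose: Option[Int] = Await.result(closed.read(), timeout)
  val writeAfterClose: Try[Unit] =
    Await.result(closed.write(1).liftToTry, timeout)

  // Both reads and writes on a failed pipe yield the original failure.
  val cause = new RuntimeException("boom")
  val failed = new Pipe[Int]
  failed.fail(cause)
  val readAfterFail: Try[Option[Int]] =
    Await.result(failed.read().liftToTry, timeout)
  val writeAfterFail: Try[Unit] =
    Await.result(failed.write(1).liftToTry, timeout)
}
```

Using liftToTry turns each failed future into a value, so the outcome can be inspected without Await.result throwing.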
It's also worth discussing how pipes are closed. As closure is always initiated by a producer (writer), there is machinery allowing it to be notified when said closure is observed by a consumer (reader).
The following rules should help in reasoning about closure signals in pipes:
- Closing a pipe with a pending read resolves said read into EOF and returns a com.twitter.util.Future.Unit
- Closing a pipe with a pending write by default fails said write with IllegalStateException and returns a future that will be satisfied when a consumer observes the closure (EOF) via read. If a timer is provided, the pipe will wait until the provided deadline for a successful read before failing the write.
- Closing an idle pipe returns a future that will be satisfied when a consumer observes the closure (EOF) via read when a timer is provided, otherwise the pipe will be closed immediately.
Value Members
- def close(deadline: Time): Future[Unit]
Close the resource with the given deadline.
- def close(after: Duration): Future[Unit]
Close the resource with the given timeout. This timeout is advisory, giving the callee some leeway, for example to drain clients or finish up other tasks.
- Definition Classes
- Closable
- final def close(): Future[Unit]
Close the resource. The returned Future is completed when the resource has been fully relinquished.
- Definition Classes
- Closable
- final def contramap[B](f: (B) => A): Writer[B]
Given f, a function from B into A, creates a Writer[B] whose fail and close functions are equivalent to Writer[A]'s. Writer[B]'s write function is equivalent to:
def write(element: B) = Writer[A].write(f(element))
- Definition Classes
- Writer
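As an illustration of contramap, the sketch below adapts the writing side of a Pipe[Int] so that it accepts strings and writes their lengths. It assumes util-core is on the classpath; the demo object and the choice of _.length as the mapping function are hypothetical.

```scala
import com.twitter.io.{Pipe, Writer}
import com.twitter.util.{Await, Duration, Future}

// Hypothetical demo object; not part of the library.
object ContramapDemo {
  val lengths = new Pipe[Int]

  // Adapt the Int writer so that it accepts Strings, writing each one's length.
  val strings: Writer[String] = lengths.contramap[String](_.length)

  // Register a read first so that the write is matched immediately.
  private val pending: Future[Option[Int]] = lengths.read()
  strings.write("hello")

  val received: Option[Int] = Await.result(pending, Duration.fromSeconds(5))
}
```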
- def discard(): Unit
Discard this stream as its output is no longer required. This could be used to signal the producer of this stream, similarly to how com.twitter.util.Future.raise is used to propagate interrupts across future chains.
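A minimal sketch of how a discard reaches the producer, assuming util-core is on the classpath (the demo object and value names are hypothetical): once the consumer discards the pipe, the producer's next write resolves into a failure.

```scala
import com.twitter.io.Pipe
import com.twitter.util.{Await, Duration, Try}

// Hypothetical demo object; not part of the library.
object DiscardDemo {
  val pipe = new Pipe[String]

  // The consumer signals that it no longer needs the stream.
  pipe.discard()

  // The producer notices on its next write, which resolves into a failure.
  val writeAfterDiscard: Try[Unit] =
    Await.result(pipe.write("late").liftToTry, Duration.fromSeconds(5))
  val producerNotified: Boolean = writeAfterDiscard.isThrow
}
```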
- def fail(cause: Throwable): Unit
Fail this stream with a given cause. No further writes are allowed; any that do happen will resolve into a com.twitter.util.Future failed with cause.
- final def flatMap[B](f: (A) => Reader[B]): Reader[B]
Construct a new Reader by applying f to every item read from this Reader.
- f
the function that constructs a new Reader[B] from the value of this Reader.read
- Definition Classes
- Reader
- Note
All operations of the new Reader will be in sync with self Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value.
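The following sketch shows flatMap expanding each element of a pipe into a small inner reader. It assumes util-core is on the classpath and uses Reader.fromSeq to build the inner readers; the demo object and the n, n * 10 expansion are hypothetical choices for illustration.

```scala
import com.twitter.io.{Pipe, Reader}
import com.twitter.util.{Await, Duration}

// Hypothetical demo object; not part of the library.
object FlatMapDemo {
  private val timeout = Duration.fromSeconds(5)

  val numbers = new Pipe[Int]

  // Each element n expands into a two-element reader: n and n * 10.
  val expanded: Reader[Int] =
    numbers.flatMap(n => Reader.fromSeq(Seq(n, n * 10)))

  // The write stays pending until the flattened reader pulls it.
  numbers.write(1)

  val first: Option[Int] = Await.result(expanded.read(), timeout)
  val second: Option[Int] = Await.result(expanded.read(), timeout)
}
```

A third read would stay pending here, because the inner reader is exhausted and the pipe has no further element yet.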
- def flatten[B](implicit ev: <:<[A, Reader[B]]): Reader[B]
Converts a Reader[Reader[B]] into a Reader[B].
- Definition Classes
- Reader
- Note
All operations of the new Reader will be in sync with the outermost Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value. The subsequent readers are unmanaged; the caller is responsible for discarding those when abandoned.
- final def map[B](f: (A) => B): Reader[B]
Construct a new Reader by applying f to every item read from this Reader.
- f
the function that transforms data of type A to B
- Definition Classes
- Reader
- Note
All operations of the new Reader will be in sync with self Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value.
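A minimal sketch of map on the reading side of a pipe, assuming util-core is on the classpath (the demo object and the doubling function are hypothetical): elements written into the pipe come out transformed when read through the mapped Reader.

```scala
import com.twitter.io.{Pipe, Reader}
import com.twitter.util.{Await, Duration}

// Hypothetical demo object; not part of the library.
object MapDemo {
  val source = new Pipe[Int]

  // Reads through `doubled` pull from `source` and apply the function.
  val doubled: Reader[Int] = source.map(_ * 2)

  // The write stays pending until the mapped reader consumes it.
  source.write(21)

  val result: Option[Int] =
    Await.result(doubled.read(), Duration.fromSeconds(5))
}
```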
- def onClose: Future[StreamTermination]
A com.twitter.util.Future that resolves once this writer is closed and flushed.
It may contain an error if the writer is failed.
This is useful for any extra resource cleanup that you must do once the stream is no longer being used.
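Cleanup can be attached to onClose with the usual Future combinators. The sketch below is only an illustration, assuming util-core is on the classpath; the demo object and the cleanedUp flag stand in for whatever resource release is actually needed. It fails the pipe to show that the cleanup also runs on the error path.

```scala
import com.twitter.io.Pipe
import com.twitter.util.{Await, Duration}

// Hypothetical demo object; not part of the library.
object OnCloseDemo {
  val pipe = new Pipe[Int]

  // Stand-in for real resource cleanup.
  @volatile var cleanedUp = false

  // Run cleanup once the stream terminates, normally or with an error.
  pipe.onClose.ensure { cleanedUp = true }
  val cleanedUpBeforeFail: Boolean = cleanedUp

  // Failing the pipe terminates the stream with an error.
  pipe.fail(new RuntimeException("boom"))

  val terminatedWithError: Boolean =
    Await.result(pipe.onClose.liftToTry, Duration.fromSeconds(5)).isThrow
}
```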
- def read(): Future[Option[A]]
Asynchronously read the next element of this stream. The returned com.twitter.util.Future will resolve into Some(e) when the element is available or into None when the stream is exhausted.
Stream failures are terminal such that all subsequent reads will resolve in failed com.twitter.util.Futures.
- def toString(): String
- Definition Classes
- Pipe → AnyRef → Any
- def write(buf: A): Future[Unit]
Write an element into this stream. Although undefined by this contract, a trustworthy implementation (such as Pipe) would do its best to resolve the returned com.twitter.util.Future only when a consumer observes a written element.
The write can also resolve into a failure (failed com.twitter.util.Future).