final class Pipe[A] extends Reader[A] with Writer[A]

A synchronous in-memory pipe that connects Reader and Writer in the sense that a reader's input is the output of a writer.

A pipe is structured as a smash of both interfaces, a Reader and a Writer such that can be passed directly to a consumer or a producer.

def consumer(r: Reader[Buf]): Future[Unit] = ???
def producer(w: Writer[Buf]): Future[Unit] = ???

val p = new Pipe[Buf]

consumer(p)
producer(p)

Reads and writes on the pipe are matched one to one and only one outstanding read or write is permitted in the current implementation (multiple pending writes or reads resolve into IllegalStateException while leaving the pipe healthy). That is, the write (its returned com.twitter.util.Future) is resolved when the read consumes the written data.

Here is, for example, a very typical write-loop that writes into a pipe-backed Writer:

def writeLoop(w: Writer[Buf], data: List[Buf]): Future[Unit] = data match {
  case h :: t => p.write(h).before(writeLoop(w, t))
  case Nil => w.close()
}

Reading from a pipe-backed Reader is no different from working with any other reader:

def readLoop(r: Reader[Buf], process: Buf => Future[Unit]): Future[Unit] = r.read().flatMap {
  case Some(chunk) => process(chunk).before(readLoop(r, process))
  case None => Future.Done
}

Thread Safety

It is safe to call read, write, fail, discard, and close concurrently. The individual calls are synchronized on the given Pipe.

Closing or Failing Pipes

Besides expecting a write or a read, a pipe can be closed or failed. A writer can do both close and fail the pipe, while reader can only fail the pipe via discard.

The following behavior is expected with regards to reading from or writing into a closed or a failed pipe:

  • Writing into a closed pipe yields IllegalStateException
  • Reading from a closed pipe yields EOF (com.twitter.util.Future.None)
  • Reading from or writing into a failed pipe yields a failure it was failed with

It's also worth discussing how pipes are being closed. As closure is always initiated by a producer (writer), there is a machinery allowing it to be notified when said closure is observed by a consumer (reader).

The following rules should help reasoning about closure signals in pipes:

- Closing a pipe with a pending read resolves said read into EOF and returns a com.twitter.util.Future.Unit - Closing a pipe with a pending write by default fails said write with IllegalStateException and returns a future that will be satisfied when a consumer observes the closure (EOF) via read. If a timer is provided, the pipe will wait until the provided deadline for a successful read before failing the write. - Closing an idle pipe returns a future that will be satisfied when a consumer observes the closure (EOF) via read when a timer is provided, otherwise the pipe will be closed immedidately.

Linear Supertypes
Ordering
  1. Alphabetic
  2. By Inheritance
Inherited
  1. Pipe
  2. Writer
  3. Closable
  4. Reader
  5. AnyRef
  6. Any
  1. Hide All
  2. Show All
Visibility
  1. Public
  2. Protected

Instance Constructors

  1. new Pipe()

    For Java compatability

  2. new Pipe(timer: Timer)

Value Members

  1. def close(deadline: Time): Future[Unit]

    Close the resource with the given deadline.

    Close the resource with the given deadline. This deadline is advisory, giving the callee some leeway, for example to drain clients or finish up other tasks.

    Definition Classes
    PipeClosable
  2. def close(after: Duration): Future[Unit]

    Close the resource with the given timeout.

    Close the resource with the given timeout. This timeout is advisory, giving the callee some leeway, for example to drain clients or finish up other tasks.

    Definition Classes
    Closable
  3. final def close(): Future[Unit]

    Close the resource.

    Close the resource. The returned Future is completed when the resource has been fully relinquished.

    Definition Classes
    Closable
  4. final def contramap[B](f: (B) => A): Writer[B]

    Given f, a function from B into A, creates an Writer[B] whose fail and close functions are equivalent to Writer[A]'s.

    Given f, a function from B into A, creates an Writer[B] whose fail and close functions are equivalent to Writer[A]'s. Writer[B]'s write function is equivalent to:

    def write(element: B) = Writer[A].write(f(element))
    Definition Classes
    Writer
  5. def discard(): Unit

    Discard this stream as its output is no longer required.

    Discard this stream as its output is no longer required. This could be used to signal the producer of this stream similarly how com.twitter.util.Future.raise used to propagate interrupts across future chains.

    Definition Classes
    PipeReader
    Note

    Although unnecessary, it's always safe to discard a fully-consumed stream.

  6. def fail(cause: Throwable): Unit

    Fail this stream with a given cause.

    Fail this stream with a given cause. No further writes are allowed, but if happen, will resolve into a com.twitter.util.Future failed with cause.

    Definition Classes
    PipeWriter
    Note

    Failing an already closed stream does not have an effect.

  7. final def flatMap[B](f: (A) => Reader[B]): Reader[B]

    Construct a new Reader by applying f to every item read from this Reader

    Construct a new Reader by applying f to every item read from this Reader

    f

    the function constructs a new Reader[B] from the value of this Reader.read

    Definition Classes
    Reader
    Note

    All operations of the new Reader will be in sync with self Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value.

  8. def flatten[B](implicit ev: <:<[A, Reader[B]]): Reader[B]

    Converts a Reader[Reader[B]] into a Reader[B]

    Converts a Reader[Reader[B]] into a Reader[B]

    Definition Classes
    Reader
    Note

    All operations of the new Reader will be in sync with the outermost Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value. The subsequent readers are unmanaged, the caller is responsible for discarding those when abandoned.

  9. final def map[B](f: (A) => B): Reader[B]

    Construct a new Reader by applying f to every item read from this Reader

    Construct a new Reader by applying f to every item read from this Reader

    f

    the function transforms data of type A to B

    Definition Classes
    Reader
    Note

    All operations of the new Reader will be in sync with self Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value.

  10. def onClose: Future[StreamTermination]

    A com.twitter.util.Future that resolves once this writer is closed and flushed.

    A com.twitter.util.Future that resolves once this writer is closed and flushed.

    It may contain an error if the writer is failed.

    This is useful for any extra resource cleanup that you must do once the stream is no longer being used.

    Definition Classes
    PipeWriterReader
  11. def read(): Future[Option[A]]

    Asynchronously read the next element of this stream.

    Asynchronously read the next element of this stream. Returned com.twitter.util.Future will resolve into Some(e) when the element is available or into None when stream is exhausted.

    Stream failures are terminal such that all subsequent reads will resolve in failed com.twitter.util.Futures.

    Definition Classes
    PipeReader
  12. def toString(): String
    Definition Classes
    Pipe → AnyRef → Any
  13. def write(buf: A): Future[Unit]

    Write an element into this stream.

    Write an element into this stream. Although undefined by this contract, a trustworthy implementation (such as Pipe) would do its best to resolve the returned com.twitter.util.Future only when a consumer observes a written element.

    The write can also resolve into a failure (failed com.twitter.util.Future).

    Definition Classes
    PipeWriter