trait Reader[+A] extends AnyRef
A reader exposes a pull-based API to model a potentially infinite stream of arbitrary elements.
Given the pull-based API, the consumer is responsible for driving the computation. A very typical code pattern to consume a reader is to use a read-loop:
def consume[A](r: Reader[A])(process: A => Future[Unit]): Future[Unit] = r.read().flatMap { case Some(a) => process(a).before(consume(r)(process)) case None => Future.Done // reached the end of the stream; no need to discard }
One way to reason about the read-loop idiom is to view it as a subscription to a publisher (reader) in the Pub/Sub terminology. Perhaps the major difference between readers and traditional publishers, is readers only allow one subscriber (read-loop). It's generally safer to assume the reader is fully consumed (stream is exhausted) once its read-loop is run.
Error Handling
Given the read-loop above, its returned com.twitter.util.Future could be used to observe both successful and unsuccessful outcomes.
consume(stream)(processor).respond { case Return(()) => println("Consumed an entire stream successfully.") case Throw(e) => println(s"Encountered an error while consuming a stream: $e") }
- Self Type
- Reader[A]
- Note
Once failed, a stream can not be restarted such that all future reads will resolve into a failure. There is no need to discard an already failed stream.
Resource Safety
One of the important implications of readers, and streams in general, is that they are prone to resource leaks unless fully consumed or discarded (failed). Specifically, readers backed by network connections MUST be discarded unless already consumed (EOF observed) to prevent connection leaks.
,The read-loop above, for example, exhausts the stream (observes EOF) hence does not have to discard it (stream).
Back Pressure
The pattern above leverages com.twitter.util.Future recursion to exert back-pressure via allowing only one outstanding read. It's usually a good idea to structure consumers this way (i.e., the next read isn't issued until the previous read finishes). This would always ensure a finer grained back-pressure in network systems allowing the consumers to artificially slow down the producers and not rely solely on transport and IO buffering.
,Whether or not multiple outstanding reads are allowed on a
Reader
type is an undefined behaviour but could be changed in a refinement.Cancellations
If a consumer is no longer interested in the stream, it can discard it. Note a discarded reader (or stream) can not be restarted.
def consumeN[A](r: Reader[A], n: Int)(process: A => Future[Unit]): Future[Unit] = if (n == 0) Future(r.discard()) else r.read().flatMap { case Some(a) => process(a).before(consumeN(r, n - 1)(process)) case None => Future.Done // reached the end of the stream; no need to discard }
- Alphabetic
- By Inheritance
- Reader
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Abstract Value Members
- abstract def discard(): Unit
Discard this stream as its output is no longer required.
Discard this stream as its output is no longer required. This could be used to signal the producer of this stream similarly how com.twitter.util.Future.raise used to propagate interrupts across future chains.
- Note
Although unnecessary, it's always safe to discard a fully-consumed stream.
- abstract def onClose: Future[StreamTermination]
A com.twitter.util.Future that resolves once this reader is closed upon reading of end-of-stream.
A com.twitter.util.Future that resolves once this reader is closed upon reading of end-of-stream.
If the result is a failed future, this indicates that it was not closed either by reading until the end of the stream nor by discarding. This is useful for any extra resource cleanup that you must do once the stream is no longer being used.
- abstract def read(): Future[Option[A]]
Asynchronously read the next element of this stream.
Asynchronously read the next element of this stream. Returned com.twitter.util.Future will resolve into
Some(e)
when the element is available or intoNone
when stream is exhausted.Stream failures are terminal such that all subsequent reads will resolve in failed com.twitter.util.Futures.
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def flatMap[B](f: (A) => Reader[B]): Reader[B]
Construct a new Reader by applying
f
to every item read from this ReaderConstruct a new Reader by applying
f
to every item read from this Reader- f
the function constructs a new Reader[B] from the value of this Reader.read
- Note
All operations of the new Reader will be in sync with self Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value.
- def flatten[B](implicit ev: <:<[A, Reader[B]]): Reader[B]
Converts a
Reader[Reader[B]]
into aReader[B]
Converts a
Reader[Reader[B]]
into aReader[B]
- Note
All operations of the new Reader will be in sync with the outermost Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value. The subsequent readers are unmanaged, the caller is responsible for discarding those when abandoned.
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def map[B](f: (A) => B): Reader[B]
Construct a new Reader by applying
f
to every item read from this ReaderConstruct a new Reader by applying
f
to every item read from this Reader- f
the function transforms data of type A to B
- Note
All operations of the new Reader will be in sync with self Reader. Discarding one Reader will discard the other Reader. When one Reader's onClose resolves, the other Reader's onClose will be resolved immediately with the same value.
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()