sealed abstract class AsyncStream[+A] extends AnyRef
A representation of a lazy (and possibly infinite) sequence of asynchronous values. We provide combinators for non-blocking computation over the sequence of values.
It is composable with Future, Seq and Option.
    val ids = Seq(123, 124, ...)
    val users = fromSeq(ids).flatMap(id => fromFuture(getUser(id)))

    // Or as a for-comprehension...
    val users = for {
      id <- fromSeq(ids)
      user <- fromFuture(getUser(id))
    } yield user
All of its operations are lazy and don't force evaluation, unless otherwise noted.
The stream is persistent and can be shared safely by multiple threads.
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- def ++[B >: A](that: => AsyncStream[B]): AsyncStream[B]
Concatenates two streams.
Note: If this stream is infinite, we never process the concatenated stream; effectively: m ++ k == m.
- See also
concat for Java users.
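A minimal sketch of concatenation, assuming Twitter's util-core library is on the classpath. The object name ConcatExample is hypothetical, and Await is used only to inspect the result in a demo.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Await

// Hypothetical example object; not part of the library.
object ConcatExample {
  val left: AsyncStream[Int] = AsyncStream(1, 2)
  val right: AsyncStream[Int] = AsyncStream(3)

  // `++` takes its argument by name (`that: => AsyncStream[B]`).
  val joined: AsyncStream[Int] = left ++ right

  // toSeq() forces the whole stream; Await blocks the calling thread.
  val result: Seq[Int] = Await.result(joined.toSeq())

  def main(args: Array[String]): Unit = println(result)
}
```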
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def concat[B >: A](that: => AsyncStream[B]): AsyncStream[B]
- See also
++
- def concatImpl[B >: A](that: () => AsyncStream[B]): AsyncStream[B]
- Attributes
- protected
- def drop(n: Int): AsyncStream[A]
Returns the suffix of this stream after the first n elements, or AsyncStream.empty if n is larger than the number of elements in the stream.
Note: this forces all of the intermediate dropped elements.
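The behaviour of drop described above can be sketched as follows, assuming util-core is available. DropExample is a hypothetical name used only for illustration.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Await

// Hypothetical example object; not part of the library.
object DropExample {
  val stream: AsyncStream[Int] = AsyncStream(1, 2, 3, 4)

  // Dropping fewer elements than the stream holds leaves the remaining suffix.
  val suffix: Seq[Int] = Await.result(stream.drop(2).toSeq())

  // Dropping more elements than the stream holds leaves an empty stream.
  val pastEnd: Seq[Int] = Await.result(stream.drop(10).toSeq())

  def main(args: Array[String]): Unit = {
    println(suffix)
    println(pastEnd)
  }
}
```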
- def dropWhile(p: (A) => Boolean): AsyncStream[A]
Given a predicate p, returns the suffix remaining after takeWhile(p):
    AsyncStream(1, 2, 3, 4, 1).dropWhile(_ < 3) = AsyncStream(3, 4, 1)
    AsyncStream(1, 2, 3).dropWhile(_ < 5) = AsyncStream.empty
    AsyncStream(1, 2, 3).dropWhile(_ < 0) = AsyncStream(1, 2, 3)
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def filter(p: (A) => Boolean): AsyncStream[A]
Returns a stream of elements that satisfy the predicate p.
Note: forces the stream up to the first element which satisfies the predicate. This operation may block forever on infinite streams in which no elements match.
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- def flatMap[B](f: (A) => AsyncStream[B]): AsyncStream[B]
Map a function f over the elements in this stream and concatenate the results.
Note: We use flatMap on Future instead of map to maintain stack-safety for monadic recursion.
- def flatten[B](implicit ev: <:<[A, AsyncStream[B]]): AsyncStream[B]
Concatenate a stream of streams.
    val a = AsyncStream(1)
    AsyncStream(a, a, a).flatten = AsyncStream(1, 1, 1)
Java users see AsyncStream.flattens.
- def foldLeft[B](z: B)(f: (B, A) => B): Future[B]
Applies a binary operator to a start value and all elements of the stream, from head to tail.
Note: forces the stream. If the stream is infinite, the resulting future is equivalent to Future.never.
- z
the starting value.
- f
a binary operator applied to elements of this stream.
- def foldLeftF[B](z: B)(f: (B, A) => Future[B]): Future[B]
Like foldLeft, except that its result is encapsulated in a Future. foldLeftF works from head to tail over the stream.
Note: forces the stream. If the stream is infinite, the resulting future is equivalent to Future.never.
- z
the starting value.
- f
a binary operator applied to elements of this stream.
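To compare foldLeft and foldLeftF side by side, here is a small sketch that sums a finite stream both ways. It assumes util-core is on the classpath; FoldLeftExample is a hypothetical name.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Future}

// Hypothetical example object; not part of the library.
object FoldLeftExample {
  val stream: AsyncStream[Int] = AsyncStream(1, 2, 3)

  // foldLeft: the operator returns a plain value.
  val total: Int = Await.result(stream.foldLeft(0)(_ + _))

  // foldLeftF: the operator returns a Future, e.g. the result of an async call.
  val totalF: Int =
    Await.result(stream.foldLeftF(0)((acc, a) => Future.value(acc + a)))

  def main(args: Array[String]): Unit = {
    println(total)
    println(totalF)
  }
}
```

Both calls force the stream, so neither should be applied to an infinite stream.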
- def foldRight[B](z: => Future[B])(f: (A, => Future[B]) => Future[B]): Future[B]
This is a powerful and expert level function. A fold operation encapsulated in a Future. Like foldRight on normal lists, it replaces every cons with the folded function f, and the empty element with z.
Note: For clarity, we imagine that surrounding a function with backticks (`) allows infix usage.
    (1 +:: 2 +:: 3 +:: empty).foldRight(z)(f) = 1 `f` flatMap (2 `f` flatMap (3 `f` z))
Note: if f always forces the second parameter, for infinite streams the future never resolves.
- z
the parameter that replaces the end of the list.
- f
a binary operator applied to elements of this stream. Note that the second parameter is call-by-name.
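Because the second parameter of f is call-by-name, a fold can stop early by not touching it. The sketch below illustrates this on an infinite stream, assuming util-core is available. The helper from and the object FoldRightExample are hypothetical names introduced for this example.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Future}

// Hypothetical example object; not part of the library.
object FoldRightExample {
  // An infinite stream n, n + 1, n + 2, ...; the tail given to +:: is lazy.
  def from(n: Int): AsyncStream[Int] = n +:: from(n + 1)

  // Value used if the end of the stream is ever reached.
  val none: Future[Option[Int]] = Future.value(None)

  // Finds the first element greater than 3. `rest` is only forced while
  // the predicate fails, so the fold terminates on an infinite stream.
  val firstOver3: Future[Option[Int]] =
    from(0).foldRight(none) { (a, rest) =>
      if (a > 3) Future.value(Some(a)) else rest
    }

  val result: Option[Int] = Await.result(firstOver3)

  def main(args: Array[String]): Unit = println(result)
}
```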
- def force: Future[Unit]
Force the entire stream. If you hold a reference to the head of the stream, this will cause the entire stream to be held in memory. The resulting Future will be satisfied once the entire stream has been consumed.
This is useful when you want the side-effects of consuming the stream to occur, but do not need to do anything with the resulting values.
- def foreach(f: (A) => Unit): Future[Unit]
Note: forces the stream. For infinite streams, the future never resolves.
- def foreachF(f: (A) => Future[Unit]): Future[Unit]
Maps each element of the stream to a Future action, resolving them from head to tail. The resulting Future completes when the action completes for the last element.
Note: forces the stream. For infinite streams, the future never resolves.
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def grouped(groupSize: Int): AsyncStream[Seq[A]]
Convert the stream into a stream of groups of items. This facilitates batch processing of the items in the stream. In all cases, this method should act like <https://www.scala-lang.org/api/current/index.html#scala.collection.IterableLike@grouped(size:Int):Iterator[Repr]>.
The resulting stream will cause this original stream to be evaluated group-wise, so calling this method will cause the first groupSize cells to be evaluated (even without examining the result), and accessing each subsequent element will evaluate a further groupSize elements from the stream.
- groupSize
must be a positive number, or an IllegalArgumentException will be thrown.
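A short sketch of batching with grouped, assuming util-core is on the classpath and that the final, shorter group is emitted the same way as with the standard collections' grouped. GroupedExample is a hypothetical name.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Await

// Hypothetical example object; not part of the library.
object GroupedExample {
  // Five items in groups of two: the last group holds the single leftover item.
  val batches: Seq[Seq[Int]] =
    Await.result(AsyncStream(1, 2, 3, 4, 5).grouped(2).toSeq())

  def main(args: Array[String]): Unit = batches.foreach(println)
}
```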
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def head: Future[Option[A]]
Returns the head of this stream if not empty.
- def isEmpty: Future[Boolean]
Returns true if there are no elements in the stream.
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def map[B](f: (A) => B): AsyncStream[B]
stream.map(f) is the stream obtained by applying f to each element of stream.
- def mapConcurrent[B](concurrencyLevel: Int)(f: (A) => Future[B]): AsyncStream[B]
Map over this stream with the given concurrency. The items will likely be completed out of order, and if so, the stream of results will also be out of order.
concurrencyLevel specifies an "eagerness factor", and that many actions will be started when this method is called. Forcing the stream will yield the results of completed actions, returning the results that finished first, and will block if none of the actions has yet completed.
This method is useful for speeding up calculations over a stream where executing the actions (and processing their results) in order is not important. To implement a concurrent fold, first call mapConcurrent and then fold that stream. Similarly, concurrent foreachF can be achieved by applying mapConcurrent and then foreach.
- def mapF[B](f: (A) => Future[B]): AsyncStream[B]
Constructs a new stream by mapping each element of this stream to a Future action, evaluated from head to tail.
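The difference between mapF and mapConcurrent can be sketched as below, assuming util-core is on the classpath. The function double stands in for a real asynchronous call and, like MapExample, is a hypothetical name. Since mapConcurrent does not promise ordering, the example sorts its output before using it.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Future}

// Hypothetical example object; not part of the library.
object MapExample {
  val stream: AsyncStream[Int] = AsyncStream(1, 2, 3)

  // Stand-in for an asynchronous action such as an RPC.
  def double(i: Int): Future[Int] = Future.value(i * 2)

  // mapF runs the actions from head to tail, so results keep stream order.
  val inOrder: Seq[Int] = Await.result(stream.mapF(double).toSeq())

  // mapConcurrent may yield results in completion order; sort before comparing.
  val concurrent: Seq[Int] =
    Await.result(stream.mapConcurrent(2)(double).toSeq()).sorted

  def main(args: Array[String]): Unit = {
    println(inOrder)
    println(concurrent)
  }
}
```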
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def observe(): Future[(Seq[A], Option[Throwable])]
Attempts to transform the stream into a Seq, and in the case of failure, observe returns whatever was able to be transformed up to the point of failure along with the exception. As a result, this Future never fails, and if there are errors they can be accessed via the Option.
Note: forces the stream. For infinite streams, the future never resolves.
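As an illustration of observe on a stream that fails partway through, the following sketch appends a failed Future to two successful elements. It assumes util-core is available; ObserveExample and boom are hypothetical names.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Future}

// Hypothetical example object; not part of the library.
object ObserveExample {
  val boom = new RuntimeException("boom")

  // Two good elements followed by a failed asynchronous value.
  val failing: AsyncStream[Int] =
    AsyncStream(1, 2) ++ AsyncStream.fromFuture(Future.exception[Int](boom))

  // The Future from observe() succeeds; the failure is carried in the Option.
  val (seen, error) = Await.result(failing.observe())

  def main(args: Array[String]): Unit = {
    println(seen)
    println(error)
  }
}
```

By contrast, calling toSeq() on the same stream would produce a failed Future.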
- def scanLeft[B](z: B)(f: (B, A) => B): AsyncStream[B]
Similar to foldLeft, but produces a stream from the result of each successive fold:
    AsyncStream(1, 2, ...).scanLeft(z)(f) == z +:: f(z, 1) +:: f(f(z, 1), 2) +:: ...
Note that for an AsyncStream as:
    as.scanLeft(z)(f).last == as.foldLeft(z)(f)
The resulting stream always begins with the initial value z, not subject to the fate of the underlying future, i.e.:
    val never = AsyncStream.fromFuture(Future.never)
    never.scanLeft(z)(f) == z +:: never // logical equality
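A concrete instance of the scanLeft equation above, computing running sums over a finite stream. This is a sketch that assumes util-core is on the classpath; ScanLeftExample is a hypothetical name.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Await

// Hypothetical example object; not part of the library.
object ScanLeftExample {
  // Running sums: the initial value 0 comes first, then each partial sum.
  val sums: Seq[Int] =
    Await.result(AsyncStream(1, 2, 3).scanLeft(0)(_ + _).toSeq())

  def main(args: Array[String]): Unit = println(sums)
}
```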
- def size: Future[Int]
Eagerly consume the entire stream and return the number of elements that are in it. If you hold a reference to the head of the stream, this will cause the entire stream to be held in memory.
Note: forces the stream. If the stream is infinite, the resulting future is equivalent to Future.never.
- def sum[B >: A](implicit numeric: Numeric[B]): Future[B]
Add up the values of all of the elements in this stream. If you hold a reference to the head of the stream, this will cause the entire stream to be held in memory.
Note: forces the stream. If the stream is infinite, the resulting future is equivalent to Future.never.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def tail: Future[Option[AsyncStream[A]]]
Note: forces the first element of the tail.
- def take(n: Int): AsyncStream[A]
Returns the prefix of this stream of length n, or the stream itself if n is larger than the number of elements in the stream.
- def takeWhile(p: (A) => Boolean): AsyncStream[A]
Given a predicate p, returns the longest prefix (possibly empty) of this stream that satisfies p:
    AsyncStream(1, 2, 3, 4, 1).takeWhile(_ < 3) = AsyncStream(1, 2)
    AsyncStream(1, 2, 3).takeWhile(_ < 5) = AsyncStream(1, 2, 3)
    AsyncStream(1, 2, 3).takeWhile(_ < 0) = AsyncStream.empty
- def toSeq(): Future[Seq[A]]
A Future of the stream realized as a list. This future completes when all elements of the stream are resolved.
Note: forces the entire stream. If one asynchronous call fails, it fails the aggregated result.
- def toString(): String
- Definition Classes
- AnyRef → Any
- def uncons: Future[Option[(A, () => AsyncStream[A])]]
The head and tail of this stream, if not empty. Note the tail thunk which preserves the tail's laziness.
    empty.uncons == Future.None
    (a +:: m).uncons == Future.value(Some(a, () => m))
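One way to consume a stream step by step is to recurse on uncons, calling the tail thunk only when the next element is wanted. The sketch below assumes util-core is available. The helper collect and the object UnconsExample are hypothetical names, not library functions.

```scala
import com.twitter.concurrent.AsyncStream
import com.twitter.util.{Await, Future}

// Hypothetical example object; not part of the library.
object UnconsExample {
  // Collects all elements by repeatedly splitting the stream with uncons.
  def collect[A](s: AsyncStream[A], acc: Vector[A] = Vector.empty): Future[Vector[A]] =
    s.uncons.flatMap {
      // Non-empty: keep the head and evaluate the tail thunk to continue.
      case Some((head, tail)) => collect(tail(), acc :+ head)
      // Empty: nothing left, return what was gathered.
      case None => Future.value(acc)
    }

  val result: Vector[Int] = Await.result(collect(AsyncStream(1, 2, 3)))

  def main(args: Array[String]): Unit = println(result)
}
```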
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- def withEffect(f: (A) => Unit): AsyncStream[A]
Execute the specified effect as each element of the resulting stream is demanded. This method does not force the stream. Since the head of the stream is not lazy, the effect will happen to the first item in the stream (if any) right away.
The effects will occur as the resulting stream is demanded and will not occur if the original stream is demanded.
This is useful for e.g. counting the number of items that were consumed from a stream by a consuming process, regardless of whether the entire stream is consumed.
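The counting use case mentioned above might look like the following sketch, assuming util-core is on the classpath. WithEffectExample and the counter consumed are hypothetical names. The stream is consumed in full here so the final count is unambiguous.

```scala
import java.util.concurrent.atomic.AtomicInteger

import com.twitter.concurrent.AsyncStream
import com.twitter.util.Await

// Hypothetical example object; not part of the library.
object WithEffectExample {
  val consumed = new AtomicInteger(0)

  // The effect is attached to the resulting stream, not to the original one.
  val counted: AsyncStream[Int] =
    AsyncStream(1, 2, 3).withEffect { _ => consumed.incrementAndGet(); () }

  // Consuming the resulting stream runs the effect once per element.
  val items: Seq[Int] = Await.result(counted.toSeq())
  val count: Int = consumed.get()

  def main(args: Array[String]): Unit = println(s"items=$items count=$count")
}
```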
- def withFilter(f: (A) => Boolean): AsyncStream[A]
- See also
filter