Stream.ts overview
Since v2.0.0
Exports Grouped by Category
- combinators
- constants
- constructors
- acquireRelease
- async
- asyncEffect
- asyncPush
- asyncScoped
- concatAll
- die
- dieMessage
- dieSync
- empty
- execute
- fail
- failCause
- failCauseSync
- failSync
- finalizer
- fromAsyncIterable
- fromChannel
- fromChunk
- fromChunkPubSub
- fromChunkQueue
- fromChunks
- fromEffect
- fromEffectOption
- fromIterable
- fromIterableEffect
- fromIteratorSucceed
- fromPubSub
- fromPull
- fromQueue
- fromReadableStream
- fromReadableStreamByob
- fromSchedule
- fromTPubSub
- fromTQueue
- iterate
- make
- never
- paginate
- paginateChunk
- paginateChunkEffect
- paginateEffect
- range
- repeatEffect
- repeatEffectChunk
- repeatEffectChunkOption
- repeatEffectOption
- repeatEffectWithSchedule
- repeatValue
- scoped
- scopedWith
- succeed
- suspend
- sync
- tick
- toChannel
- unfold
- unfoldChunk
- unfoldChunkEffect
- unfoldEffect
- unwrap
- unwrapScoped
- unwrapScopedWith
- void
- whenCase
- context
- destructors
- run
- runCollect
- runCount
- runDrain
- runFold
- runFoldEffect
- runFoldScoped
- runFoldScopedEffect
- runFoldWhile
- runFoldWhileEffect
- runFoldWhileScoped
- runFoldWhileScopedEffect
- runForEach
- runForEachChunk
- runForEachChunkScoped
- runForEachScoped
- runForEachWhile
- runForEachWhileScoped
- runHead
- runIntoPubSub
- runIntoPubSubScoped
- runIntoQueue
- runIntoQueueElementsScoped
- runIntoQueueScoped
- runLast
- runScoped
- runSum
- toPubSub
- toPull
- toQueue
- toQueueOfElements
- toReadableStream
- toReadableStreamEffect
- toReadableStreamRuntime
- do notation
- elements
- encoding
- error handling
- filtering
- grouping
- mapping
- models
- racing
- sequencing
- symbols
- tracing
- type lambdas
- utils
- Stream (namespace)
- accumulate
- accumulateChunks
- aggregate
- aggregateWithin
- aggregateWithinEither
- broadcast
- broadcastDynamic
- broadcastedQueues
- broadcastedQueuesDynamic
- buffer
- bufferChunks
- changes
- changesWith
- changesWithEffect
- chunks
- chunksWith
- combine
- combineChunks
- concat
- cross
- crossLeft
- crossRight
- crossWith
- debounce
- distributedWith
- distributedWithDynamic
- drain
- drainFork
- drop
- dropRight
- dropUntil
- dropUntilEffect
- dropWhile
- dropWhileEffect
- either
- ensuring
- ensuringWith
- filterMap
- filterMapEffect
- filterMapWhile
- filterMapWhileEffect
- forever
- fromEventListener
- haltAfter
- haltWhen
- haltWhenDeferred
- identity
- interleave
- interleaveWith
- interruptAfter
- interruptWhen
- interruptWhenDeferred
- intersperse
- intersperseAffixes
- mapBoth
- merge
- mergeAll
- mergeEither
- mergeLeft
- mergeRight
- mergeWith
- mkString
- onDone
- onError
- partition
- partitionEither
- peel
- pipeThrough
- pipeThroughChannel
- pipeThroughChannelOrFail
- prepend
- rechunk
- repeat
- repeatEither
- repeatElements
- repeatElementsWith
- repeatWith
- retry
- scan
- scanEffect
- scanReduce
- scanReduceEffect
- schedule
- scheduleWith
- share
- sliding
- slidingSize
- some
- someOrElse
- someOrFail
- split
- splitOnChunk
- take
- takeRight
- takeUntil
- takeUntilEffect
- takeWhile
- tapErrorCause
- throttle
- throttleEffect
- timeout
- timeoutFail
- timeoutFailCause
- timeoutTo
- transduce
- when
- whenCaseEffect
- whenEffect
- zipping
combinators
mergeWithTag
Merges a struct of streams into a single stream of tagged values.
Example
import { Stream } from "effect"
// Stream.Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }>
const res = Stream.mergeWithTag(
{
a: Stream.make(0),
b: Stream.make("")
},
{ concurrency: "unbounded" }
)
Signature
declare const mergeWithTag: {
<S extends { [k in string]: Stream<any, any, any> }>(
streams: S,
options: { readonly concurrency: number | "unbounded"; readonly bufferSize?: number | undefined }
): Stream<
{ [K in keyof S]: { _tag: K; value: Stream.Success<S[K]> } }[keyof S],
Stream.Error<S[keyof S]>,
Stream.Context<S[keyof S]>
>
(options: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
}): <S extends { [k in string]: Stream<any, any, any> }>(
streams: S
) => Stream<
{ [K in keyof S]: { _tag: K; value: Stream.Success<S[K]> } }[keyof S],
Stream.Error<S[keyof S]>,
Stream.Context<S[keyof S]>
>
}
Since v3.8.5
splitLines
Splits strings on newlines. Handles both Windows newlines (\r\n) and UNIX newlines (\n).
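Example
A minimal sketch; it assumes line fragments are reassembled across stream elements, as when decoding text.
import { Effect, Stream } from "effect"
const stream = Stream.make("Hello\nWor", "ld\nWelcome").pipe(Stream.splitLines)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 'Hello', 'World', 'Welcome' ] }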
Signature
declare const splitLines: <E, R>(self: Stream<string, E, R>) => Stream<string, E, R>
Since v2.0.0
constants
DefaultChunkSize
The default chunk size used by the various combinators and constructors of Stream
.
Signature
declare const DefaultChunkSize: number
Since v2.0.0
constructors
acquireRelease
Creates a stream from a single value that will get cleaned up after the stream is consumed.
Example
import { Console, Effect, Stream } from "effect"
// Simulating File operations
const open = (filename: string) =>
Effect.gen(function* () {
yield* Console.log(`Opening ${filename}`)
return {
getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
close: Console.log(`Closing ${filename}`)
}
})
const stream = Stream.acquireRelease(open("file.txt"), (file) => file.close).pipe(
Stream.flatMap((file) => file.getLines)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Opening file.txt
// Closing file.txt
// { _id: 'Chunk', values: [ [ 'Line 1', 'Line 2', 'Line 3' ] ] }
Signature
declare const acquireRelease: <A, E, R, R2, X>(
acquire: Effect.Effect<A, E, R>,
release: (resource: A, exit: Exit.Exit<unknown, unknown>) => Effect.Effect<X, never, R2>
) => Stream<A, E, R | R2>
Since v2.0.0
async
Creates a stream from an asynchronous callback that can be called multiple times. The optionality of the error type E
in Emit
can be used to signal the end of the stream by setting it to None
.
The registration function can optionally return an Effect
, which will be executed if the Fiber
executing this Effect is interrupted.
Example
import type { StreamEmit } from "effect"
import { Chunk, Effect, Option, Stream } from "effect"
const events = [1, 2, 3, 4]
const stream = Stream.async((emit: StreamEmit.Emit<never, never, number, void>) => {
events.forEach((n) => {
setTimeout(() => {
if (n === 3) {
emit(Effect.fail(Option.none())) // Terminate the stream
} else {
emit(Effect.succeed(Chunk.of(n))) // Add the current item to the stream
}
}, 100 * n)
})
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2 ] }
Signature
declare const async: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
bufferSize?:
| number
| "unbounded"
| { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
| undefined
) => Stream<A, E, R>
Since v2.0.0
asyncEffect
Creates a stream from an asynchronous callback that can be called multiple times. The registration of the callback itself returns an effect. The optionality of the error type E can be used to signal the end of the stream by setting it to None.
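Example
A minimal sketch: the registration effect starts a timer that emits three values and then ends the stream with Option.none().
import { Chunk, Effect, Option, Stream } from "effect"
const stream = Stream.asyncEffect<number>((emit) =>
  Effect.sync(() => {
    let n = 0
    const handle = setInterval(() => {
      n = n + 1
      if (n > 3) {
        clearInterval(handle)
        emit(Effect.fail(Option.none())) // Terminate the stream
      } else {
        emit(Effect.succeed(Chunk.of(n))) // Emit the current value
      }
    }, 10)
  })
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 3 ] }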
Signature
declare const asyncEffect: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
bufferSize?:
| number
| "unbounded"
| { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
| undefined
) => Stream<A, E, R>
Since v2.0.0
asyncPush
Creates a stream from an external push-based resource.
You can use the emit
helper to emit values to the stream. The emit
helper returns a boolean indicating whether the value was emitted or not.
You can also use the emit
helper to signal the end of the stream by using apis such as emit.end
or emit.fail
.
By default it uses an “unbounded” buffer size. You can customize the buffer size and strategy by passing an object as the second argument with the bufferSize
and strategy
fields.
Example
import { Effect, Stream } from "effect"
Stream.asyncPush<string>(
(emit) =>
Effect.acquireRelease(
Effect.gen(function* () {
yield* Effect.log("subscribing")
return setInterval(() => emit.single("tick"), 1000)
}),
(handle) =>
Effect.gen(function* () {
yield* Effect.log("unsubscribing")
clearInterval(handle)
})
),
{ bufferSize: 16, strategy: "dropping" }
)
Signature
declare const asyncPush: <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
options?:
| { readonly bufferSize: "unbounded" }
| { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | undefined }
| undefined
) => Stream<A, E, Exclude<R, Scope.Scope>>
Since v3.6.0
asyncScoped
Creates a stream from an asynchronous callback that can be called multiple times. The registration of the callback itself returns a scoped resource. The optionality of the error type E can be used to signal the end of the stream by setting it to None.
Signature
declare const asyncScoped: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
bufferSize?:
| number
| "unbounded"
| { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
| undefined
) => Stream<A, E, Exclude<R, Scope.Scope>>
Since v2.0.0
concatAll
Concatenates all of the streams in the chunk to one stream.
Example
import { Chunk, Effect, Stream } from "effect"
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)
const s3 = Stream.make(6, 7, 8)
const stream = Stream.concatAll(Chunk.make(s1, s2, s3))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
// _id: 'Chunk',
// values: [
// 1, 2, 3, 4,
// 5, 6, 7, 8
// ]
// }
Signature
declare const concatAll: <A, E, R>(streams: Chunk.Chunk<Stream<A, E, R>>) => Stream<A, E, R>
Since v2.0.0
die
The stream that dies with the specified defect.
Signature
declare const die: (defect: unknown) => Stream<never>
Since v2.0.0
dieMessage
The stream that dies with an exception described by message
.
Signature
declare const dieMessage: (message: string) => Stream<never>
Since v2.0.0
dieSync
The stream that dies with the specified lazily evaluated defect.
Signature
declare const dieSync: (evaluate: LazyArg<unknown>) => Stream<never>
Since v2.0.0
empty
The empty stream.
Example
import { Effect, Stream } from "effect"
const stream = Stream.empty
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }
Signature
declare const empty: Stream<never, never, never>
Since v2.0.0
execute
Creates a stream that executes the specified effect but emits no elements.
Signature
declare const execute: <X, E, R>(effect: Effect.Effect<X, E, R>) => Stream<never, E, R>
Since v2.0.0
fail
Terminates with the specified error.
Example
import { Effect, Stream } from "effect"
const stream = Stream.fail("Uh oh!")
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// {
// _id: 'Exit',
// _tag: 'Failure',
// cause: { _id: 'Cause', _tag: 'Fail', failure: 'Uh oh!' }
// }
Signature
declare const fail: <E>(error: E) => Stream<never, E>
Since v2.0.0
failCause
The stream that always fails with the specified Cause
.
Signature
declare const failCause: <E>(cause: Cause.Cause<E>) => Stream<never, E>
Since v2.0.0
failCauseSync
The stream that always fails with the specified lazily evaluated Cause
.
Signature
declare const failCauseSync: <E>(evaluate: LazyArg<Cause.Cause<E>>) => Stream<never, E>
Since v2.0.0
failSync
Terminates with the specified lazily evaluated error.
Signature
declare const failSync: <E>(evaluate: LazyArg<E>) => Stream<never, E>
Since v2.0.0
finalizer
Creates a one-element stream that never fails and executes the finalizer when it ends.
Example
import { Console, Effect, Stream } from "effect"
const application = Stream.fromEffect(Console.log("Application Logic."))
const deleteDir = (dir: string) => Console.log(`Deleting dir: ${dir}`)
const program = application.pipe(
Stream.concat(
Stream.finalizer(deleteDir("tmp").pipe(Effect.andThen(Console.log("Temporary directory was deleted."))))
)
)
Effect.runPromise(Stream.runCollect(program)).then(console.log)
// Application Logic.
// Deleting dir: tmp
// Temporary directory was deleted.
// { _id: 'Chunk', values: [ undefined, undefined ] }
Signature
declare const finalizer: <R, X>(finalizer: Effect.Effect<X, never, R>) => Stream<void, never, R>
Since v2.0.0
fromAsyncIterable
Creates a stream from an AsyncIterable
.
Example
import { Effect, Stream } from "effect"
const myAsyncIterable = async function* () {
yield 1
yield 2
}
const stream = Stream.fromAsyncIterable(
myAsyncIterable(),
(e) => new Error(String(e)) // Error Handling
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2 ] }
Signature
declare const fromAsyncIterable: <A, E>(iterable: AsyncIterable<A>, onError: (e: unknown) => E) => Stream<A, E>
Since v2.0.0
fromChannel
Creates a stream from a Channel
.
Signature
declare const fromChannel: <A, E, R>(
channel: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, unknown, unknown, R>
) => Stream<A, E, R>
Since v2.0.0
fromChunk
Creates a stream from a Chunk
of values.
Example
import { Chunk, Effect, Stream } from "effect"
// Creating a stream with values from a single Chunk
const stream = Stream.fromChunk(Chunk.make(1, 2, 3))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
Signature
declare const fromChunk: <A>(chunk: Chunk.Chunk<A>) => Stream<A>
Since v2.0.0
fromChunkPubSub
Creates a stream from a subscription to a PubSub
.
Options
- shutdown: If true, the PubSub will be shut down after the stream is evaluated (defaults to false)
Signature
declare const fromChunkPubSub: {
<A>(
pubsub: PubSub.PubSub<Chunk.Chunk<A>>,
options: { readonly scoped: true; readonly shutdown?: boolean | undefined }
): Effect.Effect<Stream<A>, never, Scope.Scope>
<A>(
pubsub: PubSub.PubSub<Chunk.Chunk<A>>,
options?: { readonly scoped?: false | undefined; readonly shutdown?: boolean | undefined } | undefined
): Stream<A>
}
Since v2.0.0
fromChunkQueue
Creates a stream from a Queue
of values.
Options
- shutdown: If true, the queue will be shut down after the stream is evaluated (defaults to false)
Signature
declare const fromChunkQueue: <A>(
queue: Queue.Dequeue<Chunk.Chunk<A>>,
options?: { readonly shutdown?: boolean | undefined }
) => Stream<A>
Since v2.0.0
fromChunks
Creates a stream from an arbitrary number of chunks.
Example
import { Chunk, Effect, Stream } from "effect"
// Creating a stream with values from multiple Chunks
const stream = Stream.fromChunks(Chunk.make(1, 2, 3), Chunk.make(4, 5, 6))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
Signature
declare const fromChunks: <A>(...chunks: Array<Chunk.Chunk<A>>) => Stream<A>
Since v2.0.0
fromEffect
Either emits the success value of this effect or terminates the stream with the failure value of this effect.
Example
import { Effect, Random, Stream } from "effect"
const stream = Stream.fromEffect(Random.nextInt)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 922694024 ] }
Signature
declare const fromEffect: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, R>
Since v2.0.0
fromEffectOption
Creates a stream from an effect producing a value of type A
or an empty Stream
.
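Example
A minimal sketch: failing with Option.none() produces an empty stream, while a success emits a single element.
import { Effect, Option, Stream } from "effect"
const empty = Stream.fromEffectOption(Effect.fail(Option.none()))
const single = Stream.fromEffectOption(Effect.succeed(1))
Effect.runPromise(Stream.runCollect(empty)).then(console.log)
// { _id: 'Chunk', values: [] }
Effect.runPromise(Stream.runCollect(single)).then(console.log)
// { _id: 'Chunk', values: [ 1 ] }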
Signature
declare const fromEffectOption: <A, E, R>(effect: Effect.Effect<A, Option.Option<E>, R>) => Stream<A, E, R>
Since v2.0.0
fromIterable
Creates a new Stream
from an iterable collection of values.
Example
import { Effect, Stream } from "effect"
const numbers = [1, 2, 3]
const stream = Stream.fromIterable(numbers)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
Signature
declare const fromIterable: <A>(iterable: Iterable<A>) => Stream<A>
Since v2.0.0
fromIterableEffect
Creates a stream from an effect producing a value of type Iterable<A>
.
Example
import { Context, Effect, Stream } from "effect"
class Database extends Context.Tag("Database")<Database, { readonly getUsers: Effect.Effect<Array<string>> }>() {}
const getUsers = Database.pipe(Effect.andThen((_) => _.getUsers))
const stream = Stream.fromIterableEffect(getUsers)
Effect.runPromise(
Stream.runCollect(stream.pipe(Stream.provideService(Database, { getUsers: Effect.succeed(["user1", "user2"]) })))
).then(console.log)
// { _id: 'Chunk', values: [ 'user1', 'user2' ] }
Signature
declare const fromIterableEffect: <A, E, R>(effect: Effect.Effect<Iterable<A>, E, R>) => Stream<A, E, R>
Since v2.0.0
fromIteratorSucceed
Creates a stream from an iterator
Signature
declare const fromIteratorSucceed: <A>(iterator: IterableIterator<A>, maxChunkSize?: number) => Stream<A>
Since v2.0.0
fromPubSub
Creates a stream from a subscription to a PubSub
.
Options
- shutdown: If true, the PubSub will be shut down after the stream is evaluated (defaults to false)
Signature
declare const fromPubSub: {
<A>(
pubsub: PubSub.PubSub<A>,
options: {
readonly scoped: true
readonly maxChunkSize?: number | undefined
readonly shutdown?: boolean | undefined
}
): Effect.Effect<Stream<A>, never, Scope.Scope>
<A>(
pubsub: PubSub.PubSub<A>,
options?:
| {
readonly scoped?: false | undefined
readonly maxChunkSize?: number | undefined
readonly shutdown?: boolean | undefined
}
| undefined
): Stream<A>
}
Since v2.0.0
fromPull
Creates a stream from an effect that pulls elements from another stream.
See Stream.toPull
for reference.
Signature
declare const fromPull: <R, R2, E, A>(
effect: Effect.Effect<Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R2>, never, Scope.Scope | R>
) => Stream<A, E, R2 | Exclude<R, Scope.Scope>>
Since v2.0.0
fromQueue
Creates a stream from a queue of values
Options
- maxChunkSize: The maximum number of queued elements to put in one chunk in the stream
- shutdown: If true, the queue will be shut down after the stream is evaluated (defaults to false)
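Example
A minimal sketch: values offered to the queue are emitted by the stream (Stream.take is used because the queue is never shut down).
import { Effect, Queue, Stream } from "effect"
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(16)
  yield* Queue.offerAll(queue, [1, 2, 3])
  const stream = Stream.fromQueue(queue, { maxChunkSize: 2 })
  return yield* Stream.runCollect(stream.pipe(Stream.take(3)))
})
Effect.runPromise(program).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 3 ] }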
Signature
declare const fromQueue: <A>(
queue: Queue.Dequeue<A>,
options?: { readonly maxChunkSize?: number | undefined; readonly shutdown?: boolean | undefined }
) => Stream<A>
Since v2.0.0
fromReadableStream
Creates a stream from a ReadableStream
.
See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
Signature
declare const fromReadableStream: {
<A, E>(options: {
readonly evaluate: LazyArg<ReadableStream<A>>
readonly onError: (error: unknown) => E
readonly releaseLockOnEnd?: boolean | undefined
}): Stream<A, E>
<A, E>(evaluate: LazyArg<ReadableStream<A>>, onError: (error: unknown) => E): Stream<A, E>
}
Since v2.0.0
fromReadableStreamByob
Creates a stream from a ReadableStreamBYOBReader
.
See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader.
Signature
declare const fromReadableStreamByob: {
<E>(options: {
readonly evaluate: LazyArg<ReadableStream<Uint8Array>>
readonly onError: (error: unknown) => E
readonly bufferSize?: number | undefined
readonly releaseLockOnEnd?: boolean | undefined
}): Stream<Uint8Array, E>
<E>(
evaluate: LazyArg<ReadableStream<Uint8Array>>,
onError: (error: unknown) => E,
allocSize?: number
): Stream<Uint8Array, E>
}
Since v2.0.0
fromSchedule
Creates a stream from a Schedule
that does not require any further input. The stream will emit an element for each value output from the schedule, continuing for as long as the schedule continues.
Example
import { Effect, Schedule, Stream } from "effect"
// Emits values every 1 second for a total of 5 emissions
const schedule = Schedule.spaced("1 second").pipe(Schedule.compose(Schedule.recurs(5)))
const stream = Stream.fromSchedule(schedule)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
Signature
declare const fromSchedule: <A, R>(schedule: Schedule.Schedule<A, unknown, R>) => Stream<A, never, R>
Since v2.0.0
fromTPubSub
Creates a stream from a subscription to a TPubSub
.
Signature
declare const fromTPubSub: <A>(pubsub: TPubSub<A>) => Stream<A>
Since v3.10.0
fromTQueue
Creates a stream from a TQueue of values
Signature
declare const fromTQueue: <A>(queue: TDequeue<A>) => Stream<A>
Since v3.10.0
iterate
The infinite stream of iterative function application: a, f(a), f(f(a)), f(f(f(a))), …
Example
import { Effect, Stream } from "effect"
// An infinite Stream of numbers starting from 1 and incrementing
const stream = Stream.iterate(1, (n) => n + 1)
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(10)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }
Signature
declare const iterate: <A>(value: A, next: (value: A) => A) => Stream<A>
Since v2.0.0
make
Creates a stream from a sequence of values.
Example
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
Signature
declare const make: <As extends Array<any>>(...as: As) => Stream<As[number]>
Since v2.0.0
never
The stream that never produces any value or fails with any error.
Signature
declare const never: Stream<never, never, never>
Since v2.0.0
paginate
Like Stream.unfold
, but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.
Example
import { Effect, Option, Stream } from "effect"
const stream = Stream.paginate(0, (n) => [n, n < 3 ? Option.some(n + 1) : Option.none()])
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }
Signature
declare const paginate: <S, A>(s: S, f: (s: S) => readonly [A, Option.Option<S>]) => Stream<A>
Since v2.0.0
paginateChunk
Like Stream.unfoldChunk
, but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.
Signature
declare const paginateChunk: <S, A>(s: S, f: (s: S) => readonly [Chunk.Chunk<A>, Option.Option<S>]) => Stream<A>
Since v2.0.0
paginateChunkEffect
Like Stream.unfoldChunkEffect
, but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.
Signature
declare const paginateChunkEffect: <S, A, E, R>(
s: S,
f: (s: S) => Effect.Effect<readonly [Chunk.Chunk<A>, Option.Option<S>], E, R>
) => Stream<A, E, R>
Since v2.0.0
paginateEffect
Like Stream.unfoldEffect
but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.
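Example
A minimal sketch, where fetchPage stands in for a hypothetical effectful paginated API:
import { Effect, Option, Stream } from "effect"
// Returns the item for a page plus the next page to fetch, if any
const fetchPage = (page: number) =>
  Effect.succeed([`item-${page}`, page < 2 ? Option.some(page + 1) : Option.none()] as const)
const stream = Stream.paginateEffect(0, fetchPage)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 'item-0', 'item-1', 'item-2' ] }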
Signature
declare const paginateEffect: <S, A, E, R>(
s: S,
f: (s: S) => Effect.Effect<readonly [A, Option.Option<S>], E, R>
) => Stream<A, E, R>
Since v2.0.0
range
Constructs a stream from a range of integers, including both endpoints.
Example
import { Effect, Stream } from "effect"
// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
Signature
declare const range: (min: number, max: number, chunkSize?: number) => Stream<number>
Since v2.0.0
repeatEffect
Creates a stream from an effect producing a value of type A
which repeats forever.
Example
import { Effect, Random, Stream } from "effect"
const stream = Stream.repeatEffect(Random.nextInt)
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 3891571149, 4239494205, 2352981603, 2339111046, 1488052210 ] }
Signature
declare const repeatEffect: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, R>
Since v2.0.0
repeatEffectChunk
Creates a stream from an effect producing chunks of A
values which repeats forever.
Signature
declare const repeatEffectChunk: <A, E, R>(effect: Effect.Effect<Chunk.Chunk<A>, E, R>) => Stream<A, E, R>
Since v2.0.0
repeatEffectChunkOption
Creates a stream from an effect producing chunks of A
values until it fails with None
.
Signature
declare const repeatEffectChunkOption: <A, E, R>(
effect: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R>
) => Stream<A, E, R>
Since v2.0.0
repeatEffectOption
Creates a stream from an effect producing values of type A
until it fails with None
.
Example
// In this example, we're draining an Iterator to create a stream from it
import { Stream, Effect, Option } from "effect"
const drainIterator = <A>(it: Iterator<A>): Stream.Stream<A> =>
Stream.repeatEffectOption(
Effect.sync(() => it.next()).pipe(
Effect.andThen((res) => {
if (res.done) {
return Effect.fail(Option.none())
}
return Effect.succeed(res.value)
})
)
)
Signature
declare const repeatEffectOption: <A, E, R>(effect: Effect.Effect<A, Option.Option<E>, R>) => Stream<A, E, R>
Since v2.0.0
repeatEffectWithSchedule
Creates a stream from an effect producing a value of type A
, which is repeated using the specified schedule.
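Example
A minimal sketch: a counter effect is repeated on a spaced schedule and only the first three emissions are collected.
import { Effect, Schedule, Stream } from "effect"
let counter = 0
const next = Effect.sync(() => ++counter)
const stream = Stream.repeatEffectWithSchedule(next, Schedule.spaced("10 millis"))
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(3)))).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 3 ] }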
Signature
declare const repeatEffectWithSchedule: <A, E, R, X, A0 extends A, R2>(
effect: Effect.Effect<A, E, R>,
schedule: Schedule.Schedule<X, A0, R2>
) => Stream<A, E, R | R2>
Since v2.0.0
repeatValue
Repeats the provided value infinitely.
Example
import { Effect, Stream } from "effect"
const stream = Stream.repeatValue(0)
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 0, 0, 0, 0, 0 ] }
Signature
declare const repeatValue: <A>(value: A) => Stream<A>
Since v2.0.0
scoped
Creates a single-valued stream from a scoped resource.
Example
import { Console, Effect, Stream } from "effect"
// Creating a single-valued stream from a scoped resource
const stream = Stream.scoped(Effect.acquireRelease(Console.log("acquire"), () => Console.log("release"))).pipe(
Stream.flatMap(() => Console.log("use"))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// acquire
// use
// release
// { _id: 'Chunk', values: [ undefined ] }
Signature
declare const scoped: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, Exclude<R, Scope.Scope>>
Since v2.0.0
scopedWith
Use a function that receives a scope and returns an effect to emit an output element. The output element will be the result of the returned effect, if successful.
Signature
declare const scopedWith: <A, E, R>(f: (scope: Scope.Scope) => Effect.Effect<A, E, R>) => Stream<A, E, R>
Since v3.11.0
succeed
Creates a single-valued pure stream.
Example
import { Effect, Stream } from "effect"
// A Stream with a single number
const stream = Stream.succeed(3)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 3 ] }
Signature
declare const succeed: <A>(value: A) => Stream<A>
Since v2.0.0
suspend
Returns a lazily constructed stream.
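Example
A minimal sketch: suspend defers construction of the stream until it runs, which makes recursive definitions safe.
import { Effect, Stream } from "effect"
const countdown = (n: number): Stream.Stream<number> =>
  n <= 0 ? Stream.empty : Stream.succeed(n).pipe(Stream.concat(Stream.suspend(() => countdown(n - 1))))
Effect.runPromise(Stream.runCollect(countdown(3))).then(console.log)
// { _id: 'Chunk', values: [ 3, 2, 1 ] }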
Signature
declare const suspend: <A, E, R>(stream: LazyArg<Stream<A, E, R>>) => Stream<A, E, R>
Since v2.0.0
sync
Creates a single-valued pure stream.
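Example
A minimal sketch: the thunk is evaluated when the stream runs, not when it is defined.
import { Effect, Stream } from "effect"
const stream = Stream.sync(() => Date.now())
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 1716122400000 ] }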
Signature
declare const sync: <A>(evaluate: LazyArg<A>) => Stream<A>
Since v2.0.0
tick
A stream that emits void values spaced by the specified duration.
Example
import { Effect, Stream } from "effect"
let last = Date.now()
const log = (message: string) =>
Effect.sync(() => {
const end = Date.now()
console.log(`${message} after ${end - last}ms`)
last = end
})
const stream = Stream.tick("1 seconds").pipe(Stream.tap(() => log("tick")))
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// tick after 4ms
// tick after 1003ms
// tick after 1001ms
// tick after 1002ms
// tick after 1002ms
// { _id: 'Chunk', values: [ undefined, undefined, undefined, undefined, undefined ] }
Signature
declare const tick: (interval: Duration.DurationInput) => Stream<void>
Since v2.0.0
toChannel
Creates a channel from a Stream
.
Signature
declare const toChannel: <A, E, R>(
stream: Stream<A, E, R>
) => Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, unknown, unknown, R>
Since v2.0.0
unfold
Creates a stream by peeling off the “layers” of a value of type S
.
Example
import { Effect, Option, Stream } from "effect"
const stream = Stream.unfold(1, (n) => Option.some([n, n + 1]))
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
Signature
declare const unfold: <S, A>(s: S, f: (s: S) => Option.Option<readonly [A, S]>) => Stream<A>
Since v2.0.0
unfoldChunk
Creates a stream by peeling off the “layers” of a value of type S
.
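Example
A minimal sketch: each unfolding step emits a whole chunk and threads the next state forward.
import { Chunk, Effect, Option, Stream } from "effect"
const stream = Stream.unfoldChunk(0, (n) =>
  n < 6 ? Option.some([Chunk.make(n, n + 1), n + 2] as const) : Option.none()
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5 ] }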
Signature
declare const unfoldChunk: <S, A>(s: S, f: (s: S) => Option.Option<readonly [Chunk.Chunk<A>, S]>) => Stream<A>
Since v2.0.0
unfoldChunkEffect
Creates a stream by effectfully peeling off the “layers” of a value of type S
.
Signature
declare const unfoldChunkEffect: <S, A, E, R>(
s: S,
f: (s: S) => Effect.Effect<Option.Option<readonly [Chunk.Chunk<A>, S]>, E, R>
) => Stream<A, E, R>
Since v2.0.0
unfoldEffect
Creates a stream by effectfully peeling off the “layers” of a value of type S
.
Example
import { Effect, Option, Random, Stream } from "effect"
const stream = Stream.unfoldEffect(1, (n) =>
Random.nextBoolean.pipe(Effect.map((b) => (b ? Option.some([n, -n]) : Option.some([n, n]))))
)
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 1, -1, -1, -1, -1 ] }
Signature
declare const unfoldEffect: <S, A, E, R>(
s: S,
f: (s: S) => Effect.Effect<Option.Option<readonly [A, S]>, E, R>
) => Stream<A, E, R>
Since v2.0.0
unwrap
Creates a stream produced from an Effect
.
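Example
A minimal sketch: an effect decides which stream to produce.
import { Effect, Stream } from "effect"
const streamOfLength = (n: number) => Stream.unwrap(Effect.sync(() => Stream.range(1, n)))
Effect.runPromise(Stream.runCollect(streamOfLength(3))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }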
Signature
declare const unwrap: <A, E2, R2, E, R>(effect: Effect.Effect<Stream<A, E2, R2>, E, R>) => Stream<A, E | E2, R | R2>
Since v2.0.0
unwrapScoped
Creates a stream produced from a scoped Effect
.
Signature
declare const unwrapScoped: <A, E2, R2, E, R>(
effect: Effect.Effect<Stream<A, E2, R2>, E, R>
) => Stream<A, E | E2, R2 | Exclude<R, Scope.Scope>>
Since v2.0.0
unwrapScopedWith
Creates a stream produced from a function which receives a Scope
and returns an Effect
. The resulting stream will emit a single element, which will be the result of the returned effect, if successful.
Signature
declare const unwrapScopedWith: <A, E2, R2, E, R>(
f: (scope: Scope.Scope) => Effect.Effect<Stream<A, E2, R2>, E, R>
) => Stream<A, E | E2, R | R2>
Since v3.11.0
void
A stream that contains a single void
value.
Example
import { Effect, Stream } from "effect"
const stream = Stream.void
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ undefined ] }
Signature
declare const void: Stream<void, never, never>
Since v2.0.0
whenCase
Returns the resulting stream when the given PartialFunction
is defined for the given value, otherwise returns an empty stream.
Signature
declare const whenCase: <A, A2, E, R>(
evaluate: LazyArg<A>,
pf: (a: A) => Option.Option<Stream<A2, E, R>>
) => Stream<A2, E, R>
Since v2.0.0
context
context
Accesses the whole context of the stream.
Signature
declare const context: <R>() => Stream<Context.Context<R>, never, R>
Since v2.0.0
contextWith
Accesses the context of the stream.
Signature
declare const contextWith: <R, A>(f: (env: Context.Context<R>) => A) => Stream<A, never, R>
Since v2.0.0
contextWithEffect
Accesses the context of the stream in the context of an effect.
Signature
declare const contextWithEffect: <R0, A, E, R>(
f: (env: Context.Context<R0>) => Effect.Effect<A, E, R>
) => Stream<A, E, R0 | R>
Since v2.0.0
contextWithStream
Accesses the context of the stream in the context of a stream.
Signature
declare const contextWithStream: <R0, A, E, R>(f: (env: Context.Context<R0>) => Stream<A, E, R>) => Stream<A, E, R0 | R>
Since v2.0.0
mapInputContext
Transforms the context being provided to the stream with the specified function.
Signature
declare const mapInputContext: {
<R0, R>(f: (env: Context.Context<R0>) => Context.Context<R>): <A, E>(self: Stream<A, E, R>) => Stream<A, E, R0>
<A, E, R0, R>(self: Stream<A, E, R>, f: (env: Context.Context<R0>) => Context.Context<R>): Stream<A, E, R0>
}
Since v2.0.0
provideContext
Provides the stream with its required context, which eliminates its dependency on R
.
Signature
declare const provideContext: {
<R>(context: Context.Context<R>): <A, E>(self: Stream<A, E, R>) => Stream<A, E>
<A, E, R>(self: Stream<A, E, R>, context: Context.Context<R>): Stream<A, E>
}
Since v2.0.0
provideLayer
Provides a Layer
to the stream, which translates it to another level.
Signature
declare const provideLayer: {
<RIn, E2, ROut>(layer: Layer.Layer<ROut, E2, RIn>): <A, E>(self: Stream<A, E, ROut>) => Stream<A, E2 | E, RIn>
<A, E, RIn, E2, ROut>(self: Stream<A, E, ROut>, layer: Layer.Layer<ROut, E2, RIn>): Stream<A, E | E2, RIn>
}
Since v2.0.0
provideService
Provides the stream with the single service it requires. If the stream requires more than one service use Stream.provideContext
instead.
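Example
A minimal sketch using a hypothetical Prefix service tag:
import { Context, Effect, Stream } from "effect"
// A hypothetical service carrying a string prefix
class Prefix extends Context.Tag("Prefix")<Prefix, string>() {}
const stream = Stream.fromEffect(Prefix).pipe(
  Stream.map((prefix: string) => `${prefix}: hello`),
  Stream.provideService(Prefix, "app")
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 'app: hello' ] }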
Signature
declare const provideService: {
<I, S>(tag: Context.Tag<I, S>, resource: NoInfer<S>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, Exclude<R, I>>
<A, E, R, I, S>(self: Stream<A, E, R>, tag: Context.Tag<I, S>, resource: NoInfer<S>): Stream<A, E, Exclude<R, I>>
}
Since v2.0.0
provideServiceEffect
Provides the stream with the single service it requires. If the stream requires more than one service use Stream.provideContext
instead.
Signature
declare const provideServiceEffect: {
<I, S, E2, R2>(
tag: Context.Tag<I, S>,
effect: Effect.Effect<NoInfer<S>, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | Exclude<R, I>>
<A, E, R, I, S, E2, R2>(
self: Stream<A, E, R>,
tag: Context.Tag<I, S>,
effect: Effect.Effect<NoInfer<S>, E2, R2>
): Stream<A, E2 | E, R2 | Exclude<R, I>>
}
Since v2.0.0
provideServiceStream
Provides the stream with the single service it requires. If the stream requires more than one service use Stream.provideContext
instead.
Signature
declare const provideServiceStream: {
<I, S, E2, R2>(
tag: Context.Tag<I, S>,
stream: Stream<NoInfer<S>, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | Exclude<R, I>>
<A, E, R, I, S, E2, R2>(
self: Stream<A, E, R>,
tag: Context.Tag<I, S>,
stream: Stream<NoInfer<S>, E2, R2>
): Stream<A, E2 | E, R2 | Exclude<R, I>>
}
Since v2.0.0
provideSomeLayer
Splits the context into two parts, providing one part using the specified layer and leaving the remainder R0
.
Signature
declare const provideSomeLayer: {
<RIn, E2, ROut>(
layer: Layer.Layer<ROut, E2, RIn>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, RIn | Exclude<R, ROut>>
<A, E, R, RIn, E2, ROut>(
self: Stream<A, E, R>,
layer: Layer.Layer<ROut, E2, RIn>
): Stream<A, E | E2, RIn | Exclude<R, ROut>>
}
Since v2.0.0
updateService
Updates the specified service within the context of the Stream
.
Signature
declare const updateService: {
<I, S>(
tag: Context.Tag<I, S>,
f: (service: NoInfer<S>) => NoInfer<S>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, I | R>
<A, E, R, I, S>(
self: Stream<A, E, R>,
tag: Context.Tag<I, S>,
f: (service: NoInfer<S>) => NoInfer<S>
): Stream<A, E, I | R>
}
Since v2.0.0
destructors
run
Runs the sink on the stream to produce either the sink’s result or an error.
Signature
declare const run: {
<A2, A, E2, R2>(
sink: Sink.Sink<A2, A, unknown, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<A2, E2 | E, Exclude<R | R2, Scope.Scope>>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
sink: Sink.Sink<A2, A, unknown, E2, R2>
): Effect.Effect<A2, E | E2, Exclude<R | R2, Scope.Scope>>
}
Since v2.0.0
runCollect
Runs the stream and collects all of its elements to a chunk.
Signature
declare const runCollect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Chunk.Chunk<A>, E, R>
Since v2.0.0
runCount
Runs the stream and emits the number of elements processed
Signature
declare const runCount: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<number, E, R>
Since v2.0.0
runDrain
Runs the stream only for its effects. The emitted elements are discarded.
Signature
declare const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E, R>
Since v2.0.0
runFold
Executes a pure fold over the stream of values - reduces all elements in the stream to a value of type S
.
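Example
A minimal sketch: summing the elements of a stream with a pure fold.
import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4).pipe(Stream.runFold(0, (sum, n: number) => sum + n))
Effect.runPromise(program).then(console.log)
// 10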
Signature
declare const runFold: {
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, R>
}
Since v2.0.0
runFoldEffect
Executes an effectful fold over the stream of values.
Signature
declare const runFoldEffect: {
<S, A, E2, R2>(
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Exclude<R | R2, Scope.Scope>>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Effect.Effect<S, E | E2, Exclude<R | R2, Scope.Scope>>
}
Since v2.0.0
runFoldScoped
Executes a pure fold over the stream of values. Returns a scoped value that represents the scope of the stream.
Signature
declare const runFoldScoped: {
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Scope.Scope | R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, Scope.Scope | R>
}
Since v2.0.0
runFoldScopedEffect
Executes an effectful fold over the stream of values. Returns a scoped value that represents the scope of the stream.
Signature
declare const runFoldScopedEffect: {
<S, A, E2, R2>(
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Scope.Scope | R2 | R>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Effect.Effect<S, E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
runFoldWhile
Reduces the elements in the stream to a value of type S. Stops the fold early when the condition is not fulfilled.
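Example
A minimal sketch: the fold stops once the accumulated sum no longer satisfies the predicate.
import { Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.runFoldWhile(
    0,
    (sum) => sum < 6, // continue while this holds
    (sum, n: number) => sum + n
  )
)
Effect.runPromise(program).then(console.log)
// 6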
Signature
declare const runFoldWhile: {
<S, A>(s: S, cont: Predicate<S>, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, cont: Predicate<S>, f: (s: S, a: A) => S): Effect.Effect<S, E, R>
}
Since v2.0.0
runFoldWhileEffect
Executes an effectful fold over the stream of values. Stops the fold early when the condition is not fulfilled.
Signature
declare const runFoldWhileEffect: {
<S, A, E2, R2>(
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Exclude<R | R2, Scope.Scope>>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Effect.Effect<S, E | E2, Exclude<R | R2, Scope.Scope>>
}
Since v2.0.0
runFoldWhileScoped
Executes a pure fold over the stream of values. Returns a scoped value that represents the scope of the stream. Stops the fold early when the condition is not fulfilled.
Signature
declare const runFoldWhileScoped: {
<S, A>(
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => S
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Scope.Scope | R>
<A, E, R, S>(
self: Stream<A, E, R>,
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => S
): Effect.Effect<S, E, Scope.Scope | R>
}
Since v2.0.0
runFoldWhileScopedEffect
Executes an effectful fold over the stream of values. Returns a scoped value that represents the scope of the stream. Stops the fold early when the condition is not fulfilled.
Signature
declare const runFoldWhileScopedEffect: {
<S, A, E2, R2>(
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, R2 | R | Scope.Scope>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Effect.Effect<S, E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
runForEach
Consumes all elements of the stream, passing them to the specified callback.
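Example
A minimal sketch: logging each element as it is consumed.
import { Console, Effect, Stream } from "effect"
const program = Stream.make(1, 2, 3).pipe(Stream.runForEach((n: number) => Console.log(n)))
Effect.runPromise(program)
// 1
// 2
// 3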
Signature
declare const runForEach: {
<A, X, E2, R2>(
f: (a: A) => Effect.Effect<X, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, R2 | R>
<A, E, R, X, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<X, E2, R2>
): Effect.Effect<void, E | E2, R | R2>
}
Since v2.0.0
runForEachChunk
Consumes all elements of the stream, passing them to the specified callback.
Signature
declare const runForEachChunk: {
<A, X, E2, R2>(
f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, R2 | R>
<A, E, R, X, E2, R2>(
self: Stream<A, E, R>,
f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
): Effect.Effect<void, E | E2, R | R2>
}
Since v2.0.0
runForEachChunkScoped
Like Stream.runForEachChunk
, but returns a scoped effect so the finalization order can be controlled.
Signature
declare const runForEachChunkScoped: {
<A, X, E2, R2>(
f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, Scope.Scope | R2 | R>
<A, E, R, X, E2, R2>(
self: Stream<A, E, R>,
f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
): Effect.Effect<void, E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
runForEachScoped
Like Stream.runForEach, but returns a scoped effect so the finalization order can be controlled.
Signature
declare const runForEachScoped: {
<A, X, E2, R2>(
f: (a: A) => Effect.Effect<X, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, Scope.Scope | R2 | R>
<A, E, R, X, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<X, E2, R2>
): Effect.Effect<void, E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
runForEachWhile
Consumes elements of the stream, passing them to the specified callback, and terminating consumption when the callback returns false
.
Signature
declare const runForEachWhile: {
<A, E2, R2>(
f: (a: A) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<boolean, E2, R2>
): Effect.Effect<void, E | E2, R | R2>
}
Since v2.0.0
runForEachWhileScoped
Like Stream.runForEachWhile
, but returns a scoped effect so the finalization order can be controlled.
Signature
declare const runForEachWhileScoped: {
<A, E2, R2>(
f: (a: A) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, Scope.Scope | R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<boolean, E2, R2>
): Effect.Effect<void, E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
runHead
Runs the stream to completion and yields the first value emitted by it, discarding the rest of the elements.
Signature
declare const runHead: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Option.Option<A>, E, R>
Since v2.0.0
runIntoPubSub
Publishes elements of this stream to a PubSub
. Stream failure and ending will also be signalled.
Signature
declare const runIntoPubSub: {
<A, E>(pubsub: PubSub.PubSub<Take.Take<A, E>>): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, R>
<A, E, R>(self: Stream<A, E, R>, pubsub: PubSub.PubSub<Take.Take<A, E>>): Effect.Effect<void, never, R>
}
Since v2.0.0
runIntoPubSubScoped
Like Stream.runIntoPubSub
, but provides the result as a scoped effect to allow for scope composition.
Signature
declare const runIntoPubSubScoped: {
<A, E>(
pubsub: PubSub.PubSub<Take.Take<A, E>>
): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, Scope.Scope | R>
<A, E, R>(self: Stream<A, E, R>, pubsub: PubSub.PubSub<Take.Take<A, E>>): Effect.Effect<void, never, Scope.Scope | R>
}
Since v2.0.0
runIntoQueue
Enqueues elements of this stream into a queue. Stream failure and ending will also be signalled.
Signature
declare const runIntoQueue: {
<A, E>(queue: Queue.Enqueue<Take.Take<A, E>>): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, R>
<A, E, R>(self: Stream<A, E, R>, queue: Queue.Enqueue<Take.Take<A, E>>): Effect.Effect<void, never, R>
}
Since v2.0.0
runIntoQueueElementsScoped
Like Stream.runIntoQueue
, but provides the result as a scoped effect to allow for scope composition.
Signature
declare const runIntoQueueElementsScoped: {
<A, E>(
queue: Queue.Enqueue<Exit.Exit<A, Option.Option<E>>>
): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, Scope.Scope | R>
<A, E, R>(
self: Stream<A, E, R>,
queue: Queue.Enqueue<Exit.Exit<A, Option.Option<E>>>
): Effect.Effect<void, never, Scope.Scope | R>
}
Since v2.0.0
runIntoQueueScoped
Like Stream.runIntoQueue
, but provides the result as a scoped effect to allow for scope composition.
Signature
declare const runIntoQueueScoped: {
<A, E>(
queue: Queue.Enqueue<Take.Take<A, E>>
): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, Scope.Scope | R>
<A, E, R>(self: Stream<A, E, R>, queue: Queue.Enqueue<Take.Take<A, E>>): Effect.Effect<void, never, Scope.Scope | R>
}
Since v2.0.0
runLast
Runs the stream to completion and yields the last value emitted by it, discarding the rest of the elements.
Signature
declare const runLast: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Option.Option<A>, E, R>
Since v2.0.0
runScoped
Signature
declare const runScoped: {
<A2, A, E2, R2>(
sink: Sink.Sink<A2, A, unknown, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<A2, E2 | E, Scope.Scope | R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
sink: Sink.Sink<A2, A, unknown, E2, R2>
): Effect.Effect<A2, E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
runSum
Runs the stream to a sink which sums elements, provided they are Numeric.
Signature
declare const runSum: <E, R>(self: Stream<number, E, R>) => Effect.Effect<number, E, R>
Since v2.0.0
toPubSub
Converts the stream to a scoped PubSub
of chunks. After the scope is closed, the PubSub
will never again produce values and should be discarded.
Signature
declare const toPubSub: {
(
capacity:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R>
<A, E, R>(
self: Stream<A, E, R>,
capacity:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R>
}
Since v2.0.0
toPull
Returns, within a scope, an effect that can be used to repeatedly pull chunks from the stream. The pull effect fails with None when the stream is finished, or with Some error if it fails; otherwise it returns a chunk of the stream's output.
Example
import { Effect, Stream } from "effect"
// Simulate a chunked stream
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))
const program = Effect.gen(function* () {
// Create an effect to get data chunks from the stream
const getChunk = yield* Stream.toPull(stream)
// Continuously fetch and process chunks
while (true) {
const chunk = yield* getChunk
console.log(chunk)
}
})
Effect.runPromise(Effect.scoped(program)).then(console.log, console.error)
// { _id: 'Chunk', values: [ 1, 2 ] }
// { _id: 'Chunk', values: [ 3, 4 ] }
// { _id: 'Chunk', values: [ 5 ] }
// (FiberFailure) Error: {
// "_id": "Option",
// "_tag": "None"
// }
Signature
declare const toPull: <A, E, R>(
self: Stream<A, E, R>
) => Effect.Effect<Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R>, never, Scope.Scope | R>
Since v2.0.0
toQueue
Converts the stream to a scoped queue of chunks. After the scope is closed, the queue will never again produce values and should be discarded.
Defaults to the “suspend” back pressure strategy with a capacity of 2.
Signature
declare const toQueue: {
(
options?:
| { readonly strategy?: "dropping" | "sliding" | "suspend" | undefined; readonly capacity?: number | undefined }
| { readonly strategy: "unbounded" }
| undefined
): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope | R>
<A, E, R>(
self: Stream<A, E, R>,
options?:
| { readonly strategy?: "dropping" | "sliding" | "suspend" | undefined; readonly capacity?: number | undefined }
| { readonly strategy: "unbounded" }
| undefined
): Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope | R>
}
Since v2.0.0
toQueueOfElements
Converts the stream to a scoped queue of elements. After the scope is closed, the queue will never again produce values and should be discarded.
Defaults to a capacity of 2.
Signature
declare const toQueueOfElements: {
(
options?: { readonly capacity?: number | undefined } | undefined
): <A, E, R>(
self: Stream<A, E, R>
) => Effect.Effect<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, never, Scope.Scope | R>
<A, E, R>(
self: Stream<A, E, R>,
options?: { readonly capacity?: number | undefined } | undefined
): Effect.Effect<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, never, Scope.Scope | R>
}
Since v2.0.0
toReadableStream
Converts the stream to a ReadableStream
.
See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
Signature
declare const toReadableStream: {
<A>(options?: { readonly strategy?: QueuingStrategy<A> | undefined }): <E>(self: Stream<A, E>) => ReadableStream<A>
<A, E>(self: Stream<A, E>, options?: { readonly strategy?: QueuingStrategy<A> | undefined }): ReadableStream<A>
}
Since v2.0.0
toReadableStreamEffect
Converts the stream to a Effect<ReadableStream>
.
See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
Signature
declare const toReadableStreamEffect: {
<A>(options?: {
readonly strategy?: QueuingStrategy<A> | undefined
}): <E, R>(self: Stream<A, E, R>) => Effect.Effect<ReadableStream<A>, never, R>
<A, E, R>(
self: Stream<A, E, R>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): Effect.Effect<ReadableStream<A>, never, R>
}
Since v2.0.0
toReadableStreamRuntime
Converts the stream to a ReadableStream
using the provided runtime.
See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
Signature
declare const toReadableStreamRuntime: {
<A, XR>(
runtime: Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): <E, R extends XR>(self: Stream<A, E, R>) => ReadableStream<A>
<A, E, XR, R extends XR>(
self: Stream<A, E, R>,
runtime: Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): ReadableStream<A>
}
Since v2.0.0
do notation
Do
The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind
and let
.
Here’s how the do simulation works:
- Start the do simulation using the Do value
- Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
- You can accumulate multiple bind statements to define multiple variables within the scope
- Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values
Example
import * as assert from "node:assert"
import { Chunk, Effect, pipe, Stream } from "effect"
const result = pipe(
Stream.Do,
Stream.bind("x", () => Stream.succeed(2)),
Stream.bind("y", () => Stream.succeed(3)),
Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))
See
bindTo
bind
bindEffect
let
Signature
declare const Do: Stream<{}, never, never>
Since v2.0.0
bind
The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind
and let
.
Here’s how the do simulation works:
- Start the do simulation using the Do value
- Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
- You can accumulate multiple bind statements to define multiple variables within the scope
- Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values
Example
import * as assert from "node:assert"
import { Chunk, Effect, pipe, Stream } from "effect"
const result = pipe(
Stream.Do,
Stream.bind("x", () => Stream.succeed(2)),
Stream.bind("y", () => Stream.succeed(3)),
Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))
See
Do
bindTo
bindEffect
let
Signature
declare const bind: {
<N extends string, A, B, E2, R2>(
tag: Exclude<N, keyof A>,
f: (_: NoInfer<A>) => Stream<B, E2, R2>,
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
| undefined
): <E, R>(self: Stream<A, E, R>) => Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E2 | E, R2 | R>
<A, E, R, N extends string, B, E2, R2>(
self: Stream<A, E, R>,
tag: Exclude<N, keyof A>,
f: (_: NoInfer<A>) => Stream<B, E2, R2>,
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
| undefined
): Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E | E2, R | R2>
}
Since v2.0.0
bindEffect
Binds an effectful value in a do
scope
See
Do
bindTo
bind
let
Signature
declare const bindEffect: {
<N extends string, A, B, E2, R2>(
tag: Exclude<N, keyof A>,
f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
options?: { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
): <E, R>(self: Stream<A, E, R>) => Stream<{ [K in keyof A | N]: K extends keyof A ? A[K] : B }, E | E2, R | R2>
<A, E, R, N extends string, B, E2, R2>(
self: Stream<A, E, R>,
tag: Exclude<N, keyof A>,
f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
options?: { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
): Stream<{ [K in keyof A | N]: K extends keyof A ? A[K] : B }, E | E2, R | R2>
}
Since v2.0.0
bindTo
The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind
and let
.
Here’s how the do simulation works:
- Start the do simulation using the Do value
- Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
- You can accumulate multiple bind statements to define multiple variables within the scope
- Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values
Example
import * as assert from "node:assert"
import { Chunk, Effect, pipe, Stream } from "effect"
const result = pipe(
Stream.Do,
Stream.bind("x", () => Stream.succeed(2)),
Stream.bind("y", () => Stream.succeed(3)),
Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))
See
Do
bind
bindEffect
let
Signature
declare const bindTo: {
<N extends string>(name: N): <A, E, R>(self: Stream<A, E, R>) => Stream<{ [K in N]: A }, E, R>
<A, E, R, N extends string>(self: Stream<A, E, R>, name: N): Stream<{ [K in N]: A }, E, R>
}
Since v2.0.0
let
The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind
and let
.
Here’s how the do simulation works:
- Start the do simulation using the Do value
- Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
- You can accumulate multiple bind statements to define multiple variables within the scope
- Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values
Example
import * as assert from "node:assert"
import { Chunk, Effect, pipe, Stream } from "effect"
const result = pipe(
Stream.Do,
Stream.bind("x", () => Stream.succeed(2)),
Stream.bind("y", () => Stream.succeed(3)),
Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))
See
Do
bindTo
bind
bindEffect
Signature
declare const let: {
<N extends string, A extends object, B>(
name: Exclude<N, keyof A>,
f: (a: NoInfer<A>) => B
): <E, R>(self: Stream<A, E, R>) => Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E, R>
<A extends object, E, R, N extends string, B>(
self: Stream<A, E, R>,
name: Exclude<N, keyof A>,
f: (a: NoInfer<A>) => B
): Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E, R>
}
Since v2.0.0
elements
find
Finds the first element emitted by this stream that satisfies the provided predicate.
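Example
A minimal sketch: the resulting stream emits at most the first matching element.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4).pipe(Stream.find((n: number) => n % 2 === 0))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2 ] }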
Signature
declare const find: {
<A, B extends A>(refinement: Refinement<NoInfer<A>, B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<B, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>
}
Since v2.0.0
findEffect
Finds the first element emitted by this stream that satisfies the provided effectful predicate.
Signature
declare const findEffect: {
<A, E2, R2>(
predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): Stream<A, E | E2, R | R2>
}
Since v2.0.0
encoding
decodeText
Decode Uint8Array chunks into a stream of strings using the specified encoding.
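Example
An illustrative sketch (not from the original docs), assuming a single UTF-8 encoded chunk of bytes:
import { Effect, Stream } from "effect"
const bytes = new TextEncoder().encode("hello")
const stream = Stream.make(bytes).pipe(Stream.decodeText("utf-8"))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 'hello' ] }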
Signature
declare const decodeText: {
(encoding?: string | undefined): <E, R>(self: Stream<Uint8Array, E, R>) => Stream<string, E, R>
<E, R>(self: Stream<Uint8Array, E, R>, encoding?: string | undefined): Stream<string, E, R>
}
Since v2.0.0
encodeText
Encode a stream of strings into a stream of Uint8Array chunks using the specified encoding.
Signature
declare const encodeText: <E, R>(self: Stream<string, E, R>) => Stream<Uint8Array, E, R>
Since v2.0.0
error handling
catchAll
Switches over to the stream produced by the provided function in case this one fails with a typed error.
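Example
A minimal sketch (not from the original docs): recover from a typed failure by switching to a fallback stream.
import { Effect, Stream } from "effect"
const failing = Stream.make(1, 2).pipe(Stream.concat(Stream.fail("Oh no!")))
const recovered = failing.pipe(Stream.catchAll((error) => Stream.make(`recovered from: ${error}`)))
Effect.runPromise(Stream.runCollect(recovered)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 'recovered from: Oh no!' ] }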
Signature
declare const catchAll: {
<E, A2, E2, R2>(f: (error: E) => Stream<A2, E2, R2>): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, f: (error: E) => Stream<A2, E2, R2>): Stream<A | A2, E2, R | R2>
}
Since v2.0.0
catchAllCause
Switches over to the stream produced by the provided function in case this one fails. Allows recovery from all causes of failure, including interruption if the stream is uninterruptible.
Signature
declare const catchAllCause: {
<E, A2, E2, R2>(
f: (cause: Cause.Cause<E>) => Stream<A2, E2, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (cause: Cause.Cause<E>) => Stream<A2, E2, R2>
): Stream<A | A2, E2, R | R2>
}
Since v2.0.0
catchSome
Switches over to the stream produced by the provided function in case this one fails with some typed error.
Signature
declare const catchSome: {
<E, A2, E2, R2>(
pf: (error: E) => Option.Option<Stream<A2, E2, R2>>
): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E | E2, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
pf: (error: E) => Option.Option<Stream<A2, E2, R2>>
): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
catchSomeCause
Switches over to the stream produced by the provided function in case this one fails with some errors. Allows recovery from all causes of failure, including interruption if the stream is uninterruptible.
Signature
declare const catchSomeCause: {
<E, A2, E2, R2>(
pf: (cause: Cause.Cause<E>) => Option.Option<Stream<A2, E2, R2>>
): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E | E2, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
pf: (cause: Cause.Cause<E>) => Option.Option<Stream<A2, E2, R2>>
): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
catchTag
Switches over to the stream produced by the provided function in case this one fails with an error matching the given _tag
.
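Example
An illustrative sketch (not from the original docs) using a hypothetical NotFound error class with a literal _tag field:
import { Effect, Stream } from "effect"
class NotFound {
  readonly _tag = "NotFound"
}
const failing: Stream.Stream<number, NotFound> = Stream.make(1, 2).pipe(Stream.concat(Stream.fail(new NotFound())))
const recovered = failing.pipe(Stream.catchTag("NotFound", () => Stream.make(-1)))
Effect.runPromise(Stream.runCollect(recovered)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, -1 ] }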
Signature
declare const catchTag: {
<K extends E["_tag"] & string, E extends { _tag: string }, A1, E1, R1>(
k: K,
f: (e: Extract<E, { _tag: K }>) => Stream<A1, E1, R1>
): <A, R>(self: Stream<A, E, R>) => Stream<A1 | A, E1 | Exclude<E, { _tag: K }>, R1 | R>
<A, E extends { _tag: string }, R, K extends E["_tag"] & string, A1, E1, R1>(
self: Stream<A, E, R>,
k: K,
f: (e: Extract<E, { _tag: K }>) => Stream<A1, E1, R1>
): Stream<A | A1, E1 | Exclude<E, { _tag: K }>, R | R1>
}
Since v2.0.0
catchTags
Switches over to the stream produced by one of the provided functions, in case this one fails with an error matching one of the given _tag
’s.
Signature
declare const catchTags: {
<
E extends { _tag: string },
Cases extends { [K in E["_tag"]]+?: (error: Extract<E, { _tag: K }>) => Stream<any, any, any> }
>(
cases: Cases
): <A, R>(
self: Stream<A, E, R>
) => Stream<
| A
| {
[K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer A, infer _E, infer _R>
? A
: never
}[keyof Cases],
| Exclude<E, { _tag: keyof Cases }>
| {
[K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _A, infer E, infer _R>
? E
: never
}[keyof Cases],
| R
| {
[K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _A, infer _E, infer R>
? R
: never
}[keyof Cases]
>
<
A,
E extends { _tag: string },
R,
Cases extends { [K in E["_tag"]]+?: (error: Extract<E, { _tag: K }>) => Stream<any, any, any> }
>(
self: Stream<A, E, R>,
cases: Cases
): Stream<
| A
| {
[K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _R, infer _E, infer A>
? A
: never
}[keyof Cases],
| Exclude<E, { _tag: keyof Cases }>
| {
[K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _R, infer E, infer _A>
? E
: never
}[keyof Cases],
| R
| {
[K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer R, infer _E, infer _A>
? R
: never
}[keyof Cases]
>
}
Since v2.0.0
orDie
Translates any failure into a stream termination, making the stream infallible and all failures unchecked.
Signature
declare const orDie: <A, E, R>(self: Stream<A, E, R>) => Stream<A, never, R>
Since v2.0.0
orDieWith
Keeps none of the errors, and terminates the stream with them, using the specified function to convert the E
into a defect.
Signature
declare const orDieWith: {
<E>(f: (e: E) => unknown): <A, R>(self: Stream<A, E, R>) => Stream<A, never, R>
<A, E, R>(self: Stream<A, E, R>, f: (e: E) => unknown): Stream<A, never, R>
}
Since v2.0.0
orElse
Switches to the provided stream in case this one fails with a typed error.
See also Stream.catchAll
.
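Example
A minimal sketch (not from the original docs): fall back to a second stream when the first one fails.
import { Effect, Stream } from "effect"
const failing = Stream.make(1, 2).pipe(Stream.concat(Stream.fail("boom")))
const stream = failing.pipe(Stream.orElse(() => Stream.make(3, 4)))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }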
Signature
declare const orElse: {
<A2, E2, R2>(that: LazyArg<Stream<A2, E2, R2>>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: LazyArg<Stream<A2, E2, R2>>): Stream<A | A2, E2, R | R2>
}
Since v2.0.0
orElseEither
Switches to the provided stream in case this one fails with a typed error.
See also Stream.catchAll
.
Signature
declare const orElseEither: {
<A2, E2, R2>(
that: LazyArg<Stream<A2, E2, R2>>
): <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A2, A>, E2, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
that: LazyArg<Stream<A2, E2, R2>>
): Stream<Either.Either<A2, A>, E2, R | R2>
}
Since v2.0.0
orElseFail
Fails with given error in case this one fails with a typed error.
See also Stream.catchAll
.
Signature
declare const orElseFail: {
<E2>(error: LazyArg<E2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
<A, E, R, E2>(self: Stream<A, E, R>, error: LazyArg<E2>): Stream<A, E2, R>
}
Since v2.0.0
orElseIfEmpty
Produces the specified element if this stream is empty.
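Example
A minimal sketch (not from the original docs): supply a fallback element when the source emits nothing.
import { Effect, Stream } from "effect"
const stream = Stream.empty.pipe(Stream.orElseIfEmpty(() => "fallback"))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 'fallback' ] }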
Signature
declare const orElseIfEmpty: {
<A2>(element: LazyArg<A2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, element: LazyArg<A2>): Stream<A | A2, E, R>
}
Since v2.0.0
orElseIfEmptyChunk
Produces the specified chunk if this stream is empty.
Signature
declare const orElseIfEmptyChunk: {
<A2>(chunk: LazyArg<Chunk.Chunk<A2>>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, chunk: LazyArg<Chunk.Chunk<A2>>): Stream<A | A2, E, R>
}
Since v2.0.0
orElseIfEmptyStream
Switches to the provided stream in case this one is empty.
Signature
declare const orElseIfEmptyStream: {
<A2, E2, R2>(stream: LazyArg<Stream<A2, E2, R2>>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, stream: LazyArg<Stream<A2, E2, R2>>): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
orElseSucceed
Succeeds with the specified value if this one fails with a typed error.
Signature
declare const orElseSucceed: {
<A2>(value: LazyArg<A2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, never, R>
<A, E, R, A2>(self: Stream<A, E, R>, value: LazyArg<A2>): Stream<A | A2, never, R>
}
Since v2.0.0
refineOrDie
Keeps some of the errors, and terminates the fiber with the rest.
Signature
declare const refineOrDie: {
<E, E2>(pf: (error: E) => Option.Option<E2>): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
<A, E, R, E2>(self: Stream<A, E, R>, pf: (error: E) => Option.Option<E2>): Stream<A, E2, R>
}
Since v2.0.0
refineOrDieWith
Keeps some of the errors, and terminates the fiber with the rest, using the specified function to convert the E
into a defect.
Signature
declare const refineOrDieWith: {
<E, E2>(
pf: (error: E) => Option.Option<E2>,
f: (error: E) => unknown
): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
<A, E, R, E2>(self: Stream<A, E, R>, pf: (error: E) => Option.Option<E2>, f: (error: E) => unknown): Stream<A, E2, R>
}
Since v2.0.0
filtering
filter
Filters the elements emitted by this stream using the provided function.
Example
import { Effect, Stream } from "effect"
const stream = Stream.range(1, 11).pipe(Stream.filter((n) => n % 2 === 0))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
Signature
declare const filter: {
<A, B extends A>(refinement: Refinement<NoInfer<A>, B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A, B extends A>(predicate: Predicate<B>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<B, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>
}
Since v2.0.0
filterEffect
Effectfully filters the elements emitted by this stream.
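Example
A minimal sketch (not from the original docs); the effectful predicate here is simply a pure check wrapped in Effect.succeed.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4).pipe(Stream.filterEffect((n) => Effect.succeed(n % 2 === 0)))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 2, 4 ] }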
Signature
declare const filterEffect: {
<A, E2, R2>(
f: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(self: Stream<A, E, R>, f: (a: A) => Effect.Effect<boolean, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
grouping
groupAdjacentBy
Creates a pipeline that groups on adjacent keys, calculated by the specified function.
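Example
An illustrative sketch (not from the original docs): adjacent elements with the same key are collected into non-empty chunks.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 1, 2, 2, 2, 3, 1).pipe(Stream.groupAdjacentBy((n) => n))
Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log("%o", chunk))
// Expected pairs of [key, chunk of adjacent values]:
// [ 1, [ 1, 1 ] ], [ 2, [ 2, 2, 2 ] ], [ 3, [ 3 ] ], [ 1, [ 1 ] ]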
Signature
declare const groupAdjacentBy: {
<A, K>(f: (a: A) => K): <E, R>(self: Stream<A, E, R>) => Stream<[K, Chunk.NonEmptyChunk<A>], E, R>
<A, E, R, K>(self: Stream<A, E, R>, f: (a: A) => K): Stream<[K, Chunk.NonEmptyChunk<A>], E, R>
}
Since v2.0.0
groupBy
More powerful version of Stream.groupByKey
.
Example
import { Chunk, Effect, GroupBy, Stream } from "effect"
const groupByKeyResult = Stream.fromIterable([
"Mary",
"James",
"Robert",
"Patricia",
"John",
"Jennifer",
"Rebecca",
"Peter"
]).pipe(Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name])))
const stream = GroupBy.evaluate(groupByKeyResult, (key, stream) =>
Stream.fromEffect(Stream.runCollect(stream).pipe(Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
// _id: 'Chunk',
// values: [ [ 'M', 1 ], [ 'J', 3 ], [ 'R', 2 ], [ 'P', 2 ] ]
// }
Signature
declare const groupBy: {
<A, K, V, E2, R2>(
f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
options?: { readonly bufferSize?: number | undefined } | undefined
): <E, R>(self: Stream<A, E, R>) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>
<A, E, R, K, V, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
options?: { readonly bufferSize?: number | undefined } | undefined
): GroupBy.GroupBy<K, V, E | E2, R | R2>
}
Since v2.0.0
groupByKey
Partition a stream using a function and process each stream individually. This returns a data structure that can be used to further filter down which groups shall be processed.
After calling apply on the GroupBy object, the remaining groups will be processed in parallel and the resulting streams merged in a nondeterministic fashion.
Up to buffer
elements may be buffered in any group stream before the producer is backpressured. Take care to consume from all streams in order to prevent deadlocks.
For example, to collect the first 2 words for every starting letter from a stream of words:
import { pipe, GroupBy, Stream } from "effect"
pipe(
Stream.fromIterable(["hello", "world", "hi", "holla"]),
Stream.groupByKey((word) => word[0]),
GroupBy.evaluate((key, stream) =>
pipe(
stream,
Stream.take(2),
Stream.map((words) => [key, words] as const)
)
)
)
Signature
declare const groupByKey: {
<A, K>(
f: (a: A) => K,
options?: { readonly bufferSize?: number | undefined }
): <E, R>(self: Stream<A, E, R>) => GroupBy.GroupBy<K, A, E, R>
<A, E, R, K>(
self: Stream<A, E, R>,
f: (a: A) => K,
options?: { readonly bufferSize?: number | undefined }
): GroupBy.GroupBy<K, A, E, R>
}
Since v2.0.0
grouped
Partitions the stream with specified chunkSize
.
Example
import { Effect, Stream } from "effect"
const stream = Stream.range(0, 8).pipe(Stream.grouped(3))
Effect.runPromise(Stream.runCollect(stream)).then((chunks) => console.log("%o", chunks))
// {
// _id: 'Chunk',
// values: [
// { _id: 'Chunk', values: [ 0, 1, 2, [length]: 3 ] },
// { _id: 'Chunk', values: [ 3, 4, 5, [length]: 3 ] },
// { _id: 'Chunk', values: [ 6, 7, 8, [length]: 3 ] },
// [length]: 3
// ]
// }
Signature
declare const grouped: {
(chunkSize: number): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
<A, E, R>(self: Stream<A, E, R>, chunkSize: number): Stream<Chunk.Chunk<A>, E, R>
}
Since v2.0.0
groupedWithin
Partitions the stream with the specified chunkSize
or until the specified duration
has passed, whichever is satisfied first.
Example
import { Chunk, Effect, Schedule, Stream } from "effect"
const stream = Stream.range(0, 9).pipe(
Stream.repeat(Schedule.spaced("1 second")),
Stream.groupedWithin(18, "1.5 seconds"),
Stream.take(3)
)
Effect.runPromise(Stream.runCollect(stream)).then((chunks) => console.log(Chunk.toArray(chunks)))
// [
// {
// _id: 'Chunk',
// values: [
// 0, 1, 2, 3, 4, 5, 6,
// 7, 8, 9, 0, 1, 2, 3,
// 4, 5, 6, 7
// ]
// },
// {
// _id: 'Chunk',
// values: [
// 8, 9, 0, 1, 2,
// 3, 4, 5, 6, 7,
// 8, 9
// ]
// },
// {
// _id: 'Chunk',
// values: [
// 0, 1, 2, 3, 4, 5, 6,
// 7, 8, 9, 0, 1, 2, 3,
// 4, 5, 6, 7
// ]
// }
// ]
Signature
declare const groupedWithin: {
(
chunkSize: number,
duration: Duration.DurationInput
): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
<A, E, R>(self: Stream<A, E, R>, chunkSize: number, duration: Duration.DurationInput): Stream<Chunk.Chunk<A>, E, R>
}
Since v2.0.0
mapping
as
Maps the success values of this stream to the specified constant value.
Example
import { Effect, Stream } from "effect"
const stream = Stream.range(1, 5).pipe(Stream.as(null))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ null, null, null, null, null ] }
Signature
declare const as: {
<B>(value: B): <A, E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A, E, R, B>(self: Stream<A, E, R>, value: B): Stream<B, E, R>
}
Since v2.0.0
map
Transforms the elements of this stream using the supplied function.
Example
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(Stream.map((n) => n + 1))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 3, 4 ] }
Signature
declare const map: {
<A, B>(f: (a: A) => B): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A, E, R, B>(self: Stream<A, E, R>, f: (a: A) => B): Stream<B, E, R>
}
Since v2.0.0
mapAccum
Statefully maps over the elements of this stream to produce new elements.
Example
import { Effect, Stream } from "effect"
const runningTotal = (stream: Stream.Stream<number>): Stream.Stream<number> =>
stream.pipe(Stream.mapAccum(0, (s, a) => [s + a, s + a]))
// input: 0, 1, 2, 3, 4, 5, 6
Effect.runPromise(Stream.runCollect(runningTotal(Stream.range(0, 6)))).then(console.log)
// { _id: "Chunk", values: [ 0, 1, 3, 6, 10, 15, 21 ] }
Signature
declare const mapAccum: {
<S, A, A2>(s: S, f: (s: S, a: A) => readonly [S, A2]): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
<A, E, R, S, A2>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => readonly [S, A2]): Stream<A2, E, R>
}
Since v2.0.0
mapAccumEffect
Statefully and effectfully maps over the elements of this stream to produce new elements.
Signature
declare const mapAccumEffect: {
<S, A, A2, E2, R2>(
s: S,
f: (s: S, a: A) => Effect.Effect<readonly [S, A2], E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, S, A2, E2, R2>(
self: Stream<A, E, R>,
s: S,
f: (s: S, a: A) => Effect.Effect<readonly [S, A2], E2, R2>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
mapChunks
Transforms the chunks emitted by this stream.
Signature
declare const mapChunks: {
<A, B>(f: (chunk: Chunk.Chunk<A>) => Chunk.Chunk<B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A, E, R, B>(self: Stream<A, E, R>, f: (chunk: Chunk.Chunk<A>) => Chunk.Chunk<B>): Stream<B, E, R>
}
Since v2.0.0
mapChunksEffect
Effectfully transforms the chunks emitted by this stream.
Signature
declare const mapChunksEffect: {
<A, B, E2, R2>(
f: (chunk: Chunk.Chunk<A>) => Effect.Effect<Chunk.Chunk<B>, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<B, E2 | E, R2 | R>
<A, E, R, B, E2, R2>(
self: Stream<A, E, R>,
f: (chunk: Chunk.Chunk<A>) => Effect.Effect<Chunk.Chunk<B>, E2, R2>
): Stream<B, E | E2, R | R2>
}
Since v2.0.0
mapConcat
Maps each element to an iterable, and flattens the iterables into the output of this stream.
Example
import { Effect, Stream } from "effect"
const numbers = Stream.make("1-2-3", "4-5", "6").pipe(
Stream.mapConcat((s) => s.split("-")),
Stream.map((s) => parseInt(s))
)
Effect.runPromise(Stream.runCollect(numbers)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
Signature
declare const mapConcat: {
<A, A2>(f: (a: A) => Iterable<A2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, f: (a: A) => Iterable<A2>): Stream<A2, E, R>
}
Since v2.0.0
mapConcatChunk
Maps each element to a chunk, and flattens the chunks into the output of this stream.
Signature
declare const mapConcatChunk: {
<A, A2>(f: (a: A) => Chunk.Chunk<A2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, f: (a: A) => Chunk.Chunk<A2>): Stream<A2, E, R>
}
Since v2.0.0
mapConcatChunkEffect
Effectfully maps each element to a chunk, and flattens the chunks into the output of this stream.
Signature
declare const mapConcatChunkEffect: {
<A, A2, E2, R2>(
f: (a: A) => Effect.Effect<Chunk.Chunk<A2>, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<Chunk.Chunk<A2>, E2, R2>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
mapConcatEffect
Effectfully maps each element to an iterable, and flattens the iterables into the output of this stream.
Signature
declare const mapConcatEffect: {
<A, A2, E2, R2>(
f: (a: A) => Effect.Effect<Iterable<A2>, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<Iterable<A2>, E2, R2>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
mapEffect
Maps over elements of the stream with the specified effectful function.
Example
import { Effect, Random, Stream } from "effect"
const stream = Stream.make(10, 20, 30).pipe(Stream.mapEffect((n) => Random.nextIntBetween(0, n)))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 7, 19, 8 ] }
Signature
declare const mapEffect: {
<A, A2, E2, R2>(
f: (a: A) => Effect.Effect<A2, E2, R2>,
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
| undefined
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, A2, E2, R2, K>(
f: (a: A) => Effect.Effect<A2, E2, R2>,
options: { readonly key: (a: A) => K; readonly bufferSize?: number | undefined }
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<A2, E2, R2>,
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
| undefined
): Stream<A2, E | E2, R | R2>
<A, E, R, A2, E2, R2, K>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<A2, E2, R2>,
options: { readonly key: (a: A) => K; readonly bufferSize?: number | undefined }
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
mapError
Transforms the errors emitted by this stream using f
.
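Example
A minimal sketch (not from the original docs): wrap a string error into an Error before it reaches downstream consumers.
import { Effect, Stream } from "effect"
const stream = Stream.fail("boom").pipe(Stream.mapError((message) => new Error(message)))
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// Expected to fail with Error("boom") instead of the original string.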
Signature
declare const mapError: {
<E, E2>(f: (error: E) => E2): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
<A, E, R, E2>(self: Stream<A, E, R>, f: (error: E) => E2): Stream<A, E2, R>
}
Since v2.0.0
mapErrorCause
Transforms the full causes of failures emitted by this stream.
Signature
declare const mapErrorCause: {
<E, E2>(f: (cause: Cause.Cause<E>) => Cause.Cause<E2>): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
<A, E, R, E2>(self: Stream<A, E, R>, f: (cause: Cause.Cause<E>) => Cause.Cause<E2>): Stream<A, E2, R>
}
Since v2.0.0
models
EventListener (interface)
Signature
export interface EventListener<A> {
addEventListener(
event: string,
f: (event: A) => void,
options?:
| {
readonly capture?: boolean
readonly passive?: boolean
readonly once?: boolean
readonly signal?: AbortSignal
}
| boolean
): void
removeEventListener(
event: string,
f: (event: A) => void,
options?:
| {
readonly capture?: boolean
}
| boolean
): void
}
Since v3.4.0
Stream (interface)
A Stream<A, E, R> is a description of a program that, when evaluated, may emit zero or more values of type A, may fail with errors of type E, and uses a context of type R. One way to think of Stream is as an Effect program that could emit multiple values.
Stream
is a purely functional pull based stream. Pull based streams offer inherent laziness and backpressure, relieving users of the need to manage buffers between operators. As an optimization, Stream
does not emit single values, but rather an array of values. This allows the cost of effect evaluation to be amortized.
Stream
forms a monad on its A
type parameter, and has error management facilities for its E
type parameter, modeled similarly to Effect
(with some adjustments for the multiple-valued nature of Stream
). These aspects allow for rich and expressive composition of streams.
Signature
export interface Stream<out A, out E = never, out R = never> extends Stream.Variance<A, E, R>, Pipeable {
[Unify.typeSymbol]?: unknown
[Unify.unifySymbol]?: StreamUnify<this>
[Unify.ignoreSymbol]?: StreamUnifyIgnore
}
Since v2.0.0
StreamUnify (interface)
Signature
export interface StreamUnify<A extends { [Unify.typeSymbol]?: any }> extends Effect.EffectUnify<A> {
Stream?: () => A[Unify.typeSymbol] extends Stream<infer A0, infer E0, infer R0> | infer _ ? Stream<A0, E0, R0> : never
}
Since v2.0.0
StreamUnifyIgnore (interface)
Signature
export interface StreamUnifyIgnore extends Effect.EffectUnifyIgnore {
Effect?: true
}
Since v2.0.0
racing
race
Returns a stream that mirrors the first upstream to emit an item. As soon as one of the upstreams emits its first value, the other is interrupted. The resulting stream will forward all items from the “winning” source stream. Any upstream failures will cause the returned stream to fail.
Example
import { Stream, Schedule, Console, Effect } from "effect"
const stream = Stream.fromSchedule(Schedule.spaced("2 millis")).pipe(
Stream.race(Stream.fromSchedule(Schedule.spaced("1 millis"))),
Stream.take(6),
Stream.tap(Console.log)
)
Effect.runPromise(Stream.runDrain(stream))
// A value is emitted each millisecond by the faster stream; the slower stream is interrupted
// 0
// 1
// 2
// 3
// 4
// 5
Signature
declare const race: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL | AR, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL | AR, EL | ER, RL | RR>
}
Since v3.7.0
raceAll
Returns a stream that mirrors the first upstream to emit an item. As soon as one of the upstreams emits its first value, all the others are interrupted. The resulting stream will forward all items from the “winning” source stream. Any upstream failures will cause the returned stream to fail.
Example
import { Stream, Schedule, Console, Effect } from "effect"
const stream = Stream.raceAll(
Stream.fromSchedule(Schedule.spaced("1 millis")),
Stream.fromSchedule(Schedule.spaced("2 millis")),
Stream.fromSchedule(Schedule.spaced("4 millis"))
).pipe(Stream.take(6), Stream.tap(Console.log))
Effect.runPromise(Stream.runDrain(stream))
// A value is emitted each millisecond by the fastest stream; the remaining streams are interrupted
// 0
// 1
// 2
// 3
// 4
// 5
Signature
declare const raceAll: <S extends ReadonlyArray<Stream<any, any, any>>>(
...streams: S
) => Stream<Stream.Success<S[number]>, Stream.Error<S[number]>, Stream.Context<S[number]>>
Since v3.5.0
sequencing
branchAfter
Returns a Stream
that first collects n
elements from the input Stream
, and then creates a new Stream
using the specified function, and sends all the following elements through that.
Signature
declare const branchAfter: {
<A, A2, E2, R2>(
n: number,
f: (input: Chunk.Chunk<A>) => Stream<A2, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
n: number,
f: (input: Chunk.Chunk<A>) => Stream<A2, E2, R2>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
flatMap
Returns a stream made of the concatenation in strict order of all the streams produced by passing each element of this stream to f.
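Example
A minimal sketch (not from the original docs): each element is expanded into a small inner stream, and the inner streams are concatenated in order.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(Stream.flatMap((n) => Stream.make(n, n * 2)))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 2, 4, 3, 6 ] }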
Signature
declare const flatMap: {
<A, A2, E2, R2>(
f: (a: A) => Stream<A2, E2, R2>,
options?:
| {
readonly concurrency?: number | "unbounded" | undefined
readonly bufferSize?: number | undefined
readonly switch?: boolean | undefined
}
| undefined
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Stream<A2, E2, R2>,
options?:
| {
readonly concurrency?: number | "unbounded" | undefined
readonly bufferSize?: number | undefined
readonly switch?: boolean | undefined
}
| undefined
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
flatten
Flattens this stream-of-streams into a stream made of the concatenation in strict order of all the streams.
Signature
declare const flatten: {
(
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
| undefined
): <A, E2, R2, E, R>(self: Stream<Stream<A, E2, R2>, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E2, R2, E, R>(
self: Stream<Stream<A, E2, R2>, E, R>,
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
| undefined
): Stream<A, E2 | E, R2 | R>
}
Since v2.0.0
flattenChunks
Submerges the chunks carried by this stream into the stream’s structure, while still preserving them.
Signature
declare const flattenChunks: <A, E, R>(self: Stream<Chunk.Chunk<A>, E, R>) => Stream<A, E, R>
Since v2.0.0
flattenEffect
Flattens Effect
values into the stream’s structure, preserving all information about the effect.
Signature
declare const flattenEffect: {
(
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
| undefined
): <A, E2, R2, E, R>(self: Stream<Effect.Effect<A, E2, R2>, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E2, R2, E, R>(
self: Stream<Effect.Effect<A, E2, R2>, E, R>,
options?:
| { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
| undefined
): Stream<A, E2 | E, R2 | R>
}
Since v2.0.0
flattenExitOption
Unwraps Exit
values that also signify end-of-stream by failing with None
.
Signature
declare const flattenExitOption: <A, E2, E, R>(
self: Stream<Exit.Exit<A, Option.Option<E2>>, E, R>
) => Stream<A, E | E2, R>
Since v2.0.0
flattenIterables
Submerges the iterables carried by this stream into the stream’s structure, while still preserving them.
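Example
A minimal sketch (not from the original docs): flatten a stream of arrays into a stream of their elements.
import { Effect, Stream } from "effect"
const stream = Stream.make([1, 2], [3], [4, 5]).pipe(Stream.flattenIterables)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }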
Signature
declare const flattenIterables: <A, E, R>(self: Stream<Iterable<A>, E, R>) => Stream<A, E, R>
Since v2.0.0
flattenTake
Unwraps Exit
values and flattens chunks that also signify end-of-stream by failing with None
.
Signature
declare const flattenTake: <A, E2, E, R>(self: Stream<Take.Take<A, E2>, E, R>) => Stream<A, E | E2, R>
Since v2.0.0
onEnd
Adds an effect to be executed at the end of the stream.
Example
import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`)),
Stream.onEnd(Console.log("Stream ended"))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// after mapping: 2
// after mapping: 4
// after mapping: 6
// Stream ended
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
Signature
declare const onEnd: {
<_, E2, R2>(effect: Effect.Effect<_, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<_, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v3.6.0
onStart
Adds an effect to be executed at the start of the stream.
Example
import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.onStart(Console.log("Stream started")),
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Stream started
// after mapping: 2
// after mapping: 4
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
Signature
declare const onStart: {
<_, E2, R2>(effect: Effect.Effect<_, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, _, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<_, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v3.6.0
tap
Adds an effect to consumption of every element of the stream.
Example
import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.tap((n) => Console.log(`before mapping: ${n}`)),
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after mapping: ${n}`))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// before mapping: 1
// after mapping: 2
// before mapping: 2
// after mapping: 4
// before mapping: 3
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
Signature
declare const tap: {
<A, X, E2, R2>(
f: (a: NoInfer<A>) => Effect.Effect<X, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, X, E2, R2>(self: Stream<A, E, R>, f: (a: NoInfer<A>) => Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
tapBoth
Returns a stream that effectfully “peeks” at the failure or success of the stream.
Signature
declare const tapBoth: {
<E, X1, E2, R2, A, X2, E3, R3>(options: {
readonly onFailure: (e: NoInfer<E>) => Effect.Effect<X1, E2, R2>
readonly onSuccess: (a: NoInfer<A>) => Effect.Effect<X2, E3, R3>
}): <R>(self: Stream<A, E, R>) => Stream<A, E | E2 | E3, R2 | R3 | R>
<A, E, R, X1, E2, R2, X2, E3, R3>(
self: Stream<A, E, R>,
options: {
readonly onFailure: (e: NoInfer<E>) => Effect.Effect<X1, E2, R2>
readonly onSuccess: (a: NoInfer<A>) => Effect.Effect<X2, E3, R3>
}
): Stream<A, E | E2 | E3, R | R2 | R3>
}
Since v2.0.0
tapError
Returns a stream that effectfully “peeks” at the failure of the stream.
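Example
A minimal sketch (not from the original docs): log the failure before it propagates; the stream still fails with the original error.
import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail("boom")),
  Stream.tapError((error) => Console.log(`stream failed with: ${error}`))
)
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// Expected to log "stream failed with: boom" and then fail with the original error.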
Signature
declare const tapError: {
<E, X, E2, R2>(
f: (error: NoInfer<E>) => Effect.Effect<X, E2, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A, E | E2, R2 | R>
<A, E, R, X, E2, R2>(self: Stream<A, E, R>, f: (error: E) => Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
tapSink
Sends all elements emitted by this stream to the specified sink in addition to emitting them.
Signature
declare const tapSink: {
<A, E2, R2>(sink: Sink.Sink<unknown, A, unknown, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<unknown, A, unknown, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
symbols
StreamTypeId
Signature
declare const StreamTypeId: unique symbol
Since v2.0.0
StreamTypeId (type alias)
Signature
type StreamTypeId = typeof StreamTypeId
Since v2.0.0
tracing
withSpan
Wraps the stream with a new span for tracing.
Signature
declare const withSpan: {
(
name: string,
options?: Tracer.SpanOptions | undefined
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, Exclude<R, Tracer.ParentSpan>>
<A, E, R>(
self: Stream<A, E, R>,
name: string,
options?: Tracer.SpanOptions | undefined
): Stream<A, E, Exclude<R, Tracer.ParentSpan>>
}
Since v2.0.0
type lambdas
StreamTypeLambda (interface)
Signature
export interface StreamTypeLambda extends TypeLambda {
readonly type: Stream<this["Target"], this["Out1"], this["Out2"]>
}
Since v2.0.0
utils
Stream (namespace)
Since v2.0.0
Variance (interface)
Signature
export interface Variance<out A, out E, out R> {
readonly [StreamTypeId]: VarianceStruct<A, E, R>
}
Since v2.0.0
VarianceStruct (interface)
Signature
export interface VarianceStruct<out A, out E, out R> {
readonly _A: Covariant<A>
readonly _E: Covariant<E>
readonly _R: Covariant<R>
}
Since v3.4.0
Success (type alias)
Signature
type Success<T> = [T] extends [Stream<infer _A, infer _E, infer _R>] ? _A : never
Since v3.4.0
Error (type alias)
Signature
type Error<T> = [T] extends [Stream<infer _A, infer _E, infer _R>] ? _E : never
Since v3.4.0
Context (type alias)
Signature
type Context<T> = [T] extends [Stream<infer _A, infer _E, infer _R>] ? _R : never
Since v3.4.0
DynamicTuple (type alias)
Signature
type DynamicTuple<T, N> = N extends N ? (number extends N ? Array<T> : DynamicTupleOf<T, N, []>) : never
Since v2.0.0
DynamicTupleOf (type alias)
Signature
type DynamicTupleOf<T, N, R> = R["length"] extends N ? R : DynamicTupleOf<T, N, [T, ...R]>
Since v2.0.0
accumulate
Collects each underlying Chunk of the stream into a new chunk, and emits it on each pull.
Signature
declare const accumulate: <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
Since v2.0.0
accumulateChunks
Re-chunks the elements of the stream by accumulating each underlying chunk.
Signature
declare const accumulateChunks: <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
Since v2.0.0
aggregate
Aggregates elements of this stream using the provided sink for as long as the downstream operators on the stream are busy.
This operator divides the stream into two asynchronous “islands”. Operators upstream of this operator run on one fiber, while downstream operators run on another. Whenever the downstream fiber is busy processing elements, the upstream fiber will feed elements into the sink until it signals completion.
Any sink can be used here, but see Sink.foldWeightedEffect
and Sink.foldUntilEffect
for sinks that cover the common use cases.
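Example
An illustrative sketch (not from the original docs) using Sink.foldUntil to sum elements in groups of at most three. Because the upstream and downstream run on separate fibers, the exact grouping may vary between runs.
import { Effect, Sink, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.aggregate(Sink.foldUntil(0, 3, (acc, n: number) => acc + n))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// One possible output: { _id: 'Chunk', values: [ 6, 9 ] }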
Signature
declare const aggregate: {
<B, A, A2, E2, R2>(sink: Sink.Sink<B, A | A2, A2, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<B, E2 | E, R2 | R>
<A, E, R, B, A2, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<B, A | A2, A2, E2, R2>): Stream<B, E | E2, R | R2>
}
Since v2.0.0
aggregateWithin
Like aggregateWithinEither
, but only returns the Right
results.
Signature
declare const aggregateWithin: {
<B, A, A2, E2, R2, C, R3>(
sink: Sink.Sink<B, A | A2, A2, E2, R2>,
schedule: Schedule.Schedule<C, Option.Option<B>, R3>
): <E, R>(self: Stream<A, E, R>) => Stream<B, E2 | E, R2 | R3 | R>
<A, E, R, B, A2, E2, R2, C, R3>(
self: Stream<A, E, R>,
sink: Sink.Sink<B, A | A2, A2, E2, R2>,
schedule: Schedule.Schedule<C, Option.Option<B>, R3>
): Stream<B, E | E2, R | R2 | R3>
}
Since v2.0.0
aggregateWithinEither
Aggregates elements using the provided sink until it completes, or until the delay signalled by the schedule has passed.
This operator divides the stream into two asynchronous islands. Operators upstream of this operator run on one fiber, while downstream operators run on another. Elements will be aggregated by the sink until the downstream fiber pulls the aggregated value, or until the schedule’s delay has passed.
Aggregated elements will be fed into the schedule to determine the delays between pulls.
Signature
declare const aggregateWithinEither: {
<B, A, A2, E2, R2, C, R3>(
sink: Sink.Sink<B, A | A2, A2, E2, R2>,
schedule: Schedule.Schedule<C, Option.Option<B>, R3>
): <E, R>(self: Stream<A, E, R>) => Stream<Either.Either<B, C>, E2 | E, R2 | R3 | R>
<A, E, R, B, A2, E2, R2, C, R3>(
self: Stream<A, E, R>,
sink: Sink.Sink<B, A | A2, A2, E2, R2>,
schedule: Schedule.Schedule<C, Option.Option<B>, R3>
): Stream<Either.Either<B, C>, E | E2, R | R2 | R3>
}
Since v2.0.0
broadcast
Fan out the stream, producing a list of streams that have the same elements as this stream. The driver stream will only ever advance the maximumLag
chunks before the slowest downstream stream.
Example
import { Console, Effect, Fiber, Schedule, Stream } from "effect"
const numbers = Effect.scoped(
Stream.range(1, 20).pipe(
Stream.tap((n) => Console.log(`Emit ${n} element before broadcasting`)),
Stream.broadcast(2, 5),
Stream.flatMap(([first, second]) =>
Effect.gen(function* () {
const fiber1 = yield* Stream.runFold(first, 0, (acc, e) => Math.max(acc, e)).pipe(
Effect.andThen((max) => Console.log(`Maximum: ${max}`)),
Effect.fork
)
const fiber2 = yield* second.pipe(
Stream.schedule(Schedule.spaced("1 second")),
Stream.runForEach((n) => Console.log(`Logging to the Console: ${n}`)),
Effect.fork
)
yield* Fiber.join(fiber1).pipe(Effect.zip(Fiber.join(fiber2), { concurrent: true }))
})
),
Stream.runCollect
)
)
Effect.runPromise(numbers).then(console.log)
// Emit 1 element before broadcasting
// Emit 2 element before broadcasting
// Emit 3 element before broadcasting
// Emit 4 element before broadcasting
// Emit 5 element before broadcasting
// Emit 6 element before broadcasting
// Emit 7 element before broadcasting
// Emit 8 element before broadcasting
// Emit 9 element before broadcasting
// Emit 10 element before broadcasting
// Emit 11 element before broadcasting
// Logging to the Console: 1
// Logging to the Console: 2
// Logging to the Console: 3
// Logging to the Console: 4
// Logging to the Console: 5
// Emit 12 element before broadcasting
// Emit 13 element before broadcasting
// Emit 14 element before broadcasting
// Emit 15 element before broadcasting
// Emit 16 element before broadcasting
// Logging to the Console: 6
// Logging to the Console: 7
// Logging to the Console: 8
// Logging to the Console: 9
// Logging to the Console: 10
// Emit 17 element before broadcasting
// Emit 18 element before broadcasting
// Emit 19 element before broadcasting
// Emit 20 element before broadcasting
// Logging to the Console: 11
// Logging to the Console: 12
// Logging to the Console: 13
// Logging to the Console: 14
// Logging to the Console: 15
// Maximum: 20
// Logging to the Console: 16
// Logging to the Console: 17
// Logging to the Console: 18
// Logging to the Console: 19
// Logging to the Console: 20
// { _id: 'Chunk', values: [ undefined ] }
Signature
declare const broadcast: {
<N extends number>(
n: N,
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>
<A, E, R, N extends number>(
self: Stream<A, E, R>,
n: N,
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>
}
Since v2.0.0
broadcastDynamic
Fan out the stream, producing a dynamic number of streams that have the same elements as this stream. The driver stream will only ever advance the maximumLag
chunks before the slowest downstream stream.
Signature
declare const broadcastDynamic: {
(
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Stream<A, E>, never, Scope.Scope | R>
<A, E, R>(
self: Stream<A, E, R>,
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<Stream<A, E>, never, Scope.Scope | R>
}
Since v2.0.0
broadcastedQueues
Converts the stream to a scoped list of queues. Every value will be replicated to every queue with the slowest queue being allowed to buffer maximumLag
chunks before the driver is back pressured.
Queues can unsubscribe from upstream by shutting down.
Signature
declare const broadcastedQueues: {
<N extends number>(
n: N,
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): <A, E, R>(
self: Stream<A, E, R>
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>
<A, E, R, N extends number>(
self: Stream<A, E, R>,
n: N,
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>
}
Since v2.0.0
broadcastedQueuesDynamic
Converts the stream to a scoped dynamic amount of queues. Every chunk will be replicated to every queue with the slowest queue being allowed to buffer maximumLag
chunks before the driver is back pressured.
Queues can unsubscribe from upstream by shutting down.
Signature
declare const broadcastedQueuesDynamic: {
(
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): <A, E, R>(
self: Stream<A, E, R>
) => Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R>
<A, E, R>(
self: Stream<A, E, R>,
maximumLag:
| number
| { readonly capacity: "unbounded"; readonly replay?: number | undefined }
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R>
}
Since v2.0.0
buffer
Allows a faster producer to progress independently of a slower consumer by buffering up to capacity
elements in a queue.
Note: This combinator destroys the chunking structure. It’s recommended to use rechunk afterwards. Additionally, prefer capacities that are powers of 2 for better performance.
Example
import { Console, Effect, Schedule, Stream } from "effect"
const stream = Stream.range(1, 10).pipe(
Stream.tap((n) => Console.log(`before buffering: ${n}`)),
Stream.buffer({ capacity: 4 }),
Stream.tap((n) => Console.log(`after buffering: ${n}`)),
Stream.schedule(Schedule.spaced("5 seconds"))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// before buffering: 1
// before buffering: 2
// before buffering: 3
// before buffering: 4
// before buffering: 5
// before buffering: 6
// after buffering: 1
// after buffering: 2
// before buffering: 7
// after buffering: 3
// before buffering: 8
// after buffering: 4
// before buffering: 9
// after buffering: 5
// before buffering: 10
// ...
Signature
declare const buffer: {
(
options:
| { readonly capacity: "unbounded" }
| { readonly capacity: number; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(
self: Stream<A, E, R>,
options:
| { readonly capacity: "unbounded" }
| { readonly capacity: number; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
): Stream<A, E, R>
}
Since v2.0.0
bufferChunks
Allows a faster producer to progress independently of a slower consumer by buffering up to capacity
chunks in a queue.
Signature
declare const bufferChunks: {
(options: {
readonly capacity: number
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
}): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(
self: Stream<A, E, R>,
options: { readonly capacity: number; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
): Stream<A, E, R>
}
Since v2.0.0
changes
Returns a new stream that only emits elements that are not equal to the previous element emitted, using natural equality to determine whether two elements are equal.
Example
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }
Signature
declare const changes: <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
Since v2.0.0
changesWith
Returns a new stream that only emits elements that are not equal to the previous element emitted, using the specified function to determine whether two elements are equal.
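Example
A minimal sketch (not from the original docs): treat two numbers as equal when they share the same parity, so only parity changes are emitted.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 3, 5, 2, 4, 7).pipe(Stream.changesWith((x, y) => x % 2 === y % 2))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected output: { _id: 'Chunk', values: [ 1, 2, 7 ] }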
Signature
declare const changesWith: {
<A>(f: (x: A, y: A) => boolean): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, f: (x: A, y: A) => boolean): Stream<A, E, R>
}
Since v2.0.0
changesWithEffect
Returns a new stream that only emits elements that are not equal to the previous element emitted, using the specified effectual function to determine whether two elements are equal.
Signature
declare const changesWithEffect: {
<A, E2, R2>(
f: (x: A, y: A) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(self: Stream<A, E, R>, f: (x: A, y: A) => Effect.Effect<boolean, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
chunks
Exposes the underlying chunks of the stream as a stream of chunks of elements.
Signature
declare const chunks: <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
Since v2.0.0
chunksWith
Performs the specified stream transformation with the chunk structure of the stream exposed.
Signature
declare const chunksWith: {
<A, E, R, A2, E2, R2>(
f: (stream: Stream<Chunk.Chunk<A>, E, R>) => Stream<Chunk.Chunk<A2>, E2, R2>
): (self: Stream<A, E, R>) => Stream<A2, E | E2, R | R2>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (stream: Stream<Chunk.Chunk<A>, E, R>) => Stream<Chunk.Chunk<A2>, E2, R2>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
combine
Combines the elements from this stream and the specified stream by repeatedly applying the function f
to extract an element using both sides and conceptually “offer” it to the destination stream. f
can maintain some internal state to control the combining process, with the initial state being specified by s
.
Where possible, prefer Stream.combineChunks
for a more efficient implementation.
Signature
declare const combine: {
<A2, E2, R2, S, R3, E, A, R4, R5, A3>(
that: Stream<A2, E2, R2>,
s: S,
f: (
s: S,
pullLeft: Effect.Effect<A, Option.Option<E>, R3>,
pullRight: Effect.Effect<A2, Option.Option<E2>, R4>
) => Effect.Effect<Exit.Exit<readonly [A3, S], Option.Option<E2 | E>>, never, R5>
): <R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R3 | R4 | R5 | R>
<R, A2, E2, R2, S, R3, E, A, R4, R5, A3>(
self: Stream<A, E, R>,
that: Stream<A2, E2, R2>,
s: S,
f: (
s: S,
pullLeft: Effect.Effect<A, Option.Option<E>, R3>,
pullRight: Effect.Effect<A2, Option.Option<E2>, R4>
) => Effect.Effect<Exit.Exit<readonly [A3, S], Option.Option<E2 | E>>, never, R5>
): Stream<A3, E2 | E, R | R2 | R3 | R4 | R5>
}
Since v2.0.0
combineChunks
Combines the chunks from this stream and the specified stream by repeatedly applying the function f
to extract a chunk using both sides and conceptually “offer” it to the destination stream. f
can maintain some internal state to control the combining process, with the initial state being specified by s
.
Signature
declare const combineChunks: {
<A2, E2, R2, S, R3, E, A, R4, R5, A3>(
that: Stream<A2, E2, R2>,
s: S,
f: (
s: S,
pullLeft: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R3>,
pullRight: Effect.Effect<Chunk.Chunk<A2>, Option.Option<E2>, R4>
) => Effect.Effect<Exit.Exit<readonly [Chunk.Chunk<A3>, S], Option.Option<E2 | E>>, never, R5>
): <R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R3 | R4 | R5 | R>
<R, A2, E2, R2, S, R3, E, A, R4, R5, A3>(
self: Stream<A, E, R>,
that: Stream<A2, E2, R2>,
s: S,
f: (
s: S,
pullLeft: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R3>,
pullRight: Effect.Effect<Chunk.Chunk<A2>, Option.Option<E2>, R4>
) => Effect.Effect<Exit.Exit<readonly [Chunk.Chunk<A3>, S], Option.Option<E2 | E>>, never, R5>
): Stream<A3, E2 | E, R | R2 | R3 | R4 | R5>
}
Since v2.0.0
concat
Concatenates the specified stream with this stream, resulting in a stream that emits the elements from this stream and then the elements from the specified stream.
Example
import { Effect, Stream } from "effect"
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)
const stream = Stream.concat(s1, s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
Signature
declare const concat: {
<A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
cross
Composes this stream with the specified stream to create a cartesian product of elements. The right
stream would be run multiple times, for every element in the left
stream.
See also Stream.zip
for the more common point-wise variant.
Example
import { Effect, Stream } from "effect"
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make("a", "b")
const product = Stream.cross(s1, s2)
Effect.runPromise(Stream.runCollect(product)).then(console.log)
// {
// _id: "Chunk",
// values: [
// [ 1, "a" ], [ 1, "b" ], [ 2, "a" ], [ 2, "b" ], [ 3, "a" ], [ 3, "b" ]
// ]
// }
Signature
declare const cross: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<[AL, AR], EL | ER, RL | RR>
<AL, ER, RR, AR, EL, RL>(left: Stream<AL, ER, RR>, right: Stream<AR, EL, RL>): Stream<[AL, AR], EL | ER, RL | RR>
}
Since v2.0.0
crossLeft
Composes this stream with the specified stream to create a cartesian product of elements, but keeps only elements from left
stream. The right
stream would be run multiple times, for every element in the left
stream.
See also Stream.zipLeft
for the more common point-wise variant.
Signature
declare const crossLeft: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL, EL | ER, RL | RR>
}
Since v2.0.0
crossRight
Composes this stream with the specified stream to create a cartesian product of elements, but keeps only elements from the right
stream. The left
stream would be run multiple times, for every element in the right
stream.
See also Stream.zipRight
for the more common point-wise variant.
Signature
declare const crossRight: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AR, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AR, EL | ER, RL | RR>
}
Since v2.0.0
crossWith
Composes this stream with the specified stream to create a cartesian product of elements with a specified function. The right
stream would be run multiple times, for every element in the left
stream.
See also Stream.zipWith
for the more common point-wise variant.
Signature
declare const crossWith: {
<AR, ER, RR, AL, A>(
right: Stream<AR, ER, RR>,
f: (left: AL, right: AR) => A
): <EL, RL>(left: Stream<AL, EL, RL>) => Stream<A, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR, A>(
left: Stream<AL, EL, RL>,
right: Stream<AR, ER, RR>,
f: (left: AL, right: AR) => A
): Stream<A, EL | ER, RL | RR>
}
Since v2.0.0
debounce
Delays the emission of values by holding new values for a set duration. If no new values arrive during that time the value is emitted, however if a new value is received during the holding period the previous value is discarded and the process is repeated with the new value.
This operator is useful if you have a stream of “bursty” events which eventually settle down and you only need the final event of the burst. For example, a search engine may only want to initiate a search after a user has paused typing so as to not prematurely recommend results.
Example
import { Effect, Stream } from "effect"
let last = Date.now()
const log = (message: string) =>
Effect.sync(() => {
const end = Date.now()
console.log(`${message} after ${end - last}ms`)
last = end
})
const stream = Stream.make(1, 2, 3).pipe(
Stream.concat(
Stream.fromEffect(Effect.sleep("200 millis").pipe(Effect.as(4))) // Emit 4 after 200 ms
),
Stream.concat(Stream.make(5, 6)), // Continue with more rapid values
Stream.concat(
Stream.fromEffect(Effect.sleep("150 millis").pipe(Effect.as(7))) // Emit 7 after 150 ms
),
Stream.concat(Stream.make(8)),
Stream.tap((n) => log(`Received ${n}`)),
Stream.debounce("100 millis"), // Only emit values after a pause of at least 100 milliseconds,
Stream.tap((n) => log(`> Emitted ${n}`))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Received 1 after 5ms
// Received 2 after 2ms
// Received 3 after 0ms
// > Emitted 3 after 104ms
// Received 4 after 99ms
// Received 5 after 1ms
// Received 6 after 0ms
// > Emitted 6 after 101ms
// Received 7 after 50ms
// Received 8 after 1ms
// > Emitted 8 after 101ms
// { _id: 'Chunk', values: [ 3, 6, 8 ] }
Signature
declare const debounce: {
(duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>
}
Since v2.0.0
distributedWith
More powerful version of Stream.broadcast
. Allows you to provide a function that determines which queues should receive which elements. The decide function will receive the indices of the queues in the resulting list.
Signature
declare const distributedWith: {
<N extends number, A>(options: {
readonly size: N
readonly maximumLag: number
readonly decide: (a: A) => Effect.Effect<Predicate<number>>
}): <E, R>(
self: Stream<A, E, R>
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>, never, Scope.Scope | R>
<A, E, R, N extends number>(
self: Stream<A, E, R>,
options: {
readonly size: N
readonly maximumLag: number
readonly decide: (a: A) => Effect.Effect<Predicate<number>>
}
): Effect.Effect<TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>, never, Scope.Scope | R>
}
Since v2.0.0
distributedWithDynamic
More powerful version of Stream.distributedWith
. This returns a function that will produce new queues and corresponding indices. You can also provide a function that will be executed after the final events are enqueued in all queues. Shutdown of the queues is handled by the driver. Downstream users can also shut down queues manually. In this case the driver will continue, but will no longer backpressure on them.
Signature
declare const distributedWithDynamic: {
<A>(options: {
readonly maximumLag: number
readonly decide: (a: A) => Effect.Effect<Predicate<number>, never, never>
}): <E, R>(
self: Stream<A, E, R>
) => Effect.Effect<
Effect.Effect<[number, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>], never, never>,
never,
Scope.Scope | R
>
<A, E, R>(
self: Stream<A, E, R>,
options: { readonly maximumLag: number; readonly decide: (a: A) => Effect.Effect<Predicate<number>, never, never> }
): Effect.Effect<
Effect.Effect<[number, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>], never, never>,
never,
Scope.Scope | R
>
}
Since v2.0.0
drain
Converts this stream to a stream that executes its effects but emits no elements. Useful for sequencing effects using streams:
Example
import { Effect, Stream } from "effect"
// We create a stream and immediately drain it.
const stream = Stream.range(1, 6).pipe(Stream.drain)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }
Signature
declare const drain: <A, E, R>(self: Stream<A, E, R>) => Stream<never, E, R>
Since v2.0.0
drainFork
Drains the provided stream in the background for as long as this stream is running. If this stream ends before other, other will be interrupted. If other fails, this stream will fail with that error.
Signature
declare const drainFork: {
<A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
drop
Drops the specified number of elements from this stream.
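For illustration, a minimal sketch of dropping the first two elements (the output comment shows the expected result):
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(Stream.drop(2))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 3, 4, 5 ] }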
Signature
declare const drop: {
(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>
}
Since v2.0.0
dropRight
Drops the last specified number of elements from this stream.
Signature
declare const dropRight: {
(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>
}
Since v2.0.0
dropUntil
Drops all elements of the stream until the specified predicate evaluates to true.
Signature
declare const dropUntil: {
<A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>
}
Since v2.0.0
dropUntilEffect
Drops all elements of the stream until the specified effectful predicate evaluates to true.
Signature
declare const dropUntilEffect: {
<A, E2, R2>(
predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): Stream<A, E | E2, R | R2>
}
Since v2.0.0
dropWhile
Drops all elements of the stream for as long as the specified predicate evaluates to true.
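A brief illustrative sketch: once the predicate fails, all remaining elements are kept, including later ones that would satisfy it again.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4, 1).pipe(Stream.dropWhile((n) => n < 3))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 3, 4, 1 ] }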
Signature
declare const dropWhile: {
<A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>
}
Since v2.0.0
dropWhileEffect
Drops all elements of the stream for as long as the specified effectful predicate evaluates to true.
Signature
declare const dropWhileEffect: {
<A, E2, R2>(
predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
predicate: (a: A) => Effect.Effect<boolean, E2, R2>
): Stream<A, E | E2, R | R2>
}
Since v2.0.0
either
Returns a stream whose failures and successes have been lifted into an Either
. The resulting stream cannot fail, because the failures have been exposed as part of the Either
success case.
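An illustrative sketch of surfacing a failure as a value (the comment describes the expected contents rather than exact console formatting):
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail("boom")),
  Stream.either
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// The collected chunk holds Either.right(1), Either.right(2) and finally Either.left("boom")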
Signature
declare const either: <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A, E>, never, R>
Since v2.0.0
ensuring
Executes the provided finalizer after this stream’s finalizers run.
Example
import { Console, Effect, Stream } from "effect"
const program = Stream.fromEffect(Console.log("Application Logic.")).pipe(
Stream.concat(Stream.finalizer(Console.log("Finalizing the stream"))),
Stream.ensuring(Console.log("Doing some other works after stream's finalization"))
)
Effect.runPromise(Stream.runCollect(program)).then(console.log)
// Application Logic.
// Finalizing the stream
// Doing some other work after the stream's finalization
// { _id: 'Chunk', values: [ undefined, undefined ] }
Signature
declare const ensuring: {
<X, R2>(finalizer: Effect.Effect<X, never, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, X, R2>(self: Stream<A, E, R>, finalizer: Effect.Effect<X, never, R2>): Stream<A, E, R | R2>
}
Since v2.0.0
ensuringWith
Executes the provided finalizer after this stream’s finalizers run.
Signature
declare const ensuringWith: {
<E, R2>(
finalizer: (exit: Exit.Exit<unknown, E>) => Effect.Effect<unknown, never, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, R2>(
self: Stream<A, E, R>,
finalizer: (exit: Exit.Exit<unknown, E>) => Effect.Effect<unknown, never, R2>
): Stream<A, E, R | R2>
}
Since v2.0.0
filterMap
Performs a filter and map in a single step.
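A small sketch combining filtering and mapping in one pass (output comment shows the expected result):
import { Effect, Option, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4).pipe(
  Stream.filterMap((n) => (n % 2 === 0 ? Option.some(n * 10) : Option.none()))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 20, 40 ] }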
Signature
declare const filterMap: {
<A, B>(pf: (a: A) => Option.Option<B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A, E, R, B>(self: Stream<A, E, R>, pf: (a: A) => Option.Option<B>): Stream<B, E, R>
}
Since v2.0.0
filterMapEffect
Performs an effectful filter and map in a single step.
Signature
declare const filterMapEffect: {
<A, A2, E2, R2>(
pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
filterMapWhile
Transforms all elements of the stream for as long as the specified partial function is defined.
Signature
declare const filterMapWhile: {
<A, A2>(pf: (a: A) => Option.Option<A2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, pf: (a: A) => Option.Option<A2>): Stream<A2, E, R>
}
Since v2.0.0
filterMapWhileEffect
Effectfully transforms all elements of the stream for as long as the specified partial function is defined.
Signature
declare const filterMapWhileEffect: {
<A, A2, E2, R2>(
pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
forever
Repeats this stream forever.
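Typically combined with a limiting combinator such as Stream.take; a quick sketch:
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2).pipe(Stream.forever, Stream.take(5))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 1, 2, 1 ] }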
Signature
declare const forever: <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
Since v2.0.0
fromEventListener
Creates a Stream using addEventListener.
Signature
declare const fromEventListener: <A = unknown>(
target: EventListener<A>,
type: string,
options?:
| boolean
| {
readonly capture?: boolean
readonly passive?: boolean
readonly once?: boolean
readonly bufferSize?: number | "unbounded" | undefined
}
| undefined
) => Stream<A>
Since v3.1.0
haltAfter
Specialized version of haltWhen which halts the evaluation of this stream after the given duration.
An element in the process of being pulled will not be interrupted when the given duration completes. See interruptAfter for this behavior.
Signature
declare const haltAfter: {
(duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>
}
Since v2.0.0
haltWhen
Halts the evaluation of this stream when the provided effect completes. The given effect will be forked as part of the returned stream, and its success will be discarded.
An element in the process of being pulled will not be interrupted when the effect completes. See interruptWhen for this behavior.
If the effect completes with a failure, the stream will emit that failure.
Signature
declare const haltWhen: {
<X, E2, R2>(effect: Effect.Effect<X, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, X, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
haltWhenDeferred
Halts the evaluation of this stream when the provided Deferred resolves.
If the Deferred completes with a failure, the stream will emit that failure.
Signature
declare const haltWhenDeferred: {
<X, E2>(deferred: Deferred.Deferred<X, E2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
<A, E, R, X, E2>(self: Stream<A, E, R>, deferred: Deferred.Deferred<X, E2>): Stream<A, E | E2, R>
}
Since v2.0.0
identity
The identity pipeline, which does not modify streams in any way.
Signature
declare const identity: <A, E = never, R = never>() => Stream<A, E, R>
Since v2.0.0
interleave
Interleaves this stream and the specified stream deterministically by alternating pulling values from this stream and the specified stream. When one stream is exhausted all remaining values in the other stream will be pulled.
Example
import { Effect, Stream } from "effect"
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)
const stream = Stream.interleave(s1, s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 5, 3, 6 ] }
Signature
declare const interleave: {
<A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
interleaveWith
Combines this stream and the specified stream deterministically using the stream of boolean values pull to control which stream to pull from next. A value of true indicates to pull from this stream and a value of false indicates to pull from the specified stream. Only consumes as many elements as requested by the pull stream. If either this stream or the specified stream is exhausted, further requests for values from that stream will be ignored.
Example
import { Effect, Stream } from "effect"
const s1 = Stream.make(1, 3, 5, 7, 9)
const s2 = Stream.make(2, 4, 6, 8, 10)
const booleanStream = Stream.make(true, false, false).pipe(Stream.forever)
const stream = Stream.interleaveWith(s1, s2, booleanStream)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
// _id: 'Chunk',
// values: [
// 1, 2, 4, 3, 6,
// 8, 5, 10, 7, 9
// ]
// }
Signature
declare const interleaveWith: {
<A2, E2, R2, E3, R3>(
that: Stream<A2, E2, R2>,
decider: Stream<boolean, E3, R3>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E3 | E, R2 | R3 | R>
<A, E, R, A2, E2, R2, E3, R3>(
self: Stream<A, E, R>,
that: Stream<A2, E2, R2>,
decider: Stream<boolean, E3, R3>
): Stream<A | A2, E | E2 | E3, R | R2 | R3>
}
Since v2.0.0
interruptAfter
Specialized version of Stream.interruptWhen which interrupts the evaluation of this stream after the given Duration.
Signature
declare const interruptAfter: {
(duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>
}
Since v2.0.0
interruptWhen
Interrupts the evaluation of this stream when the provided effect completes. The given effect will be forked as part of this stream, and its success will be discarded. This combinator will also interrupt any in-progress element being pulled from upstream.
If the effect completes with a failure before the stream completes, the returned stream will emit that failure.
Signature
declare const interruptWhen: {
<X, E2, R2>(effect: Effect.Effect<X, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, X, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
interruptWhenDeferred
Interrupts the evaluation of this stream when the provided Deferred resolves. This combinator will also interrupt any in-progress element being pulled from upstream.
If the Deferred completes with a failure, the stream will emit that failure.
Signature
declare const interruptWhenDeferred: {
<X, E2>(deferred: Deferred.Deferred<X, E2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
<A, E, R, X, E2>(self: Stream<A, E, R>, deferred: Deferred.Deferred<X, E2>): Stream<A, E | E2, R>
}
Since v2.0.0
intersperse
Intersperses the stream with the provided element.
Example
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(Stream.intersperse(0))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
// _id: 'Chunk',
// values: [
// 1, 0, 2, 0, 3,
// 0, 4, 0, 5
// ]
// }
Signature
declare const intersperse: {
<A2>(element: A2): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, element: A2): Stream<A | A2, E, R>
}
Since v2.0.0
intersperseAffixes
Intersperse the specified element, also adding a prefix and a suffix.
Example
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.intersperseAffixes({
start: "[",
middle: "-",
end: "]"
})
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
// _id: 'Chunk',
// values: [
// '[', 1, '-', 2, '-',
// 3, '-', 4, '-', 5,
// ']'
// ]
// }
Signature
declare const intersperseAffixes: {
<A2, A3, A4>(options: {
readonly start: A2
readonly middle: A3
readonly end: A4
}): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A3 | A4 | A, E, R>
<A, E, R, A2, A3, A4>(
self: Stream<A, E, R>,
options: { readonly start: A2; readonly middle: A3; readonly end: A4 }
): Stream<A | A2 | A3 | A4, E, R>
}
Since v2.0.0
mapBoth
Returns a stream whose failure and success channels have been mapped by the specified onFailure and onSuccess functions.
Signature
declare const mapBoth: {
<E, E2, A, A2>(options: {
readonly onFailure: (e: E) => E2
readonly onSuccess: (a: A) => A2
}): <R>(self: Stream<A, E, R>) => Stream<A2, E2, R>
<A, E, R, E2, A2>(
self: Stream<A, E, R>,
options: { readonly onFailure: (e: E) => E2; readonly onSuccess: (a: A) => A2 }
): Stream<A2, E2, R>
}
Since v2.0.0
merge
Merges this stream and the specified stream together.
If no termination strategy is specified, the newly produced stream will terminate when both of the specified streams terminate.
Example
import { Effect, Schedule, Stream } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(Stream.schedule(Schedule.spaced("100 millis")))
const s2 = Stream.make(4, 5, 6).pipe(Stream.schedule(Schedule.spaced("200 millis")))
const stream = Stream.merge(s1, s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }
Signature
declare const merge: {
<A2, E2, R2>(
that: Stream<A2, E2, R2>,
options?: { readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined } | undefined
): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
that: Stream<A2, E2, R2>,
options?: { readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined } | undefined
): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
mergeAll
Merges a variable list of streams in a non-deterministic fashion. Up to concurrency streams may be consumed in parallel and up to bufferSize chunks may be buffered by this operator.
Signature
declare const mergeAll: {
(options: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
}): <A, E, R>(streams: Iterable<Stream<A, E, R>>) => Stream<A, E, R>
<A, E, R>(
streams: Iterable<Stream<A, E, R>>,
options: { readonly concurrency: number | "unbounded"; readonly bufferSize?: number | undefined }
): Stream<A, E, R>
}
Since v2.0.0
mergeEither
Merges this stream and the specified stream together to produce a stream of eithers.
Signature
declare const mergeEither: {
<A2, E2, R2>(
that: Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A2, A>, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<Either.Either<A2, A>, E | E2, R | R2>
}
Since v2.0.0
mergeLeft
Merges this stream and the specified stream together, discarding the values from the right stream.
Signature
declare const mergeLeft: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL, ER | EL, RR | RL>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL, EL | ER, RL | RR>
}
Since v2.0.0
mergeRight
Merges this stream and the specified stream together, discarding the values from the left stream.
Signature
declare const mergeRight: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AR, ER | EL, RR | RL>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AR, EL | ER, RL | RR>
}
Since v2.0.0
mergeWith
Merges this stream and the specified stream together to a common element type with the specified mapping functions.
If no termination strategy is specified, the newly produced stream will terminate when both of the specified streams terminate.
Example
import { Effect, Schedule, Stream } from "effect"
const s1 = Stream.make("1", "2", "3").pipe(Stream.schedule(Schedule.spaced("100 millis")))
const s2 = Stream.make(4.1, 5.3, 6.2).pipe(Stream.schedule(Schedule.spaced("200 millis")))
const stream = Stream.mergeWith(s1, s2, {
onSelf: (s) => parseInt(s),
onOther: (n) => Math.floor(n)
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }
Signature
declare const mergeWith: {
<A2, E2, R2, A, A3, A4>(
other: Stream<A2, E2, R2>,
options: {
readonly onSelf: (a: A) => A3
readonly onOther: (a2: A2) => A4
readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined
}
): <E, R>(self: Stream<A, E, R>) => Stream<A3 | A4, E2 | E, R2 | R>
<A, E, R, A2, E2, R2, A3, A4>(
self: Stream<A, E, R>,
other: Stream<A2, E2, R2>,
options: {
readonly onSelf: (a: A) => A3
readonly onOther: (a2: A2) => A4
readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined
}
): Stream<A3 | A4, E | E2, R | R2>
}
Since v2.0.0
mkString
Returns a combined string resulting from concatenating each of the values from the stream.
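A minimal sketch (the result is an Effect, so it can be run directly):
import { Effect, Stream } from "effect"
const result = Stream.make("a", "b", "c").pipe(Stream.mkString)
Effect.runPromise(result).then(console.log)
// abc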
Signature
declare const mkString: <E, R>(self: Stream<string, E, R>) => Effect.Effect<string, E, R>
Since v2.0.0
onDone
Runs the specified effect if this stream ends.
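An illustrative sketch: the cleanup effect runs once the stream has ended, before the collected result is produced.
import { Console, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(Stream.onDone(() => Console.log("Stream ended")))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Stream ended
// { _id: 'Chunk', values: [ 1, 2, 3 ] }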
Signature
declare const onDone: {
<X, R2>(cleanup: () => Effect.Effect<X, never, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, X, R2>(self: Stream<A, E, R>, cleanup: () => Effect.Effect<X, never, R2>): Stream<A, E, R | R2>
}
Since v2.0.0
onError
Runs the specified effect if this stream fails, providing the error to the effect if it exists.
Note: Unlike Effect.onError, there is no guarantee that the provided effect will not be interrupted.
Signature
declare const onError: {
<E, X, R2>(
cleanup: (cause: Cause.Cause<E>) => Effect.Effect<X, never, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, X, R2>(
self: Stream<A, E, R>,
cleanup: (cause: Cause.Cause<E>) => Effect.Effect<X, never, R2>
): Stream<A, E, R | R2>
}
Since v2.0.0
partition
Splits a stream into two substreams based on a predicate.
Details
The Stream.partition function splits a stream into two parts: one for elements that satisfy the predicate (evaluated to true) and another for those that do not (evaluated to false).
The faster stream may advance up to bufferSize elements ahead of the slower one.
Example (Partitioning a Stream into Even and Odd Numbers)
import { Effect, Stream } from "effect"
const partition = Stream.range(1, 9).pipe(Stream.partition((n) => n % 2 === 0, { bufferSize: 5 }))
const program = Effect.scoped(
Effect.gen(function* () {
const [odds, evens] = yield* partition
console.log(yield* Stream.runCollect(odds))
console.log(yield* Stream.runCollect(evens))
})
)
Effect.runPromise(program)
// { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
// { _id: 'Chunk', values: [ 2, 4, 6, 8 ] }
See partitionEither for partitioning a stream based on effectful conditions.
Signature
declare const partition: {
<C extends A, B extends A, A = C>(
refinement: Refinement<NoInfer<A>, B>,
options?: { bufferSize?: number | undefined } | undefined
): <E, R>(
self: Stream<C, E, R>
) => Effect.Effect<[excluded: Stream<Exclude<C, B>, E, never>, satisfying: Stream<B, E, never>], E, R | Scope.Scope>
<A>(
predicate: Predicate<A>,
options?: { bufferSize?: number | undefined } | undefined
): <E, R>(
self: Stream<A, E, R>
) => Effect.Effect<[excluded: Stream<A, E, never>, satisfying: Stream<A, E, never>], E, Scope.Scope | R>
<C extends A, E, R, B extends A, A = C>(
self: Stream<C, E, R>,
refinement: Refinement<A, B>,
options?: { bufferSize?: number | undefined } | undefined
): Effect.Effect<[excluded: Stream<Exclude<C, B>, E, never>, satisfying: Stream<B, E, never>], E, R | Scope.Scope>
<A, E, R>(
self: Stream<A, E, R>,
predicate: Predicate<A>,
options?: { bufferSize?: number | undefined } | undefined
): Effect.Effect<[excluded: Stream<A, E, never>, satisfying: Stream<A, E, never>], E, R | Scope.Scope>
}
Since v2.0.0
partitionEither
Splits a stream into two substreams based on an effectful condition.
Details
The Stream.partitionEither function is used to divide a stream into two parts: one for elements that satisfy a condition producing Either.left values, and another for those that produce Either.right values. This function applies an effectful predicate to each element in the stream to determine which substream it belongs to.
The faster stream may advance up to bufferSize elements ahead of the slower one.
Example (Partitioning a Stream with an Effectful Predicate)
import { Effect, Either, Stream } from "effect"
const partition = Stream.range(1, 9).pipe(
Stream.partitionEither((n) => Effect.succeed(n % 2 === 0 ? Either.right(n) : Either.left(n)), { bufferSize: 5 })
)
const program = Effect.scoped(
Effect.gen(function* () {
const [odds, evens] = yield* partition
console.log(yield* Stream.runCollect(odds))
console.log(yield* Stream.runCollect(evens))
})
)
Effect.runPromise(program)
// { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
// { _id: 'Chunk', values: [ 2, 4, 6, 8 ] }
See partition for partitioning a stream based on simple conditions.
Signature
declare const partitionEither: {
<A, A3, A2, E2, R2>(
predicate: (a: NoInfer<A>) => Effect.Effect<Either.Either<A3, A2>, E2, R2>,
options?: { readonly bufferSize?: number | undefined } | undefined
): <E, R>(
self: Stream<A, E, R>
) => Effect.Effect<[left: Stream<A2, E2 | E, never>, right: Stream<A3, E2 | E, never>], E2 | E, Scope.Scope | R2 | R>
<A, E, R, A3, A2, E2, R2>(
self: Stream<A, E, R>,
predicate: (a: A) => Effect.Effect<Either.Either<A3, A2>, E2, R2>,
options?: { readonly bufferSize?: number | undefined } | undefined
): Effect.Effect<[left: Stream<A2, E | E2, never>, right: Stream<A3, E | E2, never>], E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
peel
Peels off enough material from the stream to construct a Z using the provided Sink and then returns both the Z and the rest of the Stream in a scope. Like all scoped values, the provided stream is valid only within the scope.
Signature
declare const peel: {
<A2, A, E2, R2>(
sink: Sink.Sink<A2, A, A, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<[A2, Stream<A, E, never>], E2 | E, Scope.Scope | R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
sink: Sink.Sink<A2, A, A, E2, R2>
): Effect.Effect<[A2, Stream<A, E, never>], E | E2, Scope.Scope | R | R2>
}
Since v2.0.0
pipeThrough
Pipes all of the values from this stream through the provided sink.
See also Stream.transduce.
Signature
declare const pipeThrough: {
<A2, A, L, E2, R2>(sink: Sink.Sink<A2, A, L, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<L, E2 | E, R2 | R>
<A, E, R, A2, L, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<A2, A, L, E2, R2>): Stream<L, E | E2, R | R2>
}
Since v2.0.0
pipeThroughChannel
Pipes all the values from this stream through the provided channel.
Signature
declare const pipeThroughChannel: {
<R2, E, E2, A, A2>(
channel: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
): <R>(self: Stream<A, E, R>) => Stream<A2, E2, R2 | R>
<R, R2, E, E2, A, A2>(
self: Stream<A, E, R>,
channel: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
): Stream<A2, E2, R | R2>
}
Since v2.0.0
pipeThroughChannelOrFail
Pipes all values from this stream through the provided channel, passing through any error emitted by this stream unchanged.
Signature
declare const pipeThroughChannelOrFail: {
<R2, E, E2, A, A2>(
chan: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
): <R>(self: Stream<A, E, R>) => Stream<A2, E | E2, R2 | R>
<R, R2, E, E2, A, A2>(
self: Stream<A, E, R>,
chan: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
prepend
Emits the provided chunk before emitting any other value.
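A quick sketch of prepending a chunk of values:
import { Chunk, Effect, Stream } from "effect"
const stream = Stream.make(2, 3).pipe(Stream.prepend(Chunk.make(0, 1)))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }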
Signature
declare const prepend: {
<B>(values: Chunk.Chunk<B>): <A, E, R>(self: Stream<A, E, R>) => Stream<B | A, E, R>
<A, E, R, B>(self: Stream<A, E, R>, values: Chunk.Chunk<B>): Stream<A | B, E, R>
}
Since v2.0.0
rechunk
Re-chunks the elements of the stream into chunks of n elements each. The last chunk might contain fewer than n elements.
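An illustrative sketch that uses Stream.chunks to make the resulting chunk boundaries visible:
import { Chunk, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.rechunk(2),
  Stream.chunks,
  Stream.map(Chunk.toArray)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ] }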
Signature
declare const rechunk: {
(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>
}
Since v2.0.0
repeat
Repeats the entire stream using the specified schedule. The stream will execute normally, and then repeat again according to the provided schedule.
Example
import { Effect, Schedule, Stream } from "effect"
const stream = Stream.repeat(Stream.succeed(1), Schedule.forever)
Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 1 ] }
Signature
declare const repeat: {
<B, R2>(schedule: Schedule.Schedule<B, unknown, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, B, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<B, unknown, R2>): Stream<A, E, R | R2>
}
Since v2.0.0
repeatEither
Repeats the entire stream using the specified schedule. The stream will execute normally, and then repeat again according to the provided schedule. The schedule output will be emitted at the end of each repetition.
Signature
declare const repeatEither: {
<B, R2>(
schedule: Schedule.Schedule<B, unknown, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A, B>, E, R2 | R>
<A, E, R, B, R2>(
self: Stream<A, E, R>,
schedule: Schedule.Schedule<B, unknown, R2>
): Stream<Either.Either<A, B>, E, R | R2>
}
Since v2.0.0
repeatElements
Repeats each element of the stream using the provided schedule. Repetitions are done in addition to the first execution, which means using Schedule.recurs(1) actually results in the original effect, plus an additional recurrence, for a total of two repetitions of each value in the stream.
Signature
declare const repeatElements: {
<B, R2>(schedule: Schedule.Schedule<B, unknown, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, B, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<B, unknown, R2>): Stream<A, E, R | R2>
}
Since v2.0.0
repeatElementsWith
Repeats each element of the stream using the provided schedule. When the schedule is finished, the output of the schedule will be emitted into the stream. Repetitions are done in addition to the first execution, which means using Schedule.recurs(1) actually results in the original effect, plus an additional recurrence, for a total of two repetitions of each value in the stream.
This function accepts two conversion functions, which allow the output of this stream and the output of the provided schedule to be unified into a single type, for example Either or a similar data type.
Signature
declare const repeatElementsWith: {
<B, R2, A, C>(
schedule: Schedule.Schedule<B, unknown, R2>,
options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
): <E, R>(self: Stream<A, E, R>) => Stream<C, E, R2 | R>
<A, E, R, B, R2, C>(
self: Stream<A, E, R>,
schedule: Schedule.Schedule<B, unknown, R2>,
options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
): Stream<C, E, R | R2>
}
Since v2.0.0
repeatWith
Repeats the entire stream using the specified schedule. The stream will execute normally, and then repeat again according to the provided schedule. The schedule output will be emitted at the end of each repetition and can be unified with the stream elements using the provided functions.
Signature
declare const repeatWith: {
<B, R2, A, C>(
schedule: Schedule.Schedule<B, unknown, R2>,
options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
): <E, R>(self: Stream<A, E, R>) => Stream<C, E, R2 | R>
<A, E, R, B, R2, C>(
self: Stream<A, E, R>,
schedule: Schedule.Schedule<B, unknown, R2>,
options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
): Stream<C, E, R | R2>
}
Since v2.0.0
retry
When the stream fails, retry it according to the given schedule.
This retries the entire stream, so it will re-execute all of the stream’s acquire operations.
The schedule is reset as soon as the first element passes through the stream again.
Signature
declare const retry: {
<E0 extends E, R2, E, X>(
schedule: Schedule.Schedule<X, E0, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, X, E0 extends E, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<X, E0, R2>): Stream<A, E, R | R2>
}
Since v2.0.0
scan
Statefully maps over the elements of this stream to produce all intermediate results of type S given an initial S.
Example
import { Effect, Stream } from "effect"
const stream = Stream.range(1, 6).pipe(Stream.scan(0, (a, b) => a + b))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15, 21 ] }
Signature
declare const scan: {
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Stream<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Stream<S, E, R>
}
Since v2.0.0
scanEffect
Statefully and effectfully maps over the elements of this stream to produce all intermediate results of type S given an initial S.
Signature
declare const scanEffect: {
<S, A, E2, R2>(
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<S, E2 | E, R2 | R>
<A, E, R, S, E2, R2>(
self: Stream<A, E, R>,
s: S,
f: (s: S, a: A) => Effect.Effect<S, E2, R2>
): Stream<S, E | E2, R | R2>
}
Since v2.0.0
scanReduce
Statefully maps over the elements of this stream to produce all intermediate results.
See also Stream.scan.
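A brief sketch of running sums, assuming the first element seeds the accumulator (so it is emitted as-is):
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 3, 4).pipe(Stream.scanReduce((acc, n) => acc + n))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 3, 6, 10 ] }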
Signature
declare const scanReduce: {
<A2, A>(f: (a2: A2 | A, a: A) => A2): <E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
<A, E, R, A2>(self: Stream<A, E, R>, f: (a2: A | A2, a: A) => A2): Stream<A | A2, E, R>
}
Since v2.0.0
scanReduceEffect
Statefully and effectfully maps over the elements of this stream to produce all intermediate results.
See also Stream.scanEffect.
Signature
declare const scanReduceEffect: {
<A2, A, E2, R2>(
f: (a2: A2 | A, a: A) => Effect.Effect<A2 | A, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (a2: A | A2, a: A) => Effect.Effect<A | A2, E2, R2>
): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
schedule
Schedules the output of the stream using the provided schedule.
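A small sketch spacing emissions out in time (values are unchanged; only their timing is affected):
import { Effect, Schedule, Stream } from "effect"
const stream = Stream.make(1, 2, 3).pipe(Stream.schedule(Schedule.spaced("100 millis")))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] } (each element emitted roughly 100 ms apart)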
Signature
declare const schedule: {
<X, A0 extends A, R2, A>(
schedule: Schedule.Schedule<X, A0, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
<A, E, R, X, A0 extends A, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<X, A0, R2>): Stream<A, E, R | R2>
}
Since v2.0.0
scheduleWith
Schedules the output of the stream using the provided schedule and emits its output at the end (if the schedule is finite). Uses the provided functions to align the stream and schedule outputs on the same type.
Signature
declare const scheduleWith: {
<B, A0 extends A, R2, A, C>(
schedule: Schedule.Schedule<B, A0, R2>,
options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
): <E, R>(self: Stream<A, E, R>) => Stream<C, E, R2 | R>
<A, E, R, B, A0 extends A, R2, C>(
self: Stream<A, E, R>,
schedule: Schedule.Schedule<B, A0, R2>,
options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
): Stream<C, E, R | R2>
}
Since v2.0.0
share
Returns a new Stream that multicasts the original Stream, subscribing to it as soon as the first consumer subscribes. As long as there is at least one consumer, the upstream will continue running and emitting data. When all consumers have exited, the upstream will be finalized.
Signature
declare const share: {
<A, E>(
config:
| {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
): <R>(self: Stream<A, E, R>) => Effect.Effect<Stream<A, E>, never, R | Scope.Scope>
<A, E, R>(
self: Stream<A, E, R>,
config:
| {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
| {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
): Effect.Effect<Stream<A, E>, never, R | Scope.Scope>
}
Since v3.8.0
sliding
Emits a sliding window of n elements.
Example
import { pipe, Stream } from "effect"
pipe(Stream.make(1, 2, 3, 4), Stream.sliding(2), Stream.runCollect)
// => Chunk(Chunk(1, 2), Chunk(2, 3), Chunk(3, 4))
Signature
declare const sliding: {
(chunkSize: number): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
<A, E, R>(self: Stream<A, E, R>, chunkSize: number): Stream<Chunk.Chunk<A>, E, R>
}
Since v2.0.0
slidingSize
Like sliding, but with a configurable stepSize parameter.
Signature
declare const slidingSize: {
(chunkSize: number, stepSize: number): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
<A, E, R>(self: Stream<A, E, R>, chunkSize: number, stepSize: number): Stream<Chunk.Chunk<A>, E, R>
}
Since v2.0.0
some
Converts an option on values into an option on errors.
Signature
declare const some: <A, E, R>(self: Stream<Option.Option<A>, E, R>) => Stream<A, Option.Option<E>, R>
Since v2.0.0
someOrElse
Extracts the optional value, or returns the given ‘default’.
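A quick sketch replacing None values with a fallback:
import { Effect, Option, Stream } from "effect"
const stream = Stream.make(Option.some(1), Option.none(), Option.some(3)).pipe(
  Stream.someOrElse(() => 0)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 0, 3 ] }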
Signature
declare const someOrElse: {
<A2>(fallback: LazyArg<A2>): <A, E, R>(self: Stream<Option.Option<A>, E, R>) => Stream<A2 | A, E, R>
<A, E, R, A2>(self: Stream<Option.Option<A>, E, R>, fallback: LazyArg<A2>): Stream<A | A2, E, R>
}
Since v2.0.0
someOrFail
Extracts the optional value, or fails with the given error ‘e’.
Signature
declare const someOrFail: {
<E2>(error: LazyArg<E2>): <A, E, R>(self: Stream<Option.Option<A>, E, R>) => Stream<A, E2 | E, R>
<A, E, R, E2>(self: Stream<Option.Option<A>, E, R>, error: LazyArg<E2>): Stream<A, E | E2, R>
}
Since v2.0.0
split
Splits elements based on a predicate or refinement.
Example
import { pipe, Stream } from "effect"
pipe(
  Stream.range(1, 10),
  Stream.split((n) => n % 4 === 0),
  Stream.runCollect
)
// => Chunk(Chunk(1, 2, 3), Chunk(5, 6, 7), Chunk(9, 10))
Signature
declare const split: {
<A, B extends A>(
refinement: Refinement<NoInfer<A>, B>
): <E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<Exclude<A, B>>, E, R>
<A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
<A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<Chunk.Chunk<Exclude<A, B>>, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<Chunk.Chunk<A>, E, R>
}
Since v2.0.0
splitOnChunk
Splits elements on a delimiter and transforms the splits into desired output.
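An illustrative sketch, assuming the delimiter chunk is removed and the runs between delimiters are emitted as separate chunks:
import { Chunk, Effect, Stream } from "effect"
const stream = Stream.make(1, 2, 0, 0, 3, 4, 0, 0, 5).pipe(
  Stream.splitOnChunk(Chunk.make(0, 0)),
  Stream.map(Chunk.toArray)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ] }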
Signature
declare const splitOnChunk: {
<A>(delimiter: Chunk.Chunk<A>): <E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
<A, E, R>(self: Stream<A, E, R>, delimiter: Chunk.Chunk<A>): Stream<Chunk.Chunk<A>, E, R>
}
Since v2.0.0
take
Takes the specified number of elements from this stream.
Example
import { Effect, Stream } from "effect"
const stream = Stream.take(
Stream.iterate(0, (n) => n + 1),
5
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
Signature
declare const take: {
(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>
}
Since v2.0.0
takeRight
Takes the last specified number of elements from this stream.
Example
import { Effect, Stream } from "effect"
const stream = Stream.takeRight(Stream.make(1, 2, 3, 4, 5, 6), 3)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 4, 5, 6 ] }
Signature
declare const takeRight: {
(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>
}
Since v2.0.0
takeUntil
Takes all elements of the stream until the specified predicate evaluates to true.
Example
import { Effect, Stream } from "effect"
const stream = Stream.takeUntil(
Stream.iterate(0, (n) => n + 1),
(n) => n === 4
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
Signature
declare const takeUntil: {
<A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>
}
Since v2.0.0
takeUntilEffect
Takes all elements of the stream until the specified effectful predicate evaluates to true.
Signature
declare const takeUntilEffect: {
<A, E2, R2>(
predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
predicate: (a: A) => Effect.Effect<boolean, E2, R2>
): Stream<A, E | E2, R | R2>
}
Since v2.0.0
takeWhile
Takes all elements of the stream for as long as the specified predicate evaluates to true.
Example
import { Effect, Stream } from "effect"
const stream = Stream.takeWhile(
Stream.iterate(0, (n) => n + 1),
(n) => n < 5
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
Signature
declare const takeWhile: {
<A, B extends A>(refinement: Refinement<NoInfer<A>, B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
<A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<B, E, R>
<A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>
}
Since v2.0.0
tapErrorCause
Returns a stream that effectfully “peeks” at the cause of failure of the stream.
Signature
declare const tapErrorCause: {
<E, X, E2, R2>(
f: (cause: Cause.Cause<NoInfer<E>>) => Effect.Effect<X, E2, R2>
): <A, R>(self: Stream<A, E, R>) => Stream<A, E | E2, R2 | R>
<A, E, R, X, E2, R2>(
self: Stream<A, E, R>,
f: (cause: Cause.Cause<E>) => Effect.Effect<X, E2, R2>
): Stream<A, E | E2, R | R2>
}
Since v2.0.0
throttle
Delays the chunks of this stream according to the given bandwidth parameters using the token bucket algorithm. Allows for bursts in the processing of elements by allowing the token bucket to accumulate tokens up to a units + burst threshold. The weight of each chunk is determined by the cost function.
If using the “enforce” strategy, chunks that do not meet the bandwidth constraints are dropped. If using the “shape” strategy, chunks are delayed until they can be emitted without exceeding the bandwidth constraints.
Defaults to the “shape” strategy.
Example
import { Chunk, Effect, Schedule, Stream } from "effect"
let last = Date.now()
const log = (message: string) =>
Effect.sync(() => {
const end = Date.now()
console.log(`${message} after ${end - last}ms`)
last = end
})
const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
Stream.take(6),
Stream.tap((n) => log(`Received ${n}`)),
Stream.throttle({
cost: Chunk.size,
duration: "100 millis",
units: 1
}),
Stream.tap((n) => log(`> Emitted ${n}`))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Received 0 after 56ms
// > Emitted 0 after 0ms
// Received 1 after 52ms
// > Emitted 1 after 48ms
// Received 2 after 52ms
// > Emitted 2 after 49ms
// Received 3 after 52ms
// > Emitted 3 after 48ms
// Received 4 after 52ms
// > Emitted 4 after 47ms
// Received 5 after 52ms
// > Emitted 5 after 49ms
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5 ] }
Signature
declare const throttle: {
<A>(options: {
readonly cost: (chunk: Chunk.Chunk<A>) => number
readonly units: number
readonly duration: Duration.DurationInput
readonly burst?: number | undefined
readonly strategy?: "enforce" | "shape" | undefined
}): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(
self: Stream<A, E, R>,
options: {
readonly cost: (chunk: Chunk.Chunk<A>) => number
readonly units: number
readonly duration: Duration.DurationInput
readonly burst?: number | undefined
readonly strategy?: "enforce" | "shape" | undefined
}
): Stream<A, E, R>
}
Since v2.0.0
throttleEffect
Delays the chunks of this stream according to the given bandwidth parameters using the token bucket algorithm. Allows for bursts in the processing of elements by allowing the token bucket to accumulate tokens up to a units + burst threshold. The weight of each chunk is determined by the effectful cost function.
If using the “enforce” strategy, chunks that do not meet the bandwidth constraints are dropped. If using the “shape” strategy, chunks are delayed until they can be emitted without exceeding the bandwidth constraints.
Defaults to the “shape” strategy.
Signature
declare const throttleEffect: {
<A, E2, R2>(options: {
readonly cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>
readonly units: number
readonly duration: Duration.DurationInput
readonly burst?: number | undefined
readonly strategy?: "enforce" | "shape" | undefined
}): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(
self: Stream<A, E, R>,
options: {
readonly cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>
readonly units: number
readonly duration: Duration.DurationInput
readonly burst?: number | undefined
readonly strategy?: "enforce" | "shape" | undefined
}
): Stream<A, E | E2, R | R2>
}
Since v2.0.0
timeout
Ends the stream if it does not produce a value after the specified duration.
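A small, timing-dependent sketch: the source would only yield a value after one second, so the stream ends empty.
import { Effect, Stream } from "effect"
const slow = Stream.fromEffect(Effect.sleep("1 second").pipe(Effect.as(1)))
const stream = slow.pipe(Stream.timeout("100 millis"))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }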
Signature
declare const timeout: {
(duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>
}
Since v2.0.0
timeoutFail
Fails the stream with the given error if it does not produce a value within the specified duration.
Signature
declare const timeoutFail: {
<E2>(error: LazyArg<E2>, duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
<A, E, R, E2>(self: Stream<A, E, R>, error: LazyArg<E2>, duration: Duration.DurationInput): Stream<A, E | E2, R>
}
Since v2.0.0
timeoutFailCause
Fails the stream with the given cause if it does not produce a value within the specified duration.
Signature
declare const timeoutFailCause: {
<E2>(
cause: LazyArg<Cause.Cause<E2>>,
duration: Duration.DurationInput
): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
<A, E, R, E2>(
self: Stream<A, E, R>,
cause: LazyArg<Cause.Cause<E2>>,
duration: Duration.DurationInput
): Stream<A, E | E2, R>
}
Since v2.0.0
timeoutTo
Switches the stream if it does not produce a value after the specified duration.
Signature
declare const timeoutTo: {
<A2, E2, R2>(
duration: Duration.DurationInput,
that: Stream<A2, E2, R2>
): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
duration: Duration.DurationInput,
that: Stream<A2, E2, R2>
): Stream<A | A2, E | E2, R | R2>
}
Since v2.0.0
transduce
Applies the transducer to the stream and emits its outputs.
Signature
declare const transduce: {
<A2, A, E2, R2>(sink: Sink.Sink<A2, A, A, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<A2, A, A, E2, R2>): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
when
Returns the specified stream if the given condition is satisfied, otherwise returns an empty stream.
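A quick sketch guarding a stream behind a condition:
import { Effect, Stream } from "effect"
const enabled = false
const stream = Stream.make(1, 2, 3).pipe(Stream.when(() => enabled))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }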
Signature
declare const when: {
(test: LazyArg<boolean>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
<A, E, R>(self: Stream<A, E, R>, test: LazyArg<boolean>): Stream<A, E, R>
}
Since v2.0.0
whenCaseEffect
Returns the stream when the given partial function is defined for the given effectful value, otherwise returns an empty stream.
Signature
declare const whenCaseEffect: {
<A, A2, E2, R2>(
pf: (a: A) => Option.Option<Stream<A2, E2, R2>>
): <E, R>(self: Effect.Effect<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Effect.Effect<A, E, R>,
pf: (a: A) => Option.Option<Stream<A2, E2, R2>>
): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
whenEffect
Returns the stream if the given effectful condition is satisfied, otherwise returns an empty stream.
Signature
declare const whenEffect: {
<E2, R2>(effect: Effect.Effect<boolean, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<boolean, E2, R2>): Stream<A, E | E2, R | R2>
}
Since v2.0.0
zipping
zip
Zips this stream with another point-wise and emits tuples of elements from both streams.
The new stream will end when one of the sides ends.
Example
import { Effect, Stream } from "effect"
// We create two streams and zip them together.
const stream = Stream.zip(Stream.make(1, 2, 3, 4, 5, 6), Stream.make("a", "b", "c"))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ] ] }
Signature
declare const zip: {
<A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<[A, A2], E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<[A, A2], E | E2, R | R2>
}
Since v2.0.0
zipAll
Zips this stream with another point-wise, creating a new stream of pairs of elements from both sides.
The defaults defaultSelf and defaultOther will be used if the streams have different lengths and one of the streams has ended before the other.
Example
import { Effect, Stream } from "effect"
const stream = Stream.zipAll(Stream.make(1, 2, 3, 4, 5, 6), {
other: Stream.make("a", "b", "c"),
defaultSelf: 0,
defaultOther: "x"
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: "Chunk", values: [ [ 1, "a" ], [ 2, "b" ], [ 3, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ] ] }
Signature
declare const zipAll: {
<A2, E2, R2, A>(options: {
readonly other: Stream<A2, E2, R2>
readonly defaultSelf: A
readonly defaultOther: A2
}): <E, R>(self: Stream<A, E, R>) => Stream<[A, A2], E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
options: { readonly other: Stream<A2, E2, R2>; readonly defaultSelf: A; readonly defaultOther: A2 }
): Stream<[A, A2], E | E2, R | R2>
}
Since v2.0.0
zipAllLeft
Zips this stream with another point-wise, and keeps only elements from this stream.
The provided default value will be used if the other stream ends before this one.
Signature
declare const zipAllLeft: {
<A2, E2, R2, A>(that: Stream<A2, E2, R2>, defaultLeft: A): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>, defaultLeft: A): Stream<A, E | E2, R | R2>
}
Since v2.0.0
zipAllRight
Zips this stream with another point-wise, and keeps only elements from the other stream.
The provided default value will be used if this stream ends before the other one.
Signature
declare const zipAllRight: {
<A2, E2, R2>(
that: Stream<A2, E2, R2>,
defaultRight: A2
): <A, E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
<A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>, defaultRight: A2): Stream<A2, E | E2, R | R2>
}
Since v2.0.0
zipAllSortedByKey
Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Combines values associated with each key into a tuple, using the specified values defaultSelf and defaultOther to fill in missing values.
This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.
Signature
declare const zipAllSortedByKey: {
<A2, E2, R2, A, K>(options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly defaultSelf: A
readonly defaultOther: A2
readonly order: Order.Order<K>
}): <E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, [A, A2]], E2 | E, R2 | R>
<K, A, E, R, A2, E2, R2>(
self: Stream<readonly [K, A], E, R>,
options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly defaultSelf: A
readonly defaultOther: A2
readonly order: Order.Order<K>
}
): Stream<[K, [A, A2]], E | E2, R | R2>
}
Since v2.0.0
zipAllSortedByKeyLeft
Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Keeps only values from this stream, using the specified value defaultSelf to fill in missing values.
This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.
Signature
declare const zipAllSortedByKeyLeft: {
<A2, E2, R2, A, K>(options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly defaultSelf: A
readonly order: Order.Order<K>
}): <E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, A], E2 | E, R2 | R>
<K, A, E, R, A2, E2, R2>(
self: Stream<readonly [K, A], E, R>,
options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly defaultSelf: A
readonly order: Order.Order<K>
}
): Stream<[K, A], E | E2, R | R2>
}
Since v2.0.0
zipAllSortedByKeyRight
Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Keeps only values from that stream, using the specified value defaultOther to fill in missing values.
This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.
Signature
declare const zipAllSortedByKeyRight: {
<K, A2, E2, R2>(options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly defaultOther: A2
readonly order: Order.Order<K>
}): <A, E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, A2], E2 | E, R2 | R>
<A, E, R, K, A2, E2, R2>(
self: Stream<readonly [K, A], E, R>,
options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly defaultOther: A2
readonly order: Order.Order<K>
}
): Stream<[K, A2], E | E2, R | R2>
}
Since v2.0.0
zipAllSortedByKeyWith
Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Uses the functions onSelf, onOther, and onBoth to handle the cases where a key and value exist in this stream, that stream, or both streams.
This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.
Signature
declare const zipAllSortedByKeyWith: {
<K, A2, E2, R2, A, A3>(options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly onSelf: (a: A) => A3
readonly onOther: (a2: A2) => A3
readonly onBoth: (a: A, a2: A2) => A3
readonly order: Order.Order<K>
}): <E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, A3], E2 | E, R2 | R>
<K, A, E, R, A2, E2, R2, A3>(
self: Stream<readonly [K, A], E, R>,
options: {
readonly other: Stream<readonly [K, A2], E2, R2>
readonly onSelf: (a: A) => A3
readonly onOther: (a2: A2) => A3
readonly onBoth: (a: A, a2: A2) => A3
readonly order: Order.Order<K>
}
): Stream<[K, A3], E | E2, R | R2>
}
Since v2.0.0
zipAllWith
Zips this stream with another point-wise. The provided functions will be used to create elements for the composed stream.
The functions onSelf and onOther will be used if the streams have different lengths and one of the streams has ended before the other.
Example
import { Effect, Stream } from "effect"
const stream = Stream.zipAllWith(Stream.make(1, 2, 3, 4, 5, 6), {
other: Stream.make("a", "b", "c"),
onSelf: (n) => [n, "x"],
onOther: (s) => [0, s],
onBoth: (n, s) => [n - s.length, s]
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: "Chunk", values: [ [ 0, "a" ], [ 1, "b" ], [ 2, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ] ] }
Signature
declare const zipAllWith: {
<A2, E2, R2, A, A3>(options: {
readonly other: Stream<A2, E2, R2>
readonly onSelf: (a: A) => A3
readonly onOther: (a2: A2) => A3
readonly onBoth: (a: A, a2: A2) => A3
}): <E, R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R>
<A, E, R, A2, E2, R2, A3>(
self: Stream<A, E, R>,
options: {
readonly other: Stream<A2, E2, R2>
readonly onSelf: (a: A) => A3
readonly onOther: (a2: A2) => A3
readonly onBoth: (a: A, a2: A2) => A3
}
): Stream<A3, E | E2, R | R2>
}
Since v2.0.0
zipFlatten
Zips this stream with another point-wise and emits tuples of elements from both streams.
The new stream will end when one of the sides ends.
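An illustrative sketch: starting from a stream of tuples (for example, produced by Stream.zip), each additional zipFlatten appends one more component.
import { Effect, Stream } from "effect"
const stream = Stream.make(1, 2).pipe(
  Stream.zip(Stream.make("a", "b")),
  Stream.zipFlatten(Stream.make(true, false))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a', true ], [ 2, 'b', false ] ] }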
Signature
declare const zipFlatten: {
<A2, E2, R2>(
that: Stream<A2, E2, R2>
): <A extends ReadonlyArray<any>, E, R>(self: Stream<A, E, R>) => Stream<[...A, A2], E2 | E, R2 | R>
<A extends ReadonlyArray<any>, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
that: Stream<A2, E2, R2>
): Stream<[...A, A2], E | E2, R | R2>
}
Since v2.0.0
zipLatest
Zips the two streams so that when a value is emitted by either of the two streams, it is combined with the latest value from the other stream to produce a result.
Note: tracking the latest value is done on a per-chunk basis. That means that emitted elements that are not the last value in chunks will never be used for zipping.
Example
import { Effect, Schedule, Stream } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(Stream.schedule(Schedule.spaced("1 second")))
const s2 = Stream.make("a", "b", "c", "d").pipe(Stream.schedule(Schedule.spaced("500 millis")))
const stream = Stream.zipLatest(s1, s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: "Chunk", values: [ [ 1, "a" ], [ 1, "b" ], [ 2, "b" ], [ 2, "c" ], [ 2, "d" ], [ 3, "d" ] ] }
Signature
declare const zipLatest: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<[AL, AR], EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<[AL, AR], EL | ER, RL | RR>
}
Since v2.0.0
zipLatestAll
Zips multiple streams so that when a value is emitted by any of the streams, it is combined with the latest values from the other streams to produce a result.
Note: tracking the latest value is done on a per-chunk basis. That means that emitted elements that are not the last value in chunks will never be used for zipping.
Example
import { Stream, Schedule, Console, Effect } from "effect"
const stream = Stream.zipLatestAll(
Stream.fromSchedule(Schedule.spaced("1 millis")),
Stream.fromSchedule(Schedule.spaced("2 millis")),
Stream.fromSchedule(Schedule.spaced("4 millis"))
).pipe(Stream.take(6), Stream.tap(Console.log))
Effect.runPromise(Stream.runDrain(stream))
// Output:
// [ 0, 0, 0 ]
// [ 1, 0, 0 ]
// [ 1, 1, 0 ]
// [ 2, 1, 0 ]
// [ 3, 1, 0 ]
// [ 3, 1, 1 ]
// .....
Signature
declare const zipLatestAll: <T extends ReadonlyArray<Stream<any, any, any>>>(
...streams: T
) => Stream<
[T[number]] extends [never]
? never
: { [K in keyof T]: T[K] extends Stream<infer A, infer _E, infer _R> ? A : never },
[T[number]] extends [never] ? never : T[number] extends Stream<infer _A, infer _E, infer _R> ? _E : never,
[T[number]] extends [never] ? never : T[number] extends Stream<infer _A, infer _E, infer _R> ? _R : never
>
Since v3.3.0
zipLatestWith
Zips the two streams so that when a value is emitted by either of the two streams, it is combined with the latest value from the other stream to produce a result.
Note: tracking the latest value is done on a per-chunk basis. That means that emitted elements that are not the last value in chunks will never be used for zipping.
Signature
declare const zipLatestWith: {
<AR, ER, RR, AL, A>(
right: Stream<AR, ER, RR>,
f: (left: AL, right: AR) => A
): <EL, RL>(left: Stream<AL, EL, RL>) => Stream<A, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR, A>(
left: Stream<AL, EL, RL>,
right: Stream<AR, ER, RR>,
f: (left: AL, right: AR) => A
): Stream<A, EL | ER, RL | RR>
}
Since v2.0.0
zipLeft
Zips this stream with another point-wise, but keeps only the outputs of the left stream.
The new stream will end when one of the sides ends.
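A quick sketch: only the left stream's values are kept, and zipping stops when the shorter side ends.
import { Effect, Stream } from "effect"
const stream = Stream.zipLeft(Stream.make(1, 2, 3, 4), Stream.make("a", "b", "c"))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }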
Signature
declare const zipLeft: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL, ER | EL, RR | RL>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL, EL | ER, RL | RR>
}
Since v2.0.0
zipRight
Zips this stream with another point-wise, but keeps only the outputs of the right stream.
The new stream will end when one of the sides ends.
Signature
declare const zipRight: {
<AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AR, ER | EL, RR | RL>
<AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AR, EL | ER, RL | RR>
}
Since v2.0.0
zipWith
Zips this stream with another point-wise and applies the function to the paired elements.
The new stream will end when one of the sides ends.
Example
import { Effect, Stream } from "effect"
// We create two streams and zip them with custom logic.
const stream = Stream.zipWith(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c"),
  (n, s) => [n - s.length, s]
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 0, 'a' ], [ 1, 'b' ], [ 2, 'c' ] ] }
Signature
declare const zipWith: {
<AR, ER, RR, AL, A>(
right: Stream<AR, ER, RR>,
f: (left: AL, right: AR) => A
): <EL, RL>(left: Stream<AL, EL, RL>) => Stream<A, EL | ER, RL | RR>
<AL, EL, RL, AR, ER, RR, A>(
left: Stream<AL, EL, RL>,
right: Stream<AR, ER, RR>,
f: (left: AL, right: AR) => A
): Stream<A, EL | ER, RL | RR>
}
Since v2.0.0
zipWithChunks
Zips this stream with another chunk-wise and applies the function to the paired chunks, producing the chunk of zipped outputs together with any leftover elements from the longer chunk (as an Either).
The new stream will end when one of the sides ends.
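Example
A minimal sketch (not from the upstream docs) that re-implements element-wise zipping at the chunk level: each call pairs up as many elements as both chunks provide and hands back the unconsumed remainder of the longer side.
import { Chunk, Effect, Either, Stream } from "effect"
const stream = Stream.zipWithChunks(
  Stream.make(1, 2, 3, 4, 5),
  Stream.make("a", "b", "c"),
  (left, right) => {
    const n = Math.min(Chunk.size(left), Chunk.size(right))
    const zipped = Chunk.zip(Chunk.take(left, n), Chunk.take(right, n))
    // Per the signature, Left wraps leftovers of the left stream's element type,
    // Right wraps leftovers of the right stream's element type.
    const leftover =
      Chunk.size(left) > n ? Either.left(Chunk.drop(left, n)) : Either.right(Chunk.drop(right, n))
    return [zipped, leftover] as const
  }
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Expected: { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ] ] }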
Signature
declare const zipWithChunks: {
<A2, E2, R2, A, A3>(
that: Stream<A2, E2, R2>,
f: (
left: Chunk.Chunk<A>,
right: Chunk.Chunk<A2>
) => readonly [Chunk.Chunk<A3>, Either.Either<Chunk.Chunk<A2>, Chunk.Chunk<A>>]
): <E, R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R>
<A, E, R, A2, E2, R2, A3>(
self: Stream<A, E, R>,
that: Stream<A2, E2, R2>,
f: (
left: Chunk.Chunk<A>,
right: Chunk.Chunk<A2>
) => readonly [Chunk.Chunk<A3>, Either.Either<Chunk.Chunk<A2>, Chunk.Chunk<A>>]
): Stream<A3, E | E2, R | R2>
}
Since v2.0.0
zipWithIndex
Zips this stream together with the index of elements.
Example
import { Effect, Stream } from "effect"
const stream = Stream.make("Mary", "James", "Robert", "Patricia")
const indexedStream = Stream.zipWithIndex(stream)
Effect.runPromise(Stream.runCollect(indexedStream)).then(console.log)
// {
// _id: 'Chunk',
// values: [ [ 'Mary', 0 ], [ 'James', 1 ], [ 'Robert', 2 ], [ 'Patricia', 3 ] ]
// }
Signature
declare const zipWithIndex: <A, E, R>(self: Stream<A, E, R>) => Stream<[A, number], E, R>
Since v2.0.0
zipWithNext
Zips each element with the next element if present.
Example
import { Chunk, Effect, Stream } from "effect"
const stream = Stream.zipWithNext(Stream.make(1, 2, 3, 4))
Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
// [
// [ 1, { _id: 'Option', _tag: 'Some', value: 2 } ],
// [ 2, { _id: 'Option', _tag: 'Some', value: 3 } ],
// [ 3, { _id: 'Option', _tag: 'Some', value: 4 } ],
// [ 4, { _id: 'Option', _tag: 'None' } ]
// ]
Signature
declare const zipWithNext: <A, E, R>(self: Stream<A, E, R>) => Stream<[A, Option.Option<A>], E, R>
Since v2.0.0
zipWithPrevious
Zips each element with the previous element. Initially accompanied by None.
Example
import { Chunk, Effect, Stream } from "effect"
const stream = Stream.zipWithPrevious(Stream.make(1, 2, 3, 4))
Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
// [
// [ { _id: 'Option', _tag: 'None' }, 1 ],
// [ { _id: 'Option', _tag: 'Some', value: 1 }, 2 ],
// [ { _id: 'Option', _tag: 'Some', value: 2 }, 3 ],
// [ { _id: 'Option', _tag: 'Some', value: 3 }, 4 ]
// ]
Signature
declare const zipWithPrevious: <A, E, R>(self: Stream<A, E, R>) => Stream<[Option.Option<A>, A], E, R>
Since v2.0.0
zipWithPreviousAndNext
Zips each element with both the previous and next element.
Example
import { Chunk, Effect, Stream } from "effect"
const stream = Stream.zipWithPreviousAndNext(Stream.make(1, 2, 3, 4))
Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
// [
// [
// { _id: 'Option', _tag: 'None' },
// 1,
// { _id: 'Option', _tag: 'Some', value: 2 }
// ],
// [
// { _id: 'Option', _tag: 'Some', value: 1 },
// 2,
// { _id: 'Option', _tag: 'Some', value: 3 }
// ],
// [
// { _id: 'Option', _tag: 'Some', value: 2 },
// 3,
// { _id: 'Option', _tag: 'Some', value: 4 }
// ],
// [
// { _id: 'Option', _tag: 'Some', value: 3 },
// 4,
// { _id: 'Option', _tag: 'None' }
// ]
// ]
Signature
declare const zipWithPreviousAndNext: <A, E, R>(
self: Stream<A, E, R>
) => Stream<[Option.Option<A>, A, Option.Option<A>], E, R>
Since v2.0.0