Skip to main content Link Search Menu Expand Document (external link)

Stream overview

Added in v2.0.0

Table of contents



Merges a struct of streams into a single stream of tagged values.


export declare const mergeWithTag: {
  <S extends { [k in string]: Stream<any, any, any> }>(
    streams: S,
    options: { readonly concurrency: number | "unbounded"; readonly bufferSize?: number | undefined }
  ): Stream<
    { [K in keyof S]: { _tag: K; value: Stream.Success<S[K]> } }[keyof S],
    Stream.Error<S[keyof S]>,
    Stream.Context<S[keyof S]>
  >
  (options: {
    readonly concurrency: number | "unbounded"
    readonly bufferSize?: number | undefined
  }): <S extends { [k in string]: Stream<any, any, any> }>(
    streams: S
  ) => Stream<
    { [K in keyof S]: { _tag: K; value: Stream.Success<S[K]> } }[keyof S],
    Stream.Error<S[keyof S]>,
    Stream.Context<S[keyof S]>
  >
}


import { Stream } from "effect"
// Stream.Stream<{ _tag: "a"; value: number; } | { _tag: "b"; value: string; }>
const res = Stream.mergeWithTag(
  {
    a: Stream.make(0),
    b: Stream.make("")
  },
  { concurrency: "unbounded" }
)

Added in v3.8.5


Splits strings on newlines. Handles both Windows newlines (\r\n) and UNIX newlines (\n).


export declare const splitLines: <E, R>(self: Stream<string, E, R>) => Stream<string, E, R>

Added in v2.0.0



The default chunk size used by the various combinators and constructors of Stream.


export declare const DefaultChunkSize: number

Added in v2.0.0



Creates a stream from a single value that will get cleaned up after the stream is consumed.


export declare const acquireRelease: <A, E, R, R2, X>(
  acquire: Effect.Effect<A, E, R>,
  release: (resource: A, exit: Exit.Exit<unknown, unknown>) => Effect.Effect<X, never, R2>
) => Stream<A, E, R | R2>


import { Console, Effect, Stream } from "effect"

// Simulating File operations
const open = (filename: string) =>
  Effect.gen(function* () {
    yield* Console.log(`Opening ${filename}`)
    return {
      getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
      close: Console.log(`Closing ${filename}`)
    }
  })

const stream = Stream.acquireRelease(open("file.txt"), (file) => file.close).pipe(
  Stream.flatMap((file) => file.getLines)
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Opening file.txt
// Closing file.txt
// { _id: 'Chunk', values: [ [ 'Line 1', 'Line 2', 'Line 3' ] ] }

Added in v2.0.0


Creates a stream from an asynchronous callback that can be called multiple times. The optionality of the error type E in Emit can be used to signal the end of the stream by setting it to None.

The registration function can optionally return an Effect, which will be executed if the Fiber executing this Effect is interrupted.


export declare const async: <A, E = never, R = never>(
  register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
  bufferSize?:
    | number
    | "unbounded"
    | { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
    | undefined
) => Stream<A, E, R>


import type { StreamEmit } from "effect"
import { Chunk, Effect, Option, Stream } from "effect"

const events = [1, 2, 3, 4]

const stream = Stream.async((emit: StreamEmit.Emit<never, never, number, void>) => {
  events.forEach((n) => {
    setTimeout(() => {
      if (n === 3) {
        emit(Effect.fail(Option.none())) // Terminate the stream
      } else {
        emit(Effect.succeed(Chunk.of(n))) // Add the current item to the stream
      }
    }, 100 * n)
  })
})

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2 ] }

Added in v2.0.0


Creates a stream from an asynchronous callback that can be called multiple times. The registration of the callback itself returns an effect. The optionality of the error type E can be used to signal the end of the stream, by setting it to None.


export declare const asyncEffect: <A, E = never, R = never>(
  register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
  bufferSize?:
    | number
    | "unbounded"
    | { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
    | undefined
) => Stream<A, E, R>

Added in v2.0.0


Creates a stream from an external push-based resource.

You can use the emit helper to emit values to the stream. The emit helper returns a boolean indicating whether the value was emitted or not.

You can also use the emit helper to signal the end of the stream by using apis such as emit.end or emit.fail.

By default it uses an “unbounded” buffer size. You can customize the buffer size and strategy by passing an object as the second argument with the bufferSize and strategy fields.


export declare const asyncPush: <A, E = never, R = never>(
  register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
  options?:
    | { readonly bufferSize: "unbounded" }
    | { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | undefined }
    | undefined
) => Stream<A, E, Exclude<R, Scope.Scope>>


import { Effect, Stream } from "effect"

Stream.asyncPush<string>(
  (emit) =>
    Effect.acquireRelease(
      Effect.gen(function* () {
        yield* Effect.log("subscribing")
        return setInterval(() => emit.single("tick"), 1000)
      }),
      (handle) =>
        Effect.gen(function* () {
          yield* Effect.log("unsubscribing")
          clearInterval(handle)
        })
    ),
  { bufferSize: 16, strategy: "dropping" }
)

Added in v3.6.0


Creates a stream from an asynchronous callback that can be called multiple times. The registration of the callback itself returns a scoped resource. The optionality of the error type E can be used to signal the end of the stream, by setting it to None.


export declare const asyncScoped: <A, E = never, R = never>(
  register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
  bufferSize?:
    | number
    | "unbounded"
    | { readonly bufferSize?: number | undefined; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
    | undefined
) => Stream<A, E, Exclude<R, Scope.Scope>>

Added in v2.0.0


Concatenates all of the streams in the chunk to one stream.


export declare const concatAll: <A, E, R>(streams: Chunk.Chunk<Stream<A, E, R>>) => Stream<A, E, R>


import { Chunk, Effect, Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)
const s3 = Stream.make(6, 7, 8)

const stream = Stream.concatAll(Chunk.make(s1, s2, s3))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
//   _id: 'Chunk',
//   values: [
//     1, 2, 3, 4,
//     5, 6, 7, 8
//   ]
// }

Added in v2.0.0


The stream that dies with the specified defect.


export declare const die: (defect: unknown) => Stream<never>

Added in v2.0.0


The stream that dies with an exception described by message.


export declare const dieMessage: (message: string) => Stream<never>

Added in v2.0.0


The stream that dies with the specified lazily evaluated defect.


export declare const dieSync: (evaluate: LazyArg<unknown>) => Stream<never>

Added in v2.0.0


The empty stream.


export declare const empty: Stream<never, never, never>


import { Effect, Stream } from "effect"

const stream = Stream.empty

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }

Added in v2.0.0


Creates a stream that executes the specified effect but emits no elements.


export declare const execute: <X, E, R>(effect: Effect.Effect<X, E, R>) => Stream<never, E, R>

Added in v2.0.0


Terminates with the specified error.


export declare const fail: <E>(error: E) => Stream<never, E>


import { Effect, Stream } from "effect"

const stream = Stream.fail("Uh oh!")

// Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// {
//   _id: 'Exit',
//   _tag: 'Failure',
//   cause: { _id: 'Cause', _tag: 'Fail', failure: 'Uh oh!' }
// }

Added in v2.0.0


The stream that always fails with the specified Cause.


export declare const failCause: <E>(cause: Cause.Cause<E>) => Stream<never, E>

Added in v2.0.0


The stream that always fails with the specified lazily evaluated Cause.


export declare const failCauseSync: <E>(evaluate: LazyArg<Cause.Cause<E>>) => Stream<never, E>

Added in v2.0.0


Terminates with the specified lazily evaluated error.


export declare const failSync: <E>(evaluate: LazyArg<E>) => Stream<never, E>

Added in v2.0.0


Creates a one-element stream that never fails and executes the finalizer when it ends.


export declare const finalizer: <R, X>(finalizer: Effect.Effect<X, never, R>) => Stream<void, never, R>


import { Console, Effect, Stream } from "effect"

const application = Stream.fromEffect(Console.log("Application Logic."))

const deleteDir = (dir: string) => Console.log(`Deleting dir: ${dir}`)

const program = application.pipe(
  Stream.concat(
    Stream.finalizer(deleteDir("tmp").pipe(Effect.andThen(Console.log("Temporary directory was deleted."))))
  )
)

// Effect.runPromise(Stream.runCollect(program)).then(console.log)
// Application Logic.
// Deleting dir: tmp
// Temporary directory was deleted.
// { _id: 'Chunk', values: [ undefined, undefined ] }

Added in v2.0.0


Creates a stream from an AsyncIterable.


export declare const fromAsyncIterable: <A, E>(iterable: AsyncIterable<A>, onError: (e: unknown) => E) => Stream<A, E>


import { Effect, Stream } from "effect"

const myAsyncIterable = async function* () {
  yield 1
  yield 2
}

const stream = Stream.fromAsyncIterable(
  myAsyncIterable(),
  (e) => new Error(String(e)) // Error Handling
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2 ] }

Added in v2.0.0


Creates a stream from a Channel.


export declare const fromChannel: <A, E, R>(
  channel: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, unknown, unknown, R>
) => Stream<A, E, R>

Added in v2.0.0


Creates a stream from a Chunk of values.


export declare const fromChunk: <A>(chunk: Chunk.Chunk<A>) => Stream<A>


import { Chunk, Effect, Stream } from "effect"

// Creating a stream with values from a single Chunk
const stream = Stream.fromChunk(Chunk.make(1, 2, 3))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Added in v2.0.0


Creates a stream from a subscription to a PubSub.


export declare const fromChunkPubSub: {
  <A>(
    pubsub: PubSub.PubSub<Chunk.Chunk<A>>,
    options: { readonly scoped: true; readonly shutdown?: boolean | undefined }
  ): Effect.Effect<Stream<A>, never, Scope.Scope>
  <A>(
    pubsub: PubSub.PubSub<Chunk.Chunk<A>>,
    options?: { readonly scoped?: false | undefined; readonly shutdown?: boolean | undefined } | undefined
  ): Stream<A>
}

Added in v2.0.0


Creates a stream from a Queue of values.


export declare const fromChunkQueue: <A>(
  queue: Queue.Dequeue<Chunk.Chunk<A>>,
  options?: { readonly shutdown?: boolean | undefined }
) => Stream<A>

Added in v2.0.0


Creates a stream from an arbitrary number of chunks.


export declare const fromChunks: <A>(...chunks: Array<Chunk.Chunk<A>>) => Stream<A>


import { Chunk, Effect, Stream } from "effect"

// Creating a stream with values from multiple Chunks
const stream = Stream.fromChunks(Chunk.make(1, 2, 3), Chunk.make(4, 5, 6))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }

Added in v2.0.0


Either emits the success value of this effect or terminates the stream with the failure value of this effect.


export declare const fromEffect: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, R>


import { Effect, Random, Stream } from "effect"

const stream = Stream.fromEffect(Random.nextInt)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 922694024 ] }

Added in v2.0.0


Creates a stream from an effect producing a value of type A or an empty Stream.


export declare const fromEffectOption: <A, E, R>(effect: Effect.Effect<A, Option.Option<E>, R>) => Stream<A, E, R>

Added in v2.0.0


Creates a new Stream from an iterable collection of values.


export declare const fromIterable: <A>(iterable: Iterable<A>) => Stream<A>


import { Effect, Stream } from "effect"

const numbers = [1, 2, 3]

const stream = Stream.fromIterable(numbers)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Added in v2.0.0


Creates a stream from an effect producing a value of type Iterable<A>.


export declare const fromIterableEffect: <A, E, R>(effect: Effect.Effect<Iterable<A>, E, R>) => Stream<A, E, R>


import { Context, Effect, Stream } from "effect"

class Database extends Context.Tag("Database")<Database, { readonly getUsers: Effect.Effect<Array<string>> }>() {}

const getUsers = Database.pipe(Effect.andThen((_) => _.getUsers))

const stream = Stream.fromIterableEffect(getUsers)

// Effect.runPromise(
//   Stream.runCollect(stream.pipe(Stream.provideService(Database, { getUsers: Effect.succeed(["user1", "user2"]) })))
// ).then(console.log)
// { _id: 'Chunk', values: [ 'user1', 'user2' ] }

Added in v2.0.0


Creates a stream from an iterator.


export declare const fromIteratorSucceed: <A>(iterator: IterableIterator<A>, maxChunkSize?: number) => Stream<A>

Added in v2.0.0


Creates a stream from a subscription to a PubSub.


export declare const fromPubSub: {
  <A>(
    pubsub: PubSub.PubSub<A>,
    options: {
      readonly scoped: true
      readonly maxChunkSize?: number | undefined
      readonly shutdown?: boolean | undefined
    }
  ): Effect.Effect<Stream<A>, never, Scope.Scope>
  <A>(
    pubsub: PubSub.PubSub<A>,
    options?:
      | {
          readonly scoped?: false | undefined
          readonly maxChunkSize?: number | undefined
          readonly shutdown?: boolean | undefined
        }
      | undefined
  ): Stream<A>
}

Added in v2.0.0


Creates a stream from an effect that pulls elements from another stream.

See Stream.toPull for reference.


export declare const fromPull: <R, R2, E, A>(
  effect: Effect.Effect<Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R2>, never, Scope.Scope | R>
) => Stream<A, E, R2 | Exclude<R, Scope.Scope>>

Added in v2.0.0


Creates a stream from a queue of values.


export declare const fromQueue: <A>(
  queue: Queue.Dequeue<A>,
  options?: { readonly maxChunkSize?: number | undefined; readonly shutdown?: boolean | undefined }
) => Stream<A>

Added in v2.0.0


Creates a stream from a ReadableStream.



export declare const fromReadableStream: {
  <A, E>(options: {
    readonly evaluate: LazyArg<ReadableStream<A>>
    readonly onError: (error: unknown) => E
    readonly releaseLockOnEnd?: boolean | undefined
  }): Stream<A, E>
  <A, E>(evaluate: LazyArg<ReadableStream<A>>, onError: (error: unknown) => E): Stream<A, E>

Added in v2.0.0


Creates a stream from a ReadableStreamBYOBReader.



export declare const fromReadableStreamByob: {
  <E>(options: {
    readonly evaluate: LazyArg<ReadableStream<Uint8Array>>
    readonly onError: (error: unknown) => E
    readonly bufferSize?: number | undefined
    readonly releaseLockOnEnd?: boolean | undefined
  }): Stream<Uint8Array, E>
  <E>(
    evaluate: LazyArg<ReadableStream<Uint8Array>>,
    onError: (error: unknown) => E,
    allocSize?: number
  ): Stream<Uint8Array, E>
}

Added in v2.0.0


Creates a stream from a Schedule that does not require any further input. The stream will emit an element for each value output from the schedule, continuing for as long as the schedule continues.


export declare const fromSchedule: <A, R>(schedule: Schedule.Schedule<A, unknown, R>) => Stream<A, never, R>


import { Effect, Schedule, Stream } from "effect"

// Emits values every 1 second for a total of 5 emissions
const schedule = Schedule.spaced("1 second").pipe(Schedule.compose(Schedule.recurs(5)))

const stream = Stream.fromSchedule(schedule)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }

Added in v2.0.0


Creates a stream from a subscription to a TPubSub.


export declare const fromTPubSub: <A>(pubsub: TPubSub<A>) => Stream<A>

Added in v3.10.0


Creates a stream from a TQueue of values.


export declare const fromTQueue: <A>(queue: TDequeue<A>) => Stream<A>

Added in v3.10.0


The infinite stream of iterative function application: a, f(a), f(f(a)), f(f(f(a))), …


export declare const iterate: <A>(value: A, next: (value: A) => A) => Stream<A>


import { Effect, Stream } from "effect"

// An infinite Stream of numbers starting from 1 and incrementing
const stream = Stream.iterate(1, (n) => n + 1)

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(10)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }

Added in v2.0.0


Creates a stream from a sequence of values.


export declare const make: <As extends Array<any>>(...as: As) => Stream<As[number]>


import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Added in v2.0.0


The stream that never produces any value or fails with any error.


export declare const never: Stream<never, never, never>

Added in v2.0.0


Like Stream.unfold, but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.


export declare const paginate: <S, A>(s: S, f: (s: S) => readonly [A, Option.Option<S>]) => Stream<A>


import { Effect, Option, Stream } from "effect"

const stream = Stream.paginate(0, (n) => [n, n < 3 ? Option.some(n + 1) : Option.none()])

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }

Added in v2.0.0


Like Stream.unfoldChunk, but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.


export declare const paginateChunk: <S, A>(s: S, f: (s: S) => readonly [Chunk.Chunk<A>, Option.Option<S>]) => Stream<A>

Added in v2.0.0


Like Stream.unfoldChunkEffect, but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.


export declare const paginateChunkEffect: <S, A, E, R>(
  s: S,
  f: (s: S) => Effect.Effect<readonly [Chunk.Chunk<A>, Option.Option<S>], E, R>
) => Stream<A, E, R>

Added in v2.0.0


Like Stream.unfoldEffect but allows the emission of values to end one step further than the unfolding of the state. This is useful for embedding paginated APIs, hence the name.


export declare const paginateEffect: <S, A, E, R>(
  s: S,
  f: (s: S) => Effect.Effect<readonly [A, Option.Option<S>], E, R>
) => Stream<A, E, R>

Added in v2.0.0


Constructs a stream from a range of integers, including both endpoints.


export declare const range: (min: number, max: number, chunkSize?: number) => Stream<number>


import { Effect, Stream } from "effect"

// A Stream with a range of numbers from 1 to 5
const stream = Stream.range(1, 5)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

Added in v2.0.0


Creates a stream from an effect producing a value of type A which repeats forever.


export declare const repeatEffect: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, R>


import { Effect, Random, Stream } from "effect"

const stream = Stream.repeatEffect(Random.nextInt)

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 3891571149, 4239494205, 2352981603, 2339111046, 1488052210 ] }

Added in v2.0.0


Creates a stream from an effect producing chunks of A values which repeats forever.


export declare const repeatEffectChunk: <A, E, R>(effect: Effect.Effect<Chunk.Chunk<A>, E, R>) => Stream<A, E, R>

Added in v2.0.0


Creates a stream from an effect producing chunks of A values until it fails with None.


export declare const repeatEffectChunkOption: <A, E, R>(
  effect: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R>
) => Stream<A, E, R>

Added in v2.0.0


Creates a stream from an effect producing values of type A until it fails with None.


export declare const repeatEffectOption: <A, E, R>(effect: Effect.Effect<A, Option.Option<E>, R>) => Stream<A, E, R>


// In this example, we're draining an Iterator to create a stream from it
import { Stream, Effect, Option } from "effect"

const drainIterator = <A>(it: Iterator<A>): Stream.Stream<A> =>
  Stream.repeatEffectOption(
    Effect.sync(() => it.next()).pipe(
      Effect.andThen((res) => {
        if (res.done) {
          return Effect.fail(Option.none())
        }
        return Effect.succeed(res.value)
      })
    )
  )

Added in v2.0.0


Creates a stream from an effect producing a value of type A, which is repeated using the specified schedule.


export declare const repeatEffectWithSchedule: <A, E, R, X, A0 extends A, R2>(
  effect: Effect.Effect<A, E, R>,
  schedule: Schedule.Schedule<X, A0, R2>
) => Stream<A, E, R | R2>

Added in v2.0.0


Repeats the provided value infinitely.


export declare const repeatValue: <A>(value: A) => Stream<A>


import { Effect, Stream } from "effect"

const stream = Stream.repeatValue(0)

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 0, 0, 0, 0, 0 ] }

Added in v2.0.0


Creates a single-valued stream from a scoped resource.


export declare const scoped: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, Exclude<R, Scope.Scope>>


import { Console, Effect, Stream } from "effect"

// Creating a single-valued stream from a scoped resource
const stream = Stream.scoped(
  Effect.acquireUseRelease(
    Console.log("acquire"),
    () => Console.log("use"),
    () => Console.log("release")
  )
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// acquire
// use
// release
// { _id: 'Chunk', values: [ undefined ] }

Added in v2.0.0


Creates a single-valued pure stream.


export declare const succeed: <A>(value: A) => Stream<A>


import { Effect, Stream } from "effect"

// A Stream with a single number
const stream = Stream.succeed(3)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 3 ] }

Added in v2.0.0


Returns a lazily constructed stream.


export declare const suspend: <A, E, R>(stream: LazyArg<Stream<A, E, R>>) => Stream<A, E, R>

Added in v2.0.0


Creates a single-valued pure stream from a lazily evaluated value.


export declare const sync: <A>(evaluate: LazyArg<A>) => Stream<A>

Added in v2.0.0


A stream that emits void values spaced by the specified duration.


export declare const tick: (interval: Duration.DurationInput) => Stream<void>


import { Effect, Stream } from "effect"

let last = Date.now()
const log = (message: string) =>
  Effect.sync(() => {
    const end = Date.now()
    console.log(`${message} after ${end - last}ms`)
    last = end
  })

const stream = Stream.tick("1 seconds").pipe(Stream.tap(() => log("tick")))

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// tick after 4ms
// tick after 1003ms
// tick after 1001ms
// tick after 1002ms
// tick after 1002ms
// { _id: 'Chunk', values: [ undefined, undefined, undefined, undefined, undefined ] }

Added in v2.0.0


Creates a channel from a Stream.


export declare const toChannel: <A, E, R>(
  stream: Stream<A, E, R>
) => Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, unknown, unknown, R>

Added in v2.0.0


Creates a stream by peeling off the “layers” of a value of type S.


export declare const unfold: <S, A>(s: S, f: (s: S) => Option.Option<readonly [A, S]>) => Stream<A>


import { Effect, Option, Stream } from "effect"

const stream = Stream.unfold(1, (n) => Option.some([n, n + 1]))

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

Added in v2.0.0


Creates a stream by peeling off the “layers” of a value of type S.


export declare const unfoldChunk: <S, A>(s: S, f: (s: S) => Option.Option<readonly [Chunk.Chunk<A>, S]>) => Stream<A>

Added in v2.0.0


Creates a stream by effectfully peeling off the “layers” of a value of type S.


export declare const unfoldChunkEffect: <S, A, E, R>(
  s: S,
  f: (s: S) => Effect.Effect<Option.Option<readonly [Chunk.Chunk<A>, S]>, E, R>
) => Stream<A, E, R>

Added in v2.0.0


Creates a stream by effectfully peeling off the “layers” of a value of type S.


export declare const unfoldEffect: <S, A, E, R>(
  s: S,
  f: (s: S) => Effect.Effect<Option.Option<readonly [A, S]>, E, R>
) => Stream<A, E, R>


import { Effect, Option, Random, Stream } from "effect"

const stream = Stream.unfoldEffect(1, (n) =>
  Random.nextBoolean.pipe(Effect.map((b) => (b ? Option.some([n, -n]) : Option.some([n, n]))))
)

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 1, -1, -1, -1, -1 ] }

Added in v2.0.0


Creates a stream produced from an Effect.


export declare const unwrap: <A, E2, R2, E, R>(
  effect: Effect.Effect<Stream<A, E2, R2>, E, R>
) => Stream<A, E | E2, R | R2>

Added in v2.0.0


Creates a stream produced from a scoped Effect.


export declare const unwrapScoped: <A, E2, R2, E, R>(
  effect: Effect.Effect<Stream<A, E2, R2>, E, R>
) => Stream<A, E | E2, R2 | Exclude<R, Scope.Scope>>

Added in v2.0.0


A stream that contains a single void value.


export declare const void: Stream<void, never, never>


import { Effect, Stream } from "effect"

const stream = Stream.void

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ undefined ] }

Added in v2.0.0


Returns the resulting stream when the given PartialFunction is defined for the given value, otherwise returns an empty stream.


export declare const whenCase: <A, A2, E, R>(
  evaluate: LazyArg<A>,
  pf: (a: A) => Option.Option<Stream<A2, E, R>>
) => Stream<A2, E, R>

Added in v2.0.0



Accesses the whole context of the stream.


export declare const context: <R>() => Stream<Context.Context<R>, never, R>

Added in v2.0.0


Accesses the context of the stream.


export declare const contextWith: <R, A>(f: (env: Context.Context<R>) => A) => Stream<A, never, R>

Added in v2.0.0


Accesses the context of the stream in the context of an effect.


export declare const contextWithEffect: <R0, A, E, R>(
  f: (env: Context.Context<R0>) => Effect.Effect<A, E, R>
) => Stream<A, E, R0 | R>

Added in v2.0.0


Accesses the context of the stream in the context of a stream.


export declare const contextWithStream: <R0, A, E, R>(
  f: (env: Context.Context<R0>) => Stream<A, E, R>
) => Stream<A, E, R0 | R>

Added in v2.0.0


Transforms the context being provided to the stream with the specified function.


export declare const mapInputContext: {
  <R0, R>(f: (env: Context.Context<R0>) => Context.Context<R>): <A, E>(self: Stream<A, E, R>) => Stream<A, E, R0>
  <A, E, R0, R>(self: Stream<A, E, R>, f: (env: Context.Context<R0>) => Context.Context<R>): Stream<A, E, R0>

Added in v2.0.0


Provides the stream with its required context, which eliminates its dependency on R.


export declare const provideContext: {
  <R>(context: Context.Context<R>): <A, E>(self: Stream<A, E, R>) => Stream<A, E>
  <A, E, R>(self: Stream<A, E, R>, context: Context.Context<R>): Stream<A, E>

Added in v2.0.0


Provides a Layer to the stream, which translates it to another level.


export declare const provideLayer: {
  <RIn, E2, ROut>(layer: Layer.Layer<ROut, E2, RIn>): <A, E>(self: Stream<A, E, ROut>) => Stream<A, E2 | E, RIn>
  <A, E, RIn, E2, ROut>(self: Stream<A, E, ROut>, layer: Layer.Layer<ROut, E2, RIn>): Stream<A, E | E2, RIn>

Added in v2.0.0


Provides the stream with the single service it requires. If the stream requires more than one service use Stream.provideContext instead.


export declare const provideService: {
  <T extends Context.Tag<any, any>>(
    tag: T,
    resource: Context.Tag.Service<T>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, Exclude<R, Context.Tag.Identifier<T>>>
  <A, E, R, T extends Context.Tag<any, any>>(
    self: Stream<A, E, R>,
    tag: T,
    resource: Context.Tag.Service<T>
  ): Stream<A, E, Exclude<R, Context.Tag.Identifier<T>>>

Added in v2.0.0


Provides the stream with the single service it requires, where the service is produced by the specified effect. If the stream requires more than one service use Stream.provideContext instead.


export declare const provideServiceEffect: {
  <T extends Context.Tag<any, any>, E2, R2>(
    tag: T,
    effect: Effect.Effect<Context.Tag.Service<T>, E2, R2>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | Exclude<R, Context.Tag.Identifier<T>>>
  <A, E, R, T extends Context.Tag<any, any>, E2, R2>(
    self: Stream<A, E, R>,
    tag: T,
    effect: Effect.Effect<Context.Tag.Service<T>, E2, R2>
  ): Stream<A, E | E2, R2 | Exclude<R, Context.Tag.Identifier<T>>>

Added in v2.0.0


Provides the stream with the single service it requires, where the service is produced by the specified stream. If the stream requires more than one service use Stream.provideContext instead.


export declare const provideServiceStream: {
  <T extends Context.Tag<any, any>, E2, R2>(
    tag: T,
    stream: Stream<Context.Tag.Service<T>, E2, R2>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | Exclude<R, Context.Tag.Identifier<T>>>
  <A, E, R, T extends Context.Tag<any, any>, E2, R2>(
    self: Stream<A, E, R>,
    tag: T,
    stream: Stream<Context.Tag.Service<T>, E2, R2>
  ): Stream<A, E | E2, R2 | Exclude<R, Context.Tag.Identifier<T>>>

Added in v2.0.0


Splits the context into two parts, providing one part using the specified layer and leaving the remainder R0.


export declare const provideSomeLayer: {
  <RIn, E2, ROut>(
    layer: Layer.Layer<ROut, E2, RIn>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, RIn | Exclude<R, ROut>>
  <A, E, R, RIn, E2, ROut>(
    self: Stream<A, E, R>,
    layer: Layer.Layer<ROut, E2, RIn>
  ): Stream<A, E | E2, RIn | Exclude<R, ROut>>

Added in v2.0.0


Updates the specified service within the context of the Stream.


export declare const updateService: {
  <T extends Context.Tag<any, any>>(
    tag: T,
    f: (service: Context.Tag.Service<T>) => Context.Tag.Service<T>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, T | R>
  <A, E, R, T extends Context.Tag<any, any>>(
    self: Stream<A, E, R>,
    tag: T,
    f: (service: Context.Tag.Service<T>) => Context.Tag.Service<T>
  ): Stream<A, E, R | T>

Added in v2.0.0



Runs the sink on the stream to produce either the sink’s result or an error.


export declare const run: {
  <A2, A, E2, R2>(
    sink: Sink.Sink<A2, A, unknown, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<A2, E2 | E, Exclude<R | R2, Scope.Scope>>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    sink: Sink.Sink<A2, A, unknown, E2, R2>
  ): Effect.Effect<A2, E | E2, Exclude<R | R2, Scope.Scope>>

Added in v2.0.0


Runs the stream and collects all of its elements to a chunk.


export declare const runCollect: <A, E, R>(
  self: Stream<A, E, R>
) => Effect.Effect<Chunk.Chunk<A>, E, Exclude<R, Scope.Scope>>

Added in v2.0.0


Runs the stream and emits the number of elements processed.


export declare const runCount: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<number, E, Exclude<R, Scope.Scope>>

Added in v2.0.0


Runs the stream only for its effects. The emitted elements are discarded.


export declare const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E, Exclude<R, Scope.Scope>>

Added in v2.0.0


Executes a pure fold over the stream of values - reduces all elements in the stream to a value of type S.


export declare const runFold: {
  <S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Exclude<R, Scope.Scope>>
  <A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, Exclude<R, Scope.Scope>>

Added in v2.0.0


Executes an effectful fold over the stream of values.


export declare const runFoldEffect: {
  <S, A, E2, R2>(
    s: S,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Exclude<R | R2, Scope.Scope>>
  <A, E, R, S, E2, R2>(
    self: Stream<A, E, R>,
    s: S,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): Effect.Effect<S, E | E2, Exclude<R | R2, Scope.Scope>>

Added in v2.0.0


Executes a pure fold over the stream of values. Returns a scoped value that represents the scope of the stream.


export declare const runFoldScoped: {
  <S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Scope.Scope | R>
  <A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, Scope.Scope | R>

Added in v2.0.0


Executes an effectful fold over the stream of values. Returns a scoped value that represents the scope of the stream.


export declare const runFoldScopedEffect: {
  <S, A, E2, R2>(
    s: S,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Scope.Scope | R2 | R>
  <A, E, R, S, E2, R2>(
    self: Stream<A, E, R>,
    s: S,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): Effect.Effect<S, E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Reduces the elements in the stream to a value of type S. Stops the fold early when the condition is not fulfilled.


export declare const runFoldWhile: {
  <S, A>(
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => S
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Exclude<R, Scope.Scope>>
  <A, E, R, S>(
    self: Stream<A, E, R>,
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => S
  ): Effect.Effect<S, E, Exclude<R, Scope.Scope>>

Added in v2.0.0


Executes an effectful fold over the stream of values. Stops the fold early when the condition is not fulfilled.


export declare const runFoldWhileEffect: {
  <S, A, E2, R2>(
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, Exclude<R | R2, Scope.Scope>>
  <A, E, R, S, E2, R2>(
    self: Stream<A, E, R>,
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): Effect.Effect<S, E | E2, Exclude<R | R2, Scope.Scope>>

Added in v2.0.0


Executes a pure fold over the stream of values. Returns a scoped value that represents the scope of the stream. Stops the fold early when the condition is not fulfilled.


export declare const runFoldWhileScoped: {
  <S, A>(
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => S
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Scope.Scope | R>
  <A, E, R, S>(
    self: Stream<A, E, R>,
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => S
  ): Effect.Effect<S, E, Scope.Scope | R>

Added in v2.0.0


Executes an effectful fold over the stream of values. Returns a scoped value that represents the scope of the stream. Stops the fold early when the condition is not fulfilled.


export declare const runFoldWhileScopedEffect: {
  <S, A, E2, R2>(
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E2 | E, R2 | R | Scope.Scope>
  <A, E, R, S, E2, R2>(
    self: Stream<A, E, R>,
    s: S,
    cont: Predicate<S>,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): Effect.Effect<S, E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Consumes all elements of the stream, passing them to the specified callback.


export declare const runForEach: {
  <A, X, E2, R2>(
    f: (a: A) => Effect.Effect<X, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, R2 | R>
  <A, E, R, X, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<X, E2, R2>
  ): Effect.Effect<void, E | E2, R | R2>

Added in v2.0.0


Consumes all elements of the stream chunk by chunk, passing each chunk to the specified callback.


export declare const runForEachChunk: {
  <A, X, E2, R2>(
    f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, R2 | R>
  <A, E, R, X, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
  ): Effect.Effect<void, E | E2, R | R2>

Added in v2.0.0


Like Stream.runForEachChunk, but returns a scoped effect so the finalization order can be controlled.


export declare const runForEachChunkScoped: {
  <A, X, E2, R2>(
    f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, Scope.Scope | R2 | R>
  <A, E, R, X, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: Chunk.Chunk<A>) => Effect.Effect<X, E2, R2>
  ): Effect.Effect<void, E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Like Stream.runForEach, but returns a scoped effect so the finalization order can be controlled.


export declare const runForEachScoped: {
  <A, X, E2, R2>(
    f: (a: A) => Effect.Effect<X, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, Scope.Scope | R2 | R>
  <A, E, R, X, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<X, E2, R2>
  ): Effect.Effect<void, E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Consumes elements of the stream, passing them to the specified callback, and terminating consumption when the callback returns false.


export declare const runForEachWhile: {
  <A, E2, R2>(
    f: (a: A) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<boolean, E2, R2>
  ): Effect.Effect<void, E | E2, R | R2>

Added in v2.0.0


Like Stream.runForEachWhile, but returns a scoped effect so the finalization order can be controlled.


export declare const runForEachWhileScoped: {
  <A, E2, R2>(
    f: (a: A) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E2 | E, Scope.Scope | R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<boolean, E2, R2>
  ): Effect.Effect<void, E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Runs the stream to completion and yields the first value emitted by it, discarding the rest of the elements.


export declare const runHead: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Option.Option<A>, E, R>

Added in v2.0.0


Publishes elements of this stream to a PubSub. Stream failure and ending will also be signalled.


export declare const runIntoPubSub: {
  <A, E>(pubsub: PubSub.PubSub<Take.Take<A, E>>): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, R>
  <A, E, R>(self: Stream<A, E, R>, pubsub: PubSub.PubSub<Take.Take<A, E>>): Effect.Effect<void, never, R>

Added in v2.0.0


Like Stream.runIntoPubSub, but provides the result as a scoped effect to allow for scope composition.


export declare const runIntoPubSubScoped: {
  <A, E>(
    pubsub: PubSub.PubSub<Take.Take<A, E>>
  ): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, Scope.Scope | R>
  <A, E, R>(self: Stream<A, E, R>, pubsub: PubSub.PubSub<Take.Take<A, E>>): Effect.Effect<void, never, Scope.Scope | R>

Added in v2.0.0


Enqueues elements of this stream into a queue. Stream failure and ending will also be signalled.


export declare const runIntoQueue: {
  <A, E>(queue: Queue.Enqueue<Take.Take<A, E>>): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, R>
  <A, E, R>(self: Stream<A, E, R>, queue: Queue.Enqueue<Take.Take<A, E>>): Effect.Effect<void, never, R>

Added in v2.0.0


Like Stream.runIntoQueue, but enqueues each element as an Exit and provides the result as a scoped effect to allow for scope composition.


export declare const runIntoQueueElementsScoped: {
  <A, E>(
    queue: Queue.Enqueue<Exit.Exit<A, Option.Option<E>>>
  ): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, Scope.Scope | R>
  <A, E, R>(
    self: Stream<A, E, R>,
    queue: Queue.Enqueue<Exit.Exit<A, Option.Option<E>>>
  ): Effect.Effect<void, never, Scope.Scope | R>

Added in v2.0.0


Like Stream.runIntoQueue, but provides the result as a scoped effect to allow for scope composition.


export declare const runIntoQueueScoped: {
  <A, E>(
    queue: Queue.Enqueue<Take.Take<A, E>>
  ): <R>(self: Stream<A, E, R>) => Effect.Effect<void, never, Scope.Scope | R>
  <A, E, R>(self: Stream<A, E, R>, queue: Queue.Enqueue<Take.Take<A, E>>): Effect.Effect<void, never, Scope.Scope | R>

Added in v2.0.0


Runs the stream to completion and yields the last value emitted by it, discarding the rest of the elements.


export declare const runLast: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Option.Option<A>, E, R>

Added in v2.0.0



export declare const runScoped: {
  <A2, A, E2, R2>(
    sink: Sink.Sink<A2, A, unknown, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<A2, E2 | E, Scope.Scope | R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    sink: Sink.Sink<A2, A, unknown, E2, R2>
  ): Effect.Effect<A2, E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Runs the stream to a sink which sums its elements, which must be numbers.


export declare const runSum: <E, R>(self: Stream<number, E, R>) => Effect.Effect<number, E, R>

Added in v2.0.0


Converts the stream to a scoped PubSub of chunks. After the scope is closed, the PubSub will never again produce values and should be discarded.


export declare const toPubSub: {
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R>
  <A, E, R>(
    self: Stream<A, E, R>,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R>

Added in v2.0.0


Returns, in a scope, an effect that can be used to repeatedly pull chunks from the stream. The pull effect fails with None when the stream is finished, or with Some error if it fails; otherwise it returns a chunk of the stream’s output.


export declare const toPull: <A, E, R>(
  self: Stream<A, E, R>
) => Effect.Effect<Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R>, never, Scope.Scope | R>


import { Effect, Stream } from "effect"

// Simulate a chunked stream
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(Stream.rechunk(2))

const program = Effect.gen(function* () {
  // Create an effect to get data chunks from the stream
  const getChunk = yield* Stream.toPull(stream)

  // Continuously fetch and process chunks
  while (true) {
    const chunk = yield* getChunk
    console.log(chunk)
  }
})

// Effect.runPromise(Effect.scoped(program)).then(console.log, console.error)
// { _id: 'Chunk', values: [ 1, 2 ] }
// { _id: 'Chunk', values: [ 3, 4 ] }
// { _id: 'Chunk', values: [ 5 ] }
// (FiberFailure) Error: {
//   "_id": "Option",
//   "_tag": "None"
// }

Added in v2.0.0


Converts the stream to a scoped queue of chunks. After the scope is closed, the queue will never again produce values and should be discarded.

Defaults to the “suspend” back pressure strategy with a capacity of 2.


export declare const toQueue: {
      | { readonly strategy?: "dropping" | "sliding" | "suspend" | undefined; readonly capacity?: number | undefined }
      | { readonly strategy: "unbounded" }
      | undefined
  ): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope | R>
  <A, E, R>(
    self: Stream<A, E, R>,
      | { readonly strategy?: "dropping" | "sliding" | "suspend" | undefined; readonly capacity?: number | undefined }
      | { readonly strategy: "unbounded" }
      | undefined
  ): Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope | R>

Added in v2.0.0


Converts the stream to a scoped queue of elements. After the scope is closed, the queue will never again produce values and should be discarded.

Defaults to a capacity of 2.


export declare const toQueueOfElements: {
    options?: { readonly capacity?: number | undefined } | undefined
  ): <A, E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, never, Scope.Scope | R>
  <A, E, R>(
    self: Stream<A, E, R>,
    options?: { readonly capacity?: number | undefined } | undefined
  ): Effect.Effect<Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>, never, Scope.Scope | R>

Added in v2.0.0


Converts the stream to a ReadableStream.



export declare const toReadableStream: {
  <A>(options?: { readonly strategy?: QueuingStrategy<A> | undefined }): <E>(self: Stream<A, E>) => ReadableStream<A>
  <A, E>(self: Stream<A, E>, options?: { readonly strategy?: QueuingStrategy<A> | undefined }): ReadableStream<A>

Added in v2.0.0


Converts the stream to an Effect<ReadableStream>.



export declare const toReadableStreamEffect: {
  <A>(options?: {
    readonly strategy?: QueuingStrategy<A> | undefined
  }): <E, R>(self: Stream<A, E, R>) => Effect.Effect<ReadableStream<A>, never, R>
  <A, E, R>(
    self: Stream<A, E, R>,
    options?: { readonly strategy?: QueuingStrategy<A> | undefined }
  ): Effect.Effect<ReadableStream<A>, never, R>

Added in v2.0.0


Converts the stream to a ReadableStream using the provided runtime.



export declare const toReadableStreamRuntime: {
  <A, XR>(
    runtime: Runtime<XR>,
    options?: { readonly strategy?: QueuingStrategy<A> | undefined }
  ): <E, R extends XR>(self: Stream<A, E, R>) => ReadableStream<A>
  <A, E, XR, R extends XR>(
    self: Stream<A, E, R>,
    runtime: Runtime<XR>,
    options?: { readonly strategy?: QueuingStrategy<A> | undefined }
  ): ReadableStream<A>

Added in v2.0.0

do notation


The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind and let.

Here’s how the do simulation works:

  1. Start the do simulation using the Do value
  2. Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
  3. You can accumulate multiple bind statements to define multiple variables within the scope
  4. Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values


export declare const Do: Stream<{}, never, never>


import { Chunk, Effect, pipe, Stream } from "effect"

const result = pipe(
  Stream.Do,
  Stream.bind("x", () => Stream.succeed(2)),
  Stream.bind("y", () => Stream.succeed(3)),
  Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))

Added in v2.0.0


The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind and let.

Here’s how the do simulation works:

  1. Start the do simulation using the Do value
  2. Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
  3. You can accumulate multiple bind statements to define multiple variables within the scope
  4. Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values


export declare const bind: {
  <N extends string, A, B, E2, R2>(
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Stream<B, E2, R2>,
      | { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
      | undefined
  ): <E, R>(self: Stream<A, E, R>) => Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E2 | E, R2 | R>
  <A, E, R, N extends string, B, E2, R2>(
    self: Stream<A, E, R>,
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Stream<B, E2, R2>,
      | { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
      | undefined
  ): Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E | E2, R | R2>


import { Chunk, Effect, pipe, Stream } from "effect"

const result = pipe(
  Stream.Do,
  Stream.bind("x", () => Stream.succeed(2)),
  Stream.bind("y", () => Stream.succeed(3)),
  Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))

Added in v2.0.0


Binds an effectful value in a do scope.


export declare const bindEffect: {
  <N extends string, A, B, E2, R2>(
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
  ): <E, R>(self: Stream<A, E, R>) => Stream<{ [K in keyof A | N]: K extends keyof A ? A[K] : B }, E | E2, R | R2>
  <A, E, R, N extends string, B, E2, R2>(
    self: Stream<A, E, R>,
    tag: Exclude<N, keyof A>,
    f: (_: NoInfer<A>) => Effect.Effect<B, E2, R2>,
    options?: { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
  ): Stream<{ [K in keyof A | N]: K extends keyof A ? A[K] : B }, E | E2, R | R2>

Added in v2.0.0


The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind and let.

Here’s how the do simulation works:

  1. Start the do simulation using the Do value
  2. Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
  3. You can accumulate multiple bind statements to define multiple variables within the scope
  4. Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values


export declare const bindTo: {
  <N extends string>(name: N): <A, E, R>(self: Stream<A, E, R>) => Stream<{ [K in N]: A }, E, R>
  <A, E, R, N extends string>(self: Stream<A, E, R>, name: N): Stream<{ [K in N]: A }, E, R>


import { Chunk, Effect, pipe, Stream } from "effect"

const result = pipe(
  Stream.Do,
  Stream.bind("x", () => Stream.succeed(2)),
  Stream.bind("y", () => Stream.succeed(3)),
  Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))

Added in v2.0.0


The “do simulation” in Effect allows you to write code in a more declarative style, similar to the “do notation” in other programming languages. It provides a way to define variables and perform operations on them using functions like bind and let.

Here’s how the do simulation works:

  1. Start the do simulation using the Do value
  2. Within the do simulation scope, you can use the bind function to define variables and bind them to Stream values
  3. You can accumulate multiple bind statements to define multiple variables within the scope
  4. Inside the do simulation scope, you can also use the let function to define variables and bind them to simple values


export declare const let: {
  <N extends string, A extends object, B>(
    name: Exclude<N, keyof A>,
    f: (a: NoInfer<A>) => B
  ): <E, R>(self: Stream<A, E, R>) => Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E, R>
  <A extends object, E, R, N extends string, B>(
    self: Stream<A, E, R>,
    name: Exclude<N, keyof A>,
    f: (a: NoInfer<A>) => B
  ): Stream<{ [K in N | keyof A]: K extends keyof A ? A[K] : B }, E, R>


import { Chunk, Effect, pipe, Stream } from "effect"

const result = pipe(
  Stream.Do,
  Stream.bind("x", () => Stream.succeed(2)),
  Stream.bind("y", () => Stream.succeed(3)),
  Stream.let("sum", ({ x, y }) => x + y)
)
assert.deepStrictEqual(Effect.runSync(Stream.runCollect(result)), Chunk.of({ x: 2, y: 3, sum: 5 }))

Added in v2.0.0



Finds the first element emitted by this stream that satisfies the provided predicate.


export declare const find: {
  <A, B extends A>(refinement: Refinement<NoInfer<A>, B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<B, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>

Added in v2.0.0


Finds the first element emitted by this stream that satisfies the provided effectful predicate.


export declare const findEffect: {
  <A, E2, R2>(
    predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): Stream<A, E | E2, R | R2>

Added in v2.0.0



Decode Uint8Array chunks into a stream of strings using the specified encoding.


export declare const decodeText: {
  (encoding?: string | undefined): <E, R>(self: Stream<Uint8Array, E, R>) => Stream<string, E, R>
  <E, R>(self: Stream<Uint8Array, E, R>, encoding?: string | undefined): Stream<string, E, R>

Added in v2.0.0


Encode a stream of strings into a stream of Uint8Array chunks.


export declare const encodeText: <E, R>(self: Stream<string, E, R>) => Stream<Uint8Array, E, R>

Added in v2.0.0

error handling


Switches over to the stream produced by the provided function in case this one fails with a typed error.


export declare const catchAll: {
  <E, A2, E2, R2>(f: (error: E) => Stream<A2, E2, R2>): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, f: (error: E) => Stream<A2, E2, R2>): Stream<A | A2, E2, R | R2>

Added in v2.0.0


Switches over to the stream produced by the provided function in case this one fails. Allows recovery from all causes of failure, including interruption if the stream is uninterruptible.


export declare const catchAllCause: {
  <E, A2, E2, R2>(
    f: (cause: Cause.Cause<E>) => Stream<A2, E2, R2>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (cause: Cause.Cause<E>) => Stream<A2, E2, R2>
  ): Stream<A | A2, E2, R | R2>

Added in v2.0.0


Switches over to the stream produced by the provided function in case this one fails with some typed error.


export declare const catchSome: {
  <E, A2, E2, R2>(
    pf: (error: E) => Option.Option<Stream<A2, E2, R2>>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E | E2, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    pf: (error: E) => Option.Option<Stream<A2, E2, R2>>
  ): Stream<A | A2, E | E2, R | R2>

Added in v2.0.0


Switches over to the stream produced by the provided function in case this one fails with some errors. Allows recovery from all causes of failure, including interruption if the stream is uninterruptible.


export declare const catchSomeCause: {
  <E, A2, E2, R2>(
    pf: (cause: Cause.Cause<E>) => Option.Option<Stream<A2, E2, R2>>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A2 | A, E | E2, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    pf: (cause: Cause.Cause<E>) => Option.Option<Stream<A2, E2, R2>>
  ): Stream<A | A2, E | E2, R | R2>

Added in v2.0.0


Switches over to the stream produced by the provided function in case this one fails with an error matching the given _tag.


export declare const catchTag: {
  <K extends E["_tag"] & string, E extends { _tag: string }, A1, E1, R1>(
    k: K,
    f: (e: Extract<E, { _tag: K }>) => Stream<A1, E1, R1>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A1 | A, E1 | Exclude<E, { _tag: K }>, R1 | R>
  <A, E extends { _tag: string }, R, K extends E["_tag"] & string, A1, E1, R1>(
    self: Stream<A, E, R>,
    k: K,
    f: (e: Extract<E, { _tag: K }>) => Stream<A1, E1, R1>
  ): Stream<A | A1, E1 | Exclude<E, { _tag: K }>, R | R1>

Added in v2.0.0


Switches over to the stream produced by one of the provided functions, in case this one fails with an error matching one of the given _tag values.


export declare const catchTags: {
    E extends { _tag: string },
    Cases extends { [K in E["_tag"]]+?: (error: Extract<E, { _tag: K }>) => Stream<any, any, any> }
    cases: Cases
  ): <A, R>(
    self: Stream<A, E, R>
  ) => Stream<
    | A
    | {
        [K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer A, infer _E, infer _R>
          ? A
          : never
      }[keyof Cases],
    | Exclude<E, { _tag: keyof Cases }>
    | {
        [K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _A, infer E, infer _R>
          ? E
          : never
      }[keyof Cases],
    | R
    | {
        [K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _A, infer _E, infer R>
          ? R
          : never
      }[keyof Cases]
    E extends { _tag: string },
    Cases extends { [K in E["_tag"]]+?: (error: Extract<E, { _tag: K }>) => Stream<any, any, any> }
    self: Stream<A, E, R>,
    cases: Cases
  ): Stream<
    | A
    | {
        [K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _R, infer _E, infer A>
          ? A
          : never
      }[keyof Cases],
    | Exclude<E, { _tag: keyof Cases }>
    | {
        [K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer _R, infer E, infer _A>
          ? E
          : never
      }[keyof Cases],
    | R
    | {
        [K in keyof Cases]: Cases[K] extends (...args: Array<any>) => Stream.Variance<infer R, infer _E, infer _A>
          ? R
          : never
      }[keyof Cases]

Added in v2.0.0


Translates any failure into a stream termination, making the stream infallible and all failures unchecked.


export declare const orDie: <A, E, R>(self: Stream<A, E, R>) => Stream<A, never, R>

Added in v2.0.0


Keeps none of the errors, and terminates the stream with them, using the specified function to convert the E into a defect.


export declare const orDieWith: {
  <E>(f: (e: E) => unknown): <A, R>(self: Stream<A, E, R>) => Stream<A, never, R>
  <A, E, R>(self: Stream<A, E, R>, f: (e: E) => unknown): Stream<A, never, R>

Added in v2.0.0


Switches to the provided stream in case this one fails with a typed error.

See also Stream.catchAll.


export declare const orElse: {
  <A2, E2, R2>(that: LazyArg<Stream<A2, E2, R2>>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: LazyArg<Stream<A2, E2, R2>>): Stream<A | A2, E2, R | R2>

Added in v2.0.0


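Switches to the provided stream in case this one fails with a typed error. Elements are wrapped in an Either so that values from this stream can be distinguished from values of the fallback stream.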

See also Stream.catchAll.


export declare const orElseEither: {
  <A2, E2, R2>(
    that: LazyArg<Stream<A2, E2, R2>>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A2, A>, E2, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    that: LazyArg<Stream<A2, E2, R2>>
  ): Stream<Either.Either<A2, A>, E2, R | R2>

Added in v2.0.0


Fails with given error in case this one fails with a typed error.

See also Stream.catchAll.


export declare const orElseFail: {
  <E2>(error: LazyArg<E2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
  <A, E, R, E2>(self: Stream<A, E, R>, error: LazyArg<E2>): Stream<A, E2, R>

Added in v2.0.0


Produces the specified element if this stream is empty.


export declare const orElseIfEmpty: {
  <A2>(element: LazyArg<A2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, element: LazyArg<A2>): Stream<A | A2, E, R>

Added in v2.0.0


Produces the specified chunk if this stream is empty.


export declare const orElseIfEmptyChunk: {
  <A2>(chunk: LazyArg<Chunk.Chunk<A2>>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, chunk: LazyArg<Chunk.Chunk<A2>>): Stream<A | A2, E, R>

Added in v2.0.0


Switches to the provided stream in case this one is empty.


export declare const orElseIfEmptyStream: {
  <A2, E2, R2>(stream: LazyArg<Stream<A2, E2, R2>>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, stream: LazyArg<Stream<A2, E2, R2>>): Stream<A | A2, E | E2, R | R2>

Added in v2.0.0


Succeeds with the specified value if this one fails with a typed error.


export declare const orElseSucceed: {
  <A2>(value: LazyArg<A2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, never, R>
  <A, E, R, A2>(self: Stream<A, E, R>, value: LazyArg<A2>): Stream<A | A2, never, R>

Added in v2.0.0


Keeps some of the errors, and terminates the fiber with the rest.


export declare const refineOrDie: {
  <E, E2>(pf: (error: E) => Option.Option<E2>): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
  <A, E, R, E2>(self: Stream<A, E, R>, pf: (error: E) => Option.Option<E2>): Stream<A, E2, R>

Added in v2.0.0


Keeps some of the errors, and terminates the fiber with the rest, using the specified function to convert the E into a defect.


export declare const refineOrDieWith: {
  <E, E2>(
    pf: (error: E) => Option.Option<E2>,
    f: (error: E) => unknown
  ): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
  <A, E, R, E2>(self: Stream<A, E, R>, pf: (error: E) => Option.Option<E2>, f: (error: E) => unknown): Stream<A, E2, R>

Added in v2.0.0



Filters the elements emitted by this stream using the provided function.


export declare const filter: {
  <A, B extends A>(refinement: Refinement<NoInfer<A>, B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A, B extends A>(predicate: Predicate<B>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<B, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.range(1, 11).pipe(Stream.filter((n) => n % 2 === 0))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

Added in v2.0.0


Effectfully filters the elements emitted by this stream.


export declare const filterEffect: {
  <A, E2, R2>(
    f: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(self: Stream<A, E, R>, f: (a: A) => Effect.Effect<boolean, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0



Creates a pipeline that groups on adjacent keys, calculated by the specified function.


export declare const groupAdjacentBy: {
  <A, K>(f: (a: A) => K): <E, R>(self: Stream<A, E, R>) => Stream<[K, Chunk.NonEmptyChunk<A>], E, R>
  <A, E, R, K>(self: Stream<A, E, R>, f: (a: A) => K): Stream<[K, Chunk.NonEmptyChunk<A>], E, R>

Added in v2.0.0


More powerful version of Stream.groupByKey.


export declare const groupBy: {
  <A, K, V, E2, R2>(
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: { readonly bufferSize?: number | undefined } | undefined
  ): <E, R>(self: Stream<A, E, R>) => GroupBy.GroupBy<K, V, E2 | E, R2 | R>
  <A, E, R, K, V, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<readonly [K, V], E2, R2>,
    options?: { readonly bufferSize?: number | undefined } | undefined
  ): GroupBy.GroupBy<K, V, E | E2, R | R2>


import { Chunk, Effect, GroupBy, Stream } from "effect"

const groupByKeyResult = Stream.fromIterable([
  "Mary",
  "James",
  "Robert",
  "Patricia",
  "John",
  "Jennifer",
  "Rebecca",
  "Peter"
]).pipe(Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name])))

const stream = GroupBy.evaluate(groupByKeyResult, (key, stream) =>
  Stream.fromEffect(Stream.runCollect(stream).pipe(Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)))
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
//   _id: 'Chunk',
//   values: [ [ 'M', 1 ], [ 'J', 3 ], [ 'R', 2 ], [ 'P', 2 ] ]
// }

Added in v2.0.0


Partition a stream using a function and process each stream individually. This returns a data structure that can be used to further filter down which groups shall be processed.

After calling apply on the GroupBy object, the remaining groups will be processed in parallel and the resulting streams merged in a nondeterministic fashion.

Up to buffer elements may be buffered in any group stream before the producer is backpressured. Take care to consume from all streams in order to prevent deadlocks.

For example, to collect the first 2 words for every starting letter from a stream of words:

import * as GroupBy from "./GroupBy"
import * as Stream from "./Stream"
import { pipe } from "./Function"

pipe(
  Stream.fromIterable(["hello", "world", "hi", "holla"]),
  Stream.groupByKey((word) => word[0]),
  GroupBy.evaluate((key, stream) =>
    pipe(
      stream,
      Stream.take(2),
      Stream.map((words) => [key, words] as const)
    )
  )
)


export declare const groupByKey: {
  <A, K>(
    f: (a: A) => K,
    options?: { readonly bufferSize?: number | undefined }
  ): <E, R>(self: Stream<A, E, R>) => GroupBy.GroupBy<K, A, E, R>
  <A, E, R, K>(
    self: Stream<A, E, R>,
    f: (a: A) => K,
    options?: { readonly bufferSize?: number | undefined }
  ): GroupBy.GroupBy<K, A, E, R>

Added in v2.0.0


Partitions the stream with specified chunkSize.


export declare const grouped: {
  (chunkSize: number): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
  <A, E, R>(self: Stream<A, E, R>, chunkSize: number): Stream<Chunk.Chunk<A>, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.range(0, 8).pipe(Stream.grouped(3))

// Effect.runPromise(Stream.runCollect(stream)).then((chunks) => console.log("%o", chunks))
// {
//   _id: 'Chunk',
//   values: [
//     { _id: 'Chunk', values: [ 0, 1, 2, [length]: 3 ] },
//     { _id: 'Chunk', values: [ 3, 4, 5, [length]: 3 ] },
//     { _id: 'Chunk', values: [ 6, 7, 8, [length]: 3 ] },
//     [length]: 3
//   ]
// }

Added in v2.0.0


Partitions the stream with the specified chunkSize or until the specified duration has passed, whichever is satisfied first.


export declare const groupedWithin: {
    chunkSize: number,
    duration: Duration.DurationInput
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
  <A, E, R>(self: Stream<A, E, R>, chunkSize: number, duration: Duration.DurationInput): Stream<Chunk.Chunk<A>, E, R>


import { Chunk, Effect, Schedule, Stream } from "effect"

const stream = Stream.range(0, 9).pipe(
  Stream.repeat(Schedule.spaced("1 second")),
  Stream.groupedWithin(18, "1.5 seconds"),
  Stream.take(3)
)

// Effect.runPromise(Stream.runCollect(stream)).then((chunks) => console.log(Chunk.toArray(chunks)))
// [
//   {
//     _id: 'Chunk',
//     values: [
//       0, 1, 2, 3, 4, 5, 6,
//       7, 8, 9, 0, 1, 2, 3,
//       4, 5, 6, 7
//     ]
//   },
//   {
//     _id: 'Chunk',
//     values: [
//       8, 9, 0, 1, 2,
//       3, 4, 5, 6, 7,
//       8, 9
//     ]
//   },
//   {
//     _id: 'Chunk',
//     values: [
//       0, 1, 2, 3, 4, 5, 6,
//       7, 8, 9, 0, 1, 2, 3,
//       4, 5, 6, 7
//     ]
//   }
// ]

Added in v2.0.0



Maps the success values of this stream to the specified constant value.


export declare const as: {
  <B>(value: B): <A, E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A, E, R, B>(self: Stream<A, E, R>, value: B): Stream<B, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.range(1, 5).pipe(Stream.as(null))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ null, null, null, null, null ] }

Added in v2.0.0


Transforms the elements of this stream using the supplied function.


export declare const map: {
  <A, B>(f: (a: A) => B): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A, E, R, B>(self: Stream<A, E, R>, f: (a: A) => B): Stream<B, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(Stream.map((n) => n + 1))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 3, 4 ] }

Added in v2.0.0


Statefully maps over the elements of this stream to produce new elements.


export declare const mapAccum: {
  <S, A, A2>(s: S, f: (s: S, a: A) => readonly [S, A2]): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
  <A, E, R, S, A2>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => readonly [S, A2]): Stream<A2, E, R>


import { Effect, Stream } from "effect"

const runningTotal = (stream: Stream.Stream<number>): Stream.Stream<number> =>
  stream.pipe(Stream.mapAccum(0, (s, a) => [s + a, s + a]))

// input:  0, 1, 2, 3, 4, 5, 6
// Effect.runPromise(Stream.runCollect(runningTotal(Stream.range(0, 6)))).then(
//   console.log
// )
// { _id: "Chunk", values: [ 0, 1, 3, 6, 10, 15, 21 ] }

Added in v2.0.0


Statefully and effectfully maps over the elements of this stream to produce new elements.


export declare const mapAccumEffect: {
  <S, A, A2, E2, R2>(
    s: S,
    f: (s: S, a: A) => Effect.Effect<readonly [S, A2], E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, S, A2, E2, R2>(
    self: Stream<A, E, R>,
    s: S,
    f: (s: S, a: A) => Effect.Effect<readonly [S, A2], E2, R2>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Transforms the chunks emitted by this stream.


export declare const mapChunks: {
  <A, B>(f: (chunk: Chunk.Chunk<A>) => Chunk.Chunk<B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A, E, R, B>(self: Stream<A, E, R>, f: (chunk: Chunk.Chunk<A>) => Chunk.Chunk<B>): Stream<B, E, R>

Added in v2.0.0


Effectfully transforms the chunks emitted by this stream.


export declare const mapChunksEffect: {
  <A, B, E2, R2>(
    f: (chunk: Chunk.Chunk<A>) => Effect.Effect<Chunk.Chunk<B>, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<B, E2 | E, R2 | R>
  <A, E, R, B, E2, R2>(
    self: Stream<A, E, R>,
    f: (chunk: Chunk.Chunk<A>) => Effect.Effect<Chunk.Chunk<B>, E2, R2>
  ): Stream<B, E | E2, R | R2>

Added in v2.0.0


Maps each element to an iterable, and flattens the iterables into the output of this stream.


export declare const mapConcat: {
  <A, A2>(f: (a: A) => Iterable<A2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, f: (a: A) => Iterable<A2>): Stream<A2, E, R>


import { Effect, Stream } from "effect"

const numbers = Stream.make("1-2-3", "4-5", "6").pipe(
  Stream.mapConcat((s) => s.split("-")),
  Stream.map((s) => parseInt(s))
)

// Effect.runPromise(Stream.runCollect(numbers)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }

Added in v2.0.0


Maps each element to a chunk, and flattens the chunks into the output of this stream.


export declare const mapConcatChunk: {
  <A, A2>(f: (a: A) => Chunk.Chunk<A2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, f: (a: A) => Chunk.Chunk<A2>): Stream<A2, E, R>

Added in v2.0.0


Effectfully maps each element to a chunk, and flattens the chunks into the output of this stream.


export declare const mapConcatChunkEffect: {
  <A, A2, E2, R2>(
    f: (a: A) => Effect.Effect<Chunk.Chunk<A2>, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<Chunk.Chunk<A2>, E2, R2>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Effectfully maps each element to an iterable, and flattens the iterables into the output of this stream.


export declare const mapConcatEffect: {
  <A, A2, E2, R2>(
    f: (a: A) => Effect.Effect<Iterable<A2>, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<Iterable<A2>, E2, R2>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Maps over elements of the stream with the specified effectful function.


export declare const mapEffect: {
  <A, A2, E2, R2>(
    f: (a: A) => Effect.Effect<A2, E2, R2>,
      | { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
      | undefined
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, A2, E2, R2, K>(
    f: (a: A) => Effect.Effect<A2, E2, R2>,
    options: { readonly key: (a: A) => K; readonly bufferSize?: number | undefined }
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<A2, E2, R2>,
      | { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
      | undefined
  ): Stream<A2, E | E2, R | R2>
  <A, E, R, A2, E2, R2, K>(
    self: Stream<A, E, R>,
    f: (a: A) => Effect.Effect<A2, E2, R2>,
    options: { readonly key: (a: A) => K; readonly bufferSize?: number | undefined }
  ): Stream<A2, E | E2, R | R2>


import { Effect, Random, Stream } from "effect"

const stream = Stream.make(10, 20, 30).pipe(Stream.mapEffect((n) => Random.nextIntBetween(0, n)))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Example Output: { _id: 'Chunk', values: [ 7, 19, 8 ] }

Added in v2.0.0


Transforms the errors emitted by this stream using f.


export declare const mapError: {
  <E, E2>(f: (error: E) => E2): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
  <A, E, R, E2>(self: Stream<A, E, R>, f: (error: E) => E2): Stream<A, E2, R>

Added in v2.0.0


Transforms the full causes of failures emitted by this stream.


export declare const mapErrorCause: {
  <E, E2>(f: (cause: Cause.Cause<E>) => Cause.Cause<E2>): <A, R>(self: Stream<A, E, R>) => Stream<A, E2, R>
  <A, E, R, E2>(self: Stream<A, E, R>, f: (cause: Cause.Cause<E>) => Cause.Cause<E2>): Stream<A, E2, R>

Added in v2.0.0


EventListener (interface)


export interface EventListener<A> {
    event: string,
    f: (event: A) => void,
      | {
          readonly capture?: boolean
          readonly passive?: boolean
          readonly once?: boolean
          readonly signal?: AbortSignal
      | boolean
  ): void
    event: string,
    f: (event: A) => void,
      | {
          readonly capture?: boolean
      | boolean
  ): void

Added in v3.4.0

Stream (interface)

A Stream<A, E, R> is a description of a program that, when evaluated, may emit zero or more values of type A, may fail with errors of type E, and uses a context of type R. One way to think of Stream is as an Effect program that could emit multiple values.

Stream is a purely functional pull based stream. Pull based streams offer inherent laziness and backpressure, relieving users of the need to manage buffers between operators. As an optimization, Stream does not emit single values, but rather an array of values. This allows the cost of effect evaluation to be amortized.

Stream forms a monad on its A type parameter, and has error management facilities for its E type parameter, modeled similarly to Effect (with some adjustments for the multiple-valued nature of Stream). These aspects allow for rich and expressive composition of streams.


export interface Stream<out A, out E = never, out R = never> extends Stream.Variance<A, E, R>, Pipeable {
  [Unify.typeSymbol]?: unknown
  [Unify.unifySymbol]?: StreamUnify<this>
  [Unify.ignoreSymbol]?: StreamUnifyIgnore

Added in v2.0.0

StreamUnify (interface)


export interface StreamUnify<A extends { [Unify.typeSymbol]?: any }> extends Effect.EffectUnify<A> {
  Stream?: () => A[Unify.typeSymbol] extends Stream<infer A0, infer E0, infer R0> | infer _ ? Stream<A0, E0, R0> : never

Added in v2.0.0

StreamUnifyIgnore (interface)


export interface StreamUnifyIgnore extends Effect.EffectUnifyIgnore {
  Effect?: true

Added in v2.0.0



Returns a stream that mirrors the first upstream to emit an item. As soon as one of the upstream emits a first value, the other is interrupted. The resulting stream will forward all items from the “winning” source stream. Any upstream failures will cause the returned stream to fail.


export declare const race: {
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL | AR, EL | ER, RL | RR>
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL | AR, EL | ER, RL | RR>


import { Stream, Schedule, Console, Effect } from "effect"

const stream = Stream.fromSchedule(Schedule.spaced("2 millis")).pipe(
  Stream.race(Stream.fromSchedule(Schedule.spaced("1 millis"))),
  Stream.take(6),
  Stream.tap(Console.log)
)

// Output each millisecond from the first stream, the rest streams are interrupted
// 0
// 1
// 2
// 3
// 4
// 5

Added in v3.7.0


Returns a stream that mirrors the first upstream to emit an item. As soon as one of the upstream emits a first value, all the others are interrupted. The resulting stream will forward all items from the “winning” source stream. Any upstream failures will cause the returned stream to fail.


export declare const raceAll: <S extends ReadonlyArray<Stream<any, any, any>>>(
  ...streams: S
) => Stream<Stream.Success<S[number]>, Stream.Error<S[number]>, Stream.Context<S[number]>>


import { Stream, Schedule, Console, Effect } from "effect"

const stream = Stream.raceAll(
  Stream.fromSchedule(Schedule.spaced("1 millis")),
  Stream.fromSchedule(Schedule.spaced("2 millis")),
  Stream.fromSchedule(Schedule.spaced("4 millis"))
).pipe(Stream.take(6), Stream.tap(Console.log))

// Output each millisecond from the first stream, the rest streams are interrupted
// 0
// 1
// 2
// 3
// 4
// 5

Added in v3.5.0



Returns a Stream that first collects n elements from the input Stream, and then creates a new Stream using the specified function, and sends all the following elements through that.


export declare const branchAfter: {
  <A, A2, E2, R2>(
    n: number,
    f: (input: Chunk.Chunk<A>) => Stream<A2, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    n: number,
    f: (input: Chunk.Chunk<A>) => Stream<A2, E2, R2>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Returns a stream made of the concatenation in strict order of all the streams produced by passing each element of this stream to f.


export declare const flatMap: {
  <A, A2, E2, R2>(
    f: (a: A) => Stream<A2, E2, R2>,
      | {
          readonly concurrency?: number | "unbounded" | undefined
          readonly bufferSize?: number | undefined
          readonly switch?: boolean | undefined
      | undefined
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (a: A) => Stream<A2, E2, R2>,
      | {
          readonly concurrency?: number | "unbounded" | undefined
          readonly bufferSize?: number | undefined
          readonly switch?: boolean | undefined
      | undefined
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Flattens this stream-of-streams into a stream made of the concatenation in strict order of all the streams.


export declare const flatten: {
      | { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
      | undefined
  ): <A, E2, R2, E, R>(self: Stream<Stream<A, E2, R2>, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E2, R2, E, R>(
    self: Stream<Stream<A, E2, R2>, E, R>,
      | { readonly concurrency?: number | "unbounded" | undefined; readonly bufferSize?: number | undefined }
      | undefined
  ): Stream<A, E2 | E, R2 | R>

Added in v2.0.0


Submerges the chunks carried by this stream into the stream’s structure, while still preserving them.


export declare const flattenChunks: <A, E, R>(self: Stream<Chunk.Chunk<A>, E, R>) => Stream<A, E, R>

Added in v2.0.0


Flattens Effect values into the stream’s structure, preserving all information about the effect.


export declare const flattenEffect: {
      | { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
      | undefined
  ): <A, E2, R2, E, R>(self: Stream<Effect.Effect<A, E2, R2>, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E2, R2, E, R>(
    self: Stream<Effect.Effect<A, E2, R2>, E, R>,
      | { readonly concurrency?: number | "unbounded" | undefined; readonly unordered?: boolean | undefined }
      | undefined
  ): Stream<A, E2 | E, R2 | R>

Added in v2.0.0


Unwraps Exit values that also signify end-of-stream by failing with None.

For Exit values that do not signal end-of-stream, prefer:



export declare const flattenExitOption: <A, E2, E, R>(
  self: Stream<Exit.Exit<A, Option.Option<E2>>, E, R>
) => Stream<A, E | E2, R>

Added in v2.0.0


Submerges the iterables carried by this stream into the stream’s structure, while still preserving them.


export declare const flattenIterables: <A, E, R>(self: Stream<Iterable<A>, E, R>) => Stream<A, E, R>

Added in v2.0.0


Unwraps Exit values and flattens chunks that also signify end-of-stream by failing with None.


export declare const flattenTake: <A, E2, E, R>(self: Stream<Take.Take<A, E2>, E, R>) => Stream<A, E | E2, R>

Added in v2.0.0


Adds an effect to be executed at the end of the stream.


export declare const onEnd: {
  <_, E2, R2>(effect: Effect.Effect<_, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, _, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<_, E2, R2>): Stream<A, E | E2, R | R2>


import { Console, Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.map((n) => n * 2),
  Stream.tap((n) => Console.log(`after mapping: ${n}`)),
  Stream.onEnd(Console.log("Stream ended"))
)

// after mapping: 2
// after mapping: 4
// after mapping: 6
// Stream ended
// { _id: 'Chunk', values: [ 2, 4, 6 ] }

Added in v3.6.0


Adds an effect to be executed at the start of the stream.


export declare const onStart: {
  <_, E2, R2>(effect: Effect.Effect<_, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, _, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<_, E2, R2>): Stream<A, E | E2, R | R2>


import { Console, Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.onStart(Console.log("Stream started")),
  Stream.map((n) => n * 2),
  Stream.tap((n) => Console.log(`after mapping: ${n}`))
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Stream started
// after mapping: 2
// after mapping: 4
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }

Added in v3.6.0


Adds an effect to the consumption of every element of the stream.


export declare const tap: {
  <A, X, E2, R2>(
    f: (a: NoInfer<A>) => Effect.Effect<X, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, X, E2, R2>(self: Stream<A, E, R>, f: (a: NoInfer<A>) => Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>


import { Console, Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) => Console.log(`before mapping: ${n}`)),
  Stream.map((n) => n * 2),
  Stream.tap((n) => Console.log(`after mapping: ${n}`))
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// before mapping: 1
// after mapping: 2
// before mapping: 2
// after mapping: 4
// before mapping: 3
// after mapping: 6
// { _id: 'Chunk', values: [ 2, 4, 6 ] }

Added in v2.0.0


Returns a stream that effectfully “peeks” at the failure or success of the stream.


export declare const tapBoth: {
  <E, X1, E2, R2, A, X2, E3, R3>(options: {
    readonly onFailure: (e: NoInfer<E>) => Effect.Effect<X1, E2, R2>
    readonly onSuccess: (a: NoInfer<A>) => Effect.Effect<X2, E3, R3>
  }): <R>(self: Stream<A, E, R>) => Stream<A, E | E2 | E3, R2 | R3 | R>
  <A, E, R, X1, E2, R2, X2, E3, R3>(
    self: Stream<A, E, R>,
    options: {
      readonly onFailure: (e: NoInfer<E>) => Effect.Effect<X1, E2, R2>
      readonly onSuccess: (a: NoInfer<A>) => Effect.Effect<X2, E3, R3>
  ): Stream<A, E | E2 | E3, R | R2 | R3>

Added in v2.0.0


Returns a stream that effectfully “peeks” at the failure of the stream.


export declare const tapError: {
  <E, X, E2, R2>(
    f: (error: NoInfer<E>) => Effect.Effect<X, E2, R2>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A, E | E2, R2 | R>
  <A, E, R, X, E2, R2>(self: Stream<A, E, R>, f: (error: E) => Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0


Sends all elements emitted by this stream to the specified sink in addition to emitting them.


export declare const tapSink: {
  <A, E2, R2>(sink: Sink.Sink<unknown, A, unknown, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<unknown, A, unknown, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0




export declare const StreamTypeId: typeof StreamTypeId

Added in v2.0.0

StreamTypeId (type alias)


export type StreamTypeId = typeof StreamTypeId

Added in v2.0.0



Wraps the stream with a new span for tracing.


export declare const withSpan: {
    name: string,
    options?: Tracer.SpanOptions | undefined
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, Exclude<R, Tracer.ParentSpan>>
  <A, E, R>(
    self: Stream<A, E, R>,
    name: string,
    options?: Tracer.SpanOptions | undefined
  ): Stream<A, E, Exclude<R, Tracer.ParentSpan>>

Added in v2.0.0

type lambdas

StreamTypeLambda (interface)


export interface StreamTypeLambda extends TypeLambda {
  readonly type: Stream<this["Target"], this["Out1"], this["Out2"]>

Added in v2.0.0


Stream (namespace)

Added in v2.0.0

Variance (interface)


export interface Variance<out A, out E, out R> {
  readonly [StreamTypeId]: VarianceStruct<A, E, R>

Added in v2.0.0

VarianceStruct (interface)


export interface VarianceStruct<out A, out E, out R> {
  readonly _A: Covariant<A>
  readonly _E: Covariant<E>
  readonly _R: Covariant<R>

Added in v3.4.0

Context (type alias)


export type Context<T extends Stream<any, any, any>> = [T] extends [Stream<infer _A, infer _E, infer _R>] ? _R : never

Added in v3.4.0

DynamicTuple (type alias)


export type DynamicTuple<T, N extends number> = N extends N
  ? number extends N
    ? Array<T>
    : DynamicTupleOf<T, N, []>
  : never

Added in v2.0.0

DynamicTupleOf (type alias)


export type DynamicTupleOf<T, N extends number, R extends Array<unknown>> = R["length"] extends N
  ? R
  : DynamicTupleOf<T, N, [T, ...R]>

Added in v2.0.0

Error (type alias)


export type Error<T extends Stream<any, any, any>> = [T] extends [Stream<infer _A, infer _E, infer _R>] ? _E : never

Added in v3.4.0

Success (type alias)


export type Success<T extends Stream<any, any, any>> = [T] extends [Stream<infer _A, infer _E, infer _R>] ? _A : never

Added in v3.4.0


Collects each underlying Chunk of the stream into a new chunk, and emits it on each pull.


export declare const accumulate: <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>

Added in v2.0.0


Re-chunks the elements of the stream by accumulating each underlying chunk.


export declare const accumulateChunks: <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>

Added in v2.0.0


Aggregates elements of this stream using the provided sink for as long as the downstream operators on the stream are busy.

This operator divides the stream into two asynchronous “islands”. Operators upstream of this operator run on one fiber, while downstream operators run on another. Whenever the downstream fiber is busy processing elements, the upstream fiber will feed elements into the sink until it signals completion.

Any sink can be used here, but see Sink.foldWeightedEffect and Sink.foldUntilEffect for sinks that cover the common use cases.


export declare const aggregate: {
  <B, A, A2, E2, R2>(sink: Sink.Sink<B, A | A2, A2, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<B, E2 | E, R2 | R>
  <A, E, R, B, A2, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<B, A | A2, A2, E2, R2>): Stream<B, E | E2, R | R2>

Added in v2.0.0


Like aggregateWithinEither, but only returns the Right results.


export declare const aggregateWithin: {
  <B, A, A2, E2, R2, C, R3>(
    sink: Sink.Sink<B, A | A2, A2, E2, R2>,
    schedule: Schedule.Schedule<C, Option.Option<B>, R3>
  ): <E, R>(self: Stream<A, E, R>) => Stream<B, E2 | E, R2 | R3 | R>
  <A, E, R, B, A2, E2, R2, C, R3>(
    self: Stream<A, E, R>,
    sink: Sink.Sink<B, A | A2, A2, E2, R2>,
    schedule: Schedule.Schedule<C, Option.Option<B>, R3>
  ): Stream<B, E | E2, R | R2 | R3>

Added in v2.0.0


Aggregates elements using the provided sink until it completes, or until the delay signalled by the schedule has passed.

This operator divides the stream into two asynchronous islands. Operators upstream of this operator run on one fiber, while downstream operators run on another. Elements will be aggregated by the sink until the downstream fiber pulls the aggregated value, or until the schedule’s delay has passed.

Aggregated elements will be fed into the schedule to determine the delays between pulls.


export declare const aggregateWithinEither: {
  <B, A, A2, E2, R2, C, R3>(
    sink: Sink.Sink<B, A | A2, A2, E2, R2>,
    schedule: Schedule.Schedule<C, Option.Option<B>, R3>
  ): <E, R>(self: Stream<A, E, R>) => Stream<Either.Either<B, C>, E2 | E, R2 | R3 | R>
  <A, E, R, B, A2, E2, R2, C, R3>(
    self: Stream<A, E, R>,
    sink: Sink.Sink<B, A | A2, A2, E2, R2>,
    schedule: Schedule.Schedule<C, Option.Option<B>, R3>
  ): Stream<Either.Either<B, C>, E | E2, R | R2 | R3>

Added in v2.0.0


Fan out the stream, producing a list of streams that have the same elements as this stream. The driver stream will only ever advance maximumLag chunks ahead of the slowest downstream stream.


export declare const broadcast: {
  <N extends number>(
    n: N,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>
  <A, E, R, N extends number>(
    self: Stream<A, E, R>,
    n: N,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>


import { Console, Effect, Fiber, Schedule, Stream } from "effect"

const numbers = Effect.scoped(
  Stream.range(1, 20).pipe(
    Stream.tap((n) => Console.log(`Emit ${n} element before broadcasting`)),
    Stream.broadcast(2, 5),
    Stream.flatMap(([first, second]) =>
      Effect.gen(function* () {
        const fiber1 = yield* Stream.runFold(first, 0, (acc, e) => Math.max(acc, e)).pipe(
          Effect.andThen((max) => Console.log(`Maximum: ${max}`)),
          Effect.fork
        )
        const fiber2 = yield* second.pipe(
          Stream.schedule(Schedule.spaced("1 second")),
          Stream.runForEach((n) => Console.log(`Logging to the Console: ${n}`)),
          Effect.fork
        )
        yield* Fiber.join(fiber1).pipe(Effect.zip(Fiber.join(fiber2), { concurrent: true }))
      })
    ),
    Stream.runCollect
  )
)

// Effect.runPromise(numbers).then(console.log)
// Emit 1 element before broadcasting
// Emit 2 element before broadcasting
// Emit 3 element before broadcasting
// Emit 4 element before broadcasting
// Emit 5 element before broadcasting
// Emit 6 element before broadcasting
// Emit 7 element before broadcasting
// Emit 8 element before broadcasting
// Emit 9 element before broadcasting
// Emit 10 element before broadcasting
// Emit 11 element before broadcasting
// Logging to the Console: 1
// Logging to the Console: 2
// Logging to the Console: 3
// Logging to the Console: 4
// Logging to the Console: 5
// Emit 12 element before broadcasting
// Emit 13 element before broadcasting
// Emit 14 element before broadcasting
// Emit 15 element before broadcasting
// Emit 16 element before broadcasting
// Logging to the Console: 6
// Logging to the Console: 7
// Logging to the Console: 8
// Logging to the Console: 9
// Logging to the Console: 10
// Emit 17 element before broadcasting
// Emit 18 element before broadcasting
// Emit 19 element before broadcasting
// Emit 20 element before broadcasting
// Logging to the Console: 11
// Logging to the Console: 12
// Logging to the Console: 13
// Logging to the Console: 14
// Logging to the Console: 15
// Maximum: 20
// Logging to the Console: 16
// Logging to the Console: 17
// Logging to the Console: 18
// Logging to the Console: 19
// Logging to the Console: 20
// { _id: 'Chunk', values: [ undefined ] }

Added in v2.0.0


Fan out the stream, producing a dynamic number of streams that have the same elements as this stream. The driver stream will only ever advance maximumLag chunks ahead of the slowest downstream stream.


export declare const broadcastDynamic: {
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Stream<A, E>, never, Scope.Scope | R>
  <A, E, R>(
    self: Stream<A, E, R>,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): Effect.Effect<Stream<A, E>, never, Scope.Scope | R>

Added in v2.0.0


Converts the stream to a scoped list of queues. Every value will be replicated to every queue with the slowest queue being allowed to buffer maximumLag chunks before the driver is back pressured.

Queues can unsubscribe from upstream by shutting down.


export declare const broadcastedQueues: {
  <N extends number>(
    n: N,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): <A, E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>
  <A, E, R, N extends number>(
    self: Stream<A, E, R>,
    n: N,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>

Added in v2.0.0


Converts the stream to a scoped dynamic amount of queues. Every chunk will be replicated to every queue with the slowest queue being allowed to buffer maximumLag chunks before the driver is back pressured.

Queues can unsubscribe from upstream by shutting down.


export declare const broadcastedQueuesDynamic: {
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): <A, E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R>
  <A, E, R>(
    self: Stream<A, E, R>,
      | number
      | { readonly capacity: "unbounded"; readonly replay?: number | undefined }
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
  ): Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R>

Added in v2.0.0


Allows a faster producer to progress independently of a slower consumer by buffering up to capacity elements in a queue.

Note: This combinator destroys the chunking structure. It’s recommended to use rechunk afterwards. Additionally, prefer capacities that are powers of 2 for better performance.


export declare const buffer: {
      | { readonly capacity: "unbounded" }
      | { readonly capacity: number; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(
    self: Stream<A, E, R>,
      | { readonly capacity: "unbounded" }
      | { readonly capacity: number; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
  ): Stream<A, E, R>


import { Console, Effect, Schedule, Stream } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.tap((n) => Console.log(`before buffering: ${n}`)),
  Stream.buffer({ capacity: 4 }),
  Stream.tap((n) => Console.log(`after buffering: ${n}`)),
  Stream.schedule(Schedule.spaced("5 seconds"))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// before buffering: 1
// before buffering: 2
// before buffering: 3
// before buffering: 4
// before buffering: 5
// before buffering: 6
// after buffering: 1
// after buffering: 2
// before buffering: 7
// after buffering: 3
// before buffering: 8
// after buffering: 4
// before buffering: 9
// after buffering: 5
// before buffering: 10
// ...

Added in v2.0.0


Allows a faster producer to progress independently of a slower consumer by buffering up to capacity chunks in a queue.


export declare const bufferChunks: {
  (options: {
    readonly capacity: number
    readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
  }): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(
    self: Stream<A, E, R>,
    options: { readonly capacity: number; readonly strategy?: "dropping" | "sliding" | "suspend" | undefined }
  ): Stream<A, E, R>

Added in v2.0.0


Returns a new stream that only emits elements that are not equal to the previous element emitted, using natural equality to determine whether two elements are equal.


export declare const changes: <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }

Added in v2.0.0


Returns a new stream that only emits elements that are not equal to the previous element emitted, using the specified function to determine whether two elements are equal.


export declare const changesWith: {
  <A>(f: (x: A, y: A) => boolean): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, f: (x: A, y: A) => boolean): Stream<A, E, R>

Added in v2.0.0


Returns a new stream that only emits elements that are not equal to the previous element emitted, using the specified effectual function to determine whether two elements are equal.


export declare const changesWithEffect: {
  <A, E2, R2>(
    f: (x: A, y: A) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(self: Stream<A, E, R>, f: (x: A, y: A) => Effect.Effect<boolean, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0


Exposes the underlying chunks of the stream as a stream of chunks of elements.


export declare const chunks: <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>

Added in v2.0.0


Performs the specified stream transformation with the chunk structure of the stream exposed.


export declare const chunksWith: {
  <A, E, R, A2, E2, R2>(
    f: (stream: Stream<Chunk.Chunk<A>, E, R>) => Stream<Chunk.Chunk<A2>, E2, R2>
  ): (self: Stream<A, E, R>) => Stream<A2, E | E2, R | R2>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (stream: Stream<Chunk.Chunk<A>, E, R>) => Stream<Chunk.Chunk<A2>, E2, R2>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Combines the elements from this stream and the specified stream by repeatedly applying the function f to extract an element using both sides and conceptually “offer” it to the destination stream. f can maintain some internal state to control the combining process, with the initial state being specified by s.

Where possible, prefer Stream.combineChunks for a more efficient implementation.


export declare const combine: {
  <A2, E2, R2, S, R3, E, A, R4, R5, A3>(
    that: Stream<A2, E2, R2>,
    s: S,
    f: (
      s: S,
      pullLeft: Effect.Effect<A, Option.Option<E>, R3>,
      pullRight: Effect.Effect<A2, Option.Option<E2>, R4>
    ) => Effect.Effect<Exit.Exit<readonly [A3, S], Option.Option<E2 | E>>, never, R5>
  ): <R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R3 | R4 | R5 | R>
  <R, A2, E2, R2, S, R3, E, A, R4, R5, A3>(
    self: Stream<A, E, R>,
    that: Stream<A2, E2, R2>,
    s: S,
    f: (
      s: S,
      pullLeft: Effect.Effect<A, Option.Option<E>, R3>,
      pullRight: Effect.Effect<A2, Option.Option<E2>, R4>
    ) => Effect.Effect<Exit.Exit<readonly [A3, S], Option.Option<E2 | E>>, never, R5>
  ): Stream<A3, E2 | E, R | R2 | R3 | R4 | R5>

Added in v2.0.0


Combines the chunks from this stream and the specified stream by repeatedly applying the function f to extract a chunk using both sides and conceptually “offer” it to the destination stream. f can maintain some internal state to control the combining process, with the initial state being specified by s.


export declare const combineChunks: {
  <A2, E2, R2, S, R3, E, A, R4, R5, A3>(
    that: Stream<A2, E2, R2>,
    s: S,
    f: (
      s: S,
      pullLeft: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R3>,
      pullRight: Effect.Effect<Chunk.Chunk<A2>, Option.Option<E2>, R4>
    ) => Effect.Effect<Exit.Exit<readonly [Chunk.Chunk<A3>, S], Option.Option<E2 | E>>, never, R5>
  ): <R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R3 | R4 | R5 | R>
  <R, A2, E2, R2, S, R3, E, A, R4, R5, A3>(
    self: Stream<A, E, R>,
    that: Stream<A2, E2, R2>,
    s: S,
    f: (
      s: S,
      pullLeft: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R3>,
      pullRight: Effect.Effect<Chunk.Chunk<A2>, Option.Option<E2>, R4>
    ) => Effect.Effect<Exit.Exit<readonly [Chunk.Chunk<A3>, S], Option.Option<E2 | E>>, never, R5>
  ): Stream<A3, E2 | E, R | R2 | R3 | R4 | R5>

Added in v2.0.0


Concatenates the specified stream with this stream, resulting in a stream that emits the elements from this stream and then the elements from the specified stream.


export declare const concat: {
  <A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<A | A2, E | E2, R | R2>


import { Effect, Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5)

const stream = Stream.concat(s1, s2)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

Added in v2.0.0


Composes this stream with the specified stream to create a cartesian product of elements. The right stream would be run multiple times, for every element in the left stream.

See also Stream.zip for the more common point-wise variant.


export declare const cross: {
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<[AL, AR], EL | ER, RL | RR>
  <AL, ER, RR, AR, EL, RL>(left: Stream<AL, ER, RR>, right: Stream<AR, EL, RL>): Stream<[AL, AR], EL | ER, RL | RR>


import { Effect, Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make("a", "b")

const product = Stream.cross(s1, s2)

// Effect.runPromise(Stream.runCollect(product)).then(console.log)
// {
//   _id: "Chunk",
//   values: [
//     [ 1, "a" ], [ 1, "b" ], [ 2, "a" ], [ 2, "b" ], [ 3, "a" ], [ 3, "b" ]
//   ]
// }

Added in v2.0.0


Composes this stream with the specified stream to create a cartesian product of elements, but keeps only elements from the left stream. The right stream would be run multiple times, for every element in the left stream.

See also Stream.zipLeft for the more common point-wise variant.


export declare const crossLeft: {
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL, EL | ER, RL | RR>
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL, EL | ER, RL | RR>

Added in v2.0.0


Composes this stream with the specified stream to create a cartesian product of elements, but keeps only elements from the right stream. The left stream would be run multiple times, for every element in the right stream.

See also Stream.zipRight for the more common point-wise variant.


export declare const crossRight: {
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AR, EL | ER, RL | RR>
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AR, EL | ER, RL | RR>

Added in v2.0.0


Composes this stream with the specified stream to create a cartesian product of elements with a specified function. The right stream would be run multiple times, for every element in the left stream.

See also Stream.zipWith for the more common point-wise variant.


export declare const crossWith: {
  <AR, ER, RR, AL, A>(
    right: Stream<AR, ER, RR>,
    f: (left: AL, right: AR) => A
  ): <EL, RL>(left: Stream<AL, EL, RL>) => Stream<A, EL | ER, RL | RR>
  <AL, EL, RL, AR, ER, RR, A>(
    left: Stream<AL, EL, RL>,
    right: Stream<AR, ER, RR>,
    f: (left: AL, right: AR) => A
  ): Stream<A, EL | ER, RL | RR>

Added in v2.0.0


Delays the emission of values by holding new values for a set duration. If no new values arrive during that time the value is emitted, however if a new value is received during the holding period the previous value is discarded and the process is repeated with the new value.

This operator is useful if you have a stream of “bursty” events which eventually settle down and you only need the final event of the burst. For example, a search engine may only want to initiate a search after a user has paused typing so as to not prematurely recommend results.


export declare const debounce: {
  (duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>


import { Effect, Stream } from "effect"

let last =
const log = (message: string) =>
  Effect.sync(() => {
    const end =
    console.log(`${message} after ${end - last}ms`)
    last = end

const stream = Stream.make(1, 2, 3).pipe(
    Stream.fromEffect(Effect.sleep("200 millis").pipe( // Emit 4 after 200 ms
  Stream.concat(Stream.make(5, 6)), // Continue with more rapid values
    Stream.fromEffect(Effect.sleep("150 millis").pipe( // Emit 7 after 150 ms
  Stream.tap((n) => log(`Received ${n}`)),
  Stream.debounce("100 millis"), // Only emit values after a pause of at least 100 milliseconds,
  Stream.tap((n) => log(`> Emitted ${n}`))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Received 1 after 5ms
// Received 2 after 2ms
// Received 3 after 0ms
// > Emitted 3 after 104ms
// Received 4 after 99ms
// Received 5 after 1ms
// Received 6 after 0ms
// > Emitted 6 after 101ms
// Received 7 after 50ms
// Received 8 after 1ms
// > Emitted 8 after 101ms
// { _id: 'Chunk', values: [ 3, 6, 8 ] }

Added in v2.0.0


More powerful version of Stream.broadcast. Allows providing a function that determines which queues should receive which elements. The decide function will receive the indices of the queues in the resulting list.


export declare const distributedWith: {
  <N extends number, A>(options: {
    readonly size: N
    readonly maximumLag: number
    readonly decide: (a: A) => Effect.Effect<Predicate<number>>
  }): <E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>, never, Scope.Scope | R>
  <A, E, R, N extends number>(
    self: Stream<A, E, R>,
    options: {
      readonly size: N
      readonly maximumLag: number
      readonly decide: (a: A) => Effect.Effect<Predicate<number>>
  ): Effect.Effect<TupleOf<N, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>>, never, Scope.Scope | R>

Added in v2.0.0


More powerful version of Stream.distributedWith. This returns a function that will produce new queues and corresponding indices. You can also provide a function that will be executed after the final events are enqueued in all queues. Shutdown of the queues is handled by the driver. Downstream users can also shut down queues manually. In this case the driver will continue but will no longer apply backpressure on them.


export declare const distributedWithDynamic: {
  <A>(options: {
    readonly maximumLag: number
    readonly decide: (a: A) => Effect.Effect<Predicate<number>, never, never>
  }): <E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<
    Effect.Effect<[number, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>], never, never>,
    Scope.Scope | R
  <A, E, R>(
    self: Stream<A, E, R>,
    options: { readonly maximumLag: number; readonly decide: (a: A) => Effect.Effect<Predicate<number>, never, never> }
  ): Effect.Effect<
    Effect.Effect<[number, Queue.Dequeue<Exit.Exit<A, Option.Option<E>>>], never, never>,
    Scope.Scope | R

Added in v2.0.0


Converts this stream to a stream that executes its effects but emits no elements. Useful for sequencing effects using streams:


export declare const drain: <A, E, R>(self: Stream<A, E, R>) => Stream<never, E, R>


import { Effect, Stream } from "effect"

// We create a stream and immediately drain it.
const stream = Stream.range(1, 6).pipe(Stream.drain)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }

Added in v2.0.0


Drains the provided stream (that) in the background for as long as this stream is running. If this stream ends before the provided stream, the provided stream will be interrupted. If the provided stream fails, this stream will fail with that error.


export declare const drainFork: {
  <A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0


Drops the specified number of elements from this stream.


export declare const drop: {
  (n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>

Added in v2.0.0


Drops the last specified number of elements from this stream.


export declare const dropRight: {
  (n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>

Added in v2.0.0


Drops all elements of the stream until the specified predicate evaluates to true.


export declare const dropUntil: {
  <A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>

Added in v2.0.0


Drops all elements of the stream until the specified effectful predicate evaluates to true.


export declare const dropUntilEffect: {
  <A, E2, R2>(
    predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): Stream<A, E | E2, R | R2>

Added in v2.0.0


Drops all elements of the stream for as long as the specified predicate evaluates to true.


export declare const dropWhile: {
  <A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>

Added in v2.0.0


Drops all elements of the stream for as long as the specified predicate produces an effect that evaluates to true.


export declare const dropWhileEffect: {
  <A, E2, R2>(
    predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    predicate: (a: A) => Effect.Effect<boolean, E2, R2>
  ): Stream<A, E | E2, R | R2>

Added in v2.0.0


Returns a stream whose failures and successes have been lifted into an Either. The resulting stream cannot fail, because the failures have been exposed as part of the Either success case.


export declare const either: <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A, E>, never, R>

Added in v2.0.0


Executes the provided finalizer after this stream’s finalizers run.


export declare const ensuring: {
  <X, R2>(finalizer: Effect.Effect<X, never, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, X, R2>(self: Stream<A, E, R>, finalizer: Effect.Effect<X, never, R2>): Stream<A, E, R | R2>


import { Console, Effect, Stream } from "effect"

const program = Stream.fromEffect(Console.log("Application Logic.")).pipe(
  Stream.concat(Stream.finalizer(Console.log("Finalizing the stream"))),
  Stream.ensuring(Console.log("Doing some other works after stream's finalization"))

// Effect.runPromise(Stream.runCollect(program)).then(console.log)
// Application Logic.
// Finalizing the stream
// Doing some other works after stream's finalization
// { _id: 'Chunk', values: [ undefined, undefined ] }

Added in v2.0.0


Executes the provided finalizer after this stream’s finalizers run. Unlike Stream.ensuring, the finalizer is a function that receives an Exit value.


export declare const ensuringWith: {
  <E, R2>(
    finalizer: (exit: Exit.Exit<unknown, E>) => Effect.Effect<unknown, never, R2>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, R2>(
    self: Stream<A, E, R>,
    finalizer: (exit: Exit.Exit<unknown, E>) => Effect.Effect<unknown, never, R2>
  ): Stream<A, E, R | R2>

Added in v2.0.0


Performs a filter and map in a single step.


export declare const filterMap: {
  <A, B>(pf: (a: A) => Option.Option<B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A, E, R, B>(self: Stream<A, E, R>, pf: (a: A) => Option.Option<B>): Stream<B, E, R>

Added in v2.0.0


Performs an effectful filter and map in a single step.


export declare const filterMapEffect: {
  <A, A2, E2, R2>(
    pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Transforms all elements of the stream for as long as the specified partial function is defined.


export declare const filterMapWhile: {
  <A, A2>(pf: (a: A) => Option.Option<A2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, pf: (a: A) => Option.Option<A2>): Stream<A2, E, R>

Added in v2.0.0


Effectfully transforms all elements of the stream for as long as the specified partial function is defined.


export declare const filterMapWhileEffect: {
  <A, A2, E2, R2>(
    pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    pf: (a: A) => Option.Option<Effect.Effect<A2, E2, R2>>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Repeats this stream forever.


export declare const forever: <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>

Added in v2.0.0


Creates a Stream using addEventListener.


export declare const fromEventListener: <A = unknown>(
  target: EventListener<A>,
  type: string,
    | boolean
    | {
        readonly capture?: boolean
        readonly passive?: boolean
        readonly once?: boolean
        readonly bufferSize?: number | "unbounded" | undefined
    | undefined
) => Stream<A>

Added in v3.1.0


Specialized version of haltWhen which halts the evaluation of this stream after the given duration.

An element in the process of being pulled will not be interrupted when the given duration completes. See interruptAfter for this behavior.


export declare const haltAfter: {
  (duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>

Added in v2.0.0


Halts the evaluation of this stream when the provided effect completes. The given effect will be forked as part of the returned stream, and its success will be discarded.

An element in the process of being pulled will not be interrupted when the effect completes. See interruptWhen for this behavior.

If the effect completes with a failure, the stream will emit that failure.


export declare const haltWhen: {
  <X, E2, R2>(effect: Effect.Effect<X, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, X, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0


Halts the evaluation of this stream when the provided deferred completes.

If the deferred completes with a failure, the stream will emit that failure.


export declare const haltWhenDeferred: {
  <X, E2>(deferred: Deferred.Deferred<X, E2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
  <A, E, R, X, E2>(self: Stream<A, E, R>, deferred: Deferred.Deferred<X, E2>): Stream<A, E | E2, R>

Added in v2.0.0


The identity pipeline, which does not modify streams in any way.


export declare const identity: <A, E = never, R = never>() => Stream<A, E, R>

Added in v2.0.0


Interleaves this stream and the specified stream deterministically by alternating pulling values from this stream and the specified stream. When one stream is exhausted all remaining values in the other stream will be pulled.


export declare const interleave: {
  <A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<A | A2, E | E2, R | R2>


import { Effect, Stream } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)

const stream = Stream.interleave(s1, s2)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 5, 3, 6 ] }

Added in v2.0.0


Combines this stream and the specified stream deterministically using the decider stream of boolean values to control which stream to pull from next. A value of true indicates to pull from this stream and a value of false indicates to pull from the specified stream. Only consumes as many elements as requested by the decider stream. If either this stream or the specified stream is exhausted, further requests for values from that stream will be ignored.


export declare const interleaveWith: {
  <A2, E2, R2, E3, R3>(
    that: Stream<A2, E2, R2>,
    decider: Stream<boolean, E3, R3>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E3 | E, R2 | R3 | R>
  <A, E, R, A2, E2, R2, E3, R3>(
    self: Stream<A, E, R>,
    that: Stream<A2, E2, R2>,
    decider: Stream<boolean, E3, R3>
  ): Stream<A | A2, E | E2 | E3, R | R2 | R3>


import { Effect, Stream } from "effect"

const s1 = Stream.make(1, 3, 5, 7, 9)
const s2 = Stream.make(2, 4, 6, 8, 10)

const booleanStream = Stream.make(true, false, false).pipe(Stream.forever)

const stream = Stream.interleaveWith(s1, s2, booleanStream)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
//   _id: 'Chunk',
//   values: [
//     1, 2,  4, 3, 6,
//     8, 5, 10, 7, 9
//   ]
// }

Added in v2.0.0


Specialized version of Stream.interruptWhen which interrupts the evaluation of this stream after the given Duration.


export declare const interruptAfter: {
  (duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>

Added in v2.0.0


Interrupts the evaluation of this stream when the provided effect completes. The given effect will be forked as part of this stream, and its success will be discarded. This combinator will also interrupt any in-progress element being pulled from upstream.

If the effect completes with a failure before the stream completes, the returned stream will emit that failure.


export declare const interruptWhen: {
  <X, E2, R2>(effect: Effect.Effect<X, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, X, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<X, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0


Interrupts the evaluation of this stream when the provided deferred completes. This combinator will also interrupt any in-progress element being pulled from upstream.

If the deferred completes with a failure, the stream will emit that failure.


export declare const interruptWhenDeferred: {
  <X, E2>(deferred: Deferred.Deferred<X, E2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
  <A, E, R, X, E2>(self: Stream<A, E, R>, deferred: Deferred.Deferred<X, E2>): Stream<A, E | E2, R>

Added in v2.0.0


Intersperses the elements of this stream with the provided element.


export declare const intersperse: {
  <A2>(element: A2): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, element: A2): Stream<A | A2, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(Stream.intersperse(0))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
//   _id: 'Chunk',
//   values: [
//     1, 0, 2, 0, 3,
//     0, 4, 0, 5
//   ]
// }

Added in v2.0.0


Intersperse the specified element, also adding a prefix and a suffix.


export declare const intersperseAffixes: {
  <A2, A3, A4>(options: {
    readonly start: A2
    readonly middle: A3
    readonly end: A4
  }): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A3 | A4 | A, E, R>
  <A, E, R, A2, A3, A4>(
    self: Stream<A, E, R>,
    options: { readonly start: A2; readonly middle: A3; readonly end: A4 }
  ): Stream<A | A2 | A3 | A4, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(
    start: "[",
    middle: "-",
    end: "]"

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// {
//   _id: 'Chunk',
//   values: [
//     '[', 1,   '-', 2,   '-',
//     3,   '-', 4,   '-', 5,
//     ']'
//   ]
// }

Added in v2.0.0


Returns a stream whose failure and success channels have been mapped by the specified onFailure and onSuccess functions.


export declare const mapBoth: {
  <E, E2, A, A2>(options: {
    readonly onFailure: (e: E) => E2
    readonly onSuccess: (a: A) => A2
  }): <R>(self: Stream<A, E, R>) => Stream<A2, E2, R>
  <A, E, R, E2, A2>(
    self: Stream<A, E, R>,
    options: { readonly onFailure: (e: E) => E2; readonly onSuccess: (a: A) => A2 }
  ): Stream<A2, E2, R>

Added in v2.0.0


Merges this stream and the specified stream together.

By default, when no haltStrategy is specified, the resulting stream terminates once both streams have terminated.


export declare const merge: {
  <A2, E2, R2>(
    that: Stream<A2, E2, R2>,
    options?: { readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined } | undefined
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    that: Stream<A2, E2, R2>,
    options?: { readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined } | undefined
  ): Stream<A | A2, E | E2, R | R2>


import { Effect, Schedule, Stream } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(Stream.schedule(Schedule.spaced("100 millis")))
const s2 = Stream.make(4, 5, 6).pipe(Stream.schedule(Schedule.spaced("200 millis")))

const stream = Stream.merge(s1, s2)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }

Added in v2.0.0


Merges a variable list of streams in a non-deterministic fashion. Up to concurrency streams may be consumed in parallel and up to bufferSize chunks may be buffered by this operator.


export declare const mergeAll: {
  (options: {
    readonly concurrency: number | "unbounded"
    readonly bufferSize?: number | undefined
  }): <A, E, R>(streams: Iterable<Stream<A, E, R>>) => Stream<A, E, R>
  <A, E, R>(
    streams: Iterable<Stream<A, E, R>>,
    options: { readonly concurrency: number | "unbounded"; readonly bufferSize?: number | undefined }
  ): Stream<A, E, R>

Added in v2.0.0


Merges this stream and the specified stream together to produce a stream of eithers.


export declare const mergeEither: {
  <A2, E2, R2>(
    that: Stream<A2, E2, R2>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A2, A>, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<Either.Either<A2, A>, E | E2, R | R2>

Added in v2.0.0


Merges this stream and the specified stream together, discarding the values from the right stream.


export declare const mergeLeft: {
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL, ER | EL, RR | RL>
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL, EL | ER, RL | RR>

Added in v2.0.0


Merges this stream and the specified stream together, discarding the values from the left stream.


export declare const mergeRight: {
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AR, ER | EL, RR | RL>
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AR, EL | ER, RL | RR>

Added in v2.0.0


Merges this stream and the specified stream together to a common element type with the specified mapping functions.

By default, when no haltStrategy is specified, the resulting stream terminates once both streams have terminated.


export declare const mergeWith: {
  <A2, E2, R2, A, A3, A4>(
    other: Stream<A2, E2, R2>,
    options: {
      readonly onSelf: (a: A) => A3
      readonly onOther: (a2: A2) => A4
      readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined
  ): <E, R>(self: Stream<A, E, R>) => Stream<A3 | A4, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2, A3, A4>(
    self: Stream<A, E, R>,
    other: Stream<A2, E2, R2>,
    options: {
      readonly onSelf: (a: A) => A3
      readonly onOther: (a2: A2) => A4
      readonly haltStrategy?: HaltStrategy.HaltStrategyInput | undefined
  ): Stream<A3 | A4, E | E2, R | R2>


import { Effect, Schedule, Stream } from "effect"

const s1 = Stream.make("1", "2", "3").pipe(Stream.schedule(Schedule.spaced("100 millis")))
const s2 = Stream.make(4.1, 5.3, 6.2).pipe(Stream.schedule(Schedule.spaced("200 millis")))

const stream = Stream.mergeWith(s1, s2, {
  onSelf: (s) => parseInt(s),
  onOther: (n) => Math.floor(n)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }

Added in v2.0.0


Returns a combined string resulting from concatenating each of the values from the stream.


export declare const mkString: <E, R>(self: Stream<string, E, R>) => Effect.Effect<string, E, R>

Added in v2.0.0


Runs the specified effect if this stream ends.


export declare const onDone: {
  <X, R2>(cleanup: () => Effect.Effect<X, never, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, X, R2>(self: Stream<A, E, R>, cleanup: () => Effect.Effect<X, never, R2>): Stream<A, E, R | R2>

Added in v2.0.0


Runs the specified effect if this stream fails, providing the error to the effect if it exists.

Note: Unlike Effect.onError there is no guarantee that the provided effect will not be interrupted.


export declare const onError: {
  <E, X, R2>(
    cleanup: (cause: Cause.Cause<E>) => Effect.Effect<X, never, R2>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, X, R2>(
    self: Stream<A, E, R>,
    cleanup: (cause: Cause.Cause<E>) => Effect.Effect<X, never, R2>
  ): Stream<A, E, R | R2>

Added in v2.0.0


Partitions a stream using a predicate. As the return type [excluded, satisfying] indicates, the first stream will contain all elements for which the predicate evaluates to false and the second one will contain all elements for which it evaluates to true. The faster stream may advance by up to bufferSize elements further than the slower one.


export declare const partition: {
  <C extends A, B extends A, A = C>(
    refinement: Refinement<NoInfer<A>, B>,
    options?: { bufferSize?: number | undefined } | undefined
  ): <E, R>(
    self: Stream<C, E, R>
  ) => Effect.Effect<[excluded: Stream<Exclude<C, B>, E, never>, satisfying: Stream<B, E, never>], E, R | Scope.Scope>
  <A>(
    predicate: Predicate<A>,
    options?: { bufferSize?: number | undefined } | undefined
  ): <E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<[excluded: Stream<A, E, never>, satisfying: Stream<A, E, never>], E, Scope.Scope | R>
  <C extends A, E, R, B extends A, A = C>(
    self: Stream<C, E, R>,
    refinement: Refinement<A, B>,
    options?: { bufferSize?: number | undefined } | undefined
  ): Effect.Effect<[excluded: Stream<Exclude<C, B>, E, never>, satisfying: Stream<B, E, never>], E, R | Scope.Scope>
  <A, E, R>(
    self: Stream<A, E, R>,
    predicate: Predicate<A>,
    options?: { bufferSize?: number | undefined } | undefined
  ): Effect.Effect<[excluded: Stream<A, E, never>, satisfying: Stream<A, E, never>], E, R | Scope.Scope>


import { Effect, Stream } from "effect"

const partition = Stream.range(1, 10).pipe(Stream.partition((n) => n % 2 === 0, { bufferSize: 5 }))

const program = Effect.scoped(
  Effect.gen(function* () {
    const [odds, evens] = yield* partition
    console.log(yield* Stream.runCollect(odds))
    console.log(yield* Stream.runCollect(evens))

// Effect.runPromise(program)
// { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

Added in v2.0.0


Split a stream by an effectful predicate. The faster stream may advance by up to buffer elements further than the slower one.


export declare const partitionEither: {
  <A, A3, A2, E2, R2>(
    predicate: (a: NoInfer<A>) => Effect.Effect<Either.Either<A3, A2>, E2, R2>,
    options?: { readonly bufferSize?: number | undefined } | undefined
  ): <E, R>(
    self: Stream<A, E, R>
  ) => Effect.Effect<[left: Stream<A2, E2 | E, never>, right: Stream<A3, E2 | E, never>], E2 | E, Scope.Scope | R2 | R>
  <A, E, R, A3, A2, E2, R2>(
    self: Stream<A, E, R>,
    predicate: (a: A) => Effect.Effect<Either.Either<A3, A2>, E2, R2>,
    options?: { readonly bufferSize?: number | undefined } | undefined
  ): Effect.Effect<[left: Stream<A2, E | E2, never>, right: Stream<A3, E | E2, never>], E | E2, Scope.Scope | R | R2>


import { Effect, Either, Stream } from "effect"

const partition = Stream.range(1, 9).pipe(
  Stream.partitionEither((n) => Effect.succeed(n % 2 === 0 ? Either.left(n) : Either.right(n)), { bufferSize: 5 })

const program = Effect.scoped(
  Effect.gen(function* () {
    const [evens, odds] = yield* partition
    console.log(yield* Stream.runCollect(evens))
    console.log(yield* Stream.runCollect(odds))

// Effect.runPromise(program)
// { _id: 'Chunk', values: [ 2, 4, 6, 8 ] }
// { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }

Added in v2.0.0


Peels off enough material from the stream to construct a Z using the provided Sink and then returns both the Z and the rest of the Stream in a scope. Like all scoped values, the provided stream is valid only within the scope.


export declare const peel: {
  <A2, A, E2, R2>(
    sink: Sink.Sink<A2, A, A, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Effect.Effect<[A2, Stream<A, E, never>], E2 | E, Scope.Scope | R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    sink: Sink.Sink<A2, A, A, E2, R2>
  ): Effect.Effect<[A2, Stream<A, E, never>], E | E2, Scope.Scope | R | R2>

Added in v2.0.0


Pipes all of the values from this stream through the provided sink.

See also Stream.transduce.


export declare const pipeThrough: {
  <A2, A, L, E2, R2>(sink: Sink.Sink<A2, A, L, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<L, E2 | E, R2 | R>
  <A, E, R, A2, L, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<A2, A, L, E2, R2>): Stream<L, E | E2, R | R2>

Added in v2.0.0


Pipes all the values from this stream through the provided channel.


export declare const pipeThroughChannel: {
  <R2, E, E2, A, A2>(
    channel: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
  ): <R>(self: Stream<A, E, R>) => Stream<A2, E2, R2 | R>
  <R, R2, E, E2, A, A2>(
    self: Stream<A, E, R>,
    channel: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
  ): Stream<A2, E2, R | R2>

Added in v2.0.0


Pipes all values from this stream through the provided channel, passing through any error emitted by this stream unchanged.


export declare const pipeThroughChannelOrFail: {
  <R2, E, E2, A, A2>(
    chan: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
  ): <R>(self: Stream<A, E, R>) => Stream<A2, E | E2, R2 | R>
  <R, R2, E, E2, A, A2>(
    self: Stream<A, E, R>,
    chan: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Emits the provided chunk before emitting any other value.


export declare const prepend: {
  <B>(values: Chunk.Chunk<B>): <A, E, R>(self: Stream<A, E, R>) => Stream<B | A, E, R>
  <A, E, R, B>(self: Stream<A, E, R>, values: Chunk.Chunk<B>): Stream<A | B, E, R>

Added in v2.0.0


Re-chunks the elements of the stream into chunks of n elements each. The last chunk might contain less than n elements.


export declare const rechunk: {
  (n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>

Added in v2.0.0


Repeats the entire stream using the specified schedule. The stream will execute normally, and then repeat again according to the provided schedule.


export declare const repeat: {
  <B, R2>(schedule: Schedule.Schedule<B, unknown, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, B, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<B, unknown, R2>): Stream<A, E, R | R2>


import { Effect, Schedule, Stream } from "effect"

const stream = Stream.repeat(Stream.succeed(1), Schedule.forever)

// Effect.runPromise(Stream.runCollect(stream.pipe(Stream.take(5)))).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 1 ] }

Added in v2.0.0


Repeats the entire stream using the specified schedule. The stream will execute normally, and then repeat again according to the provided schedule. The schedule output will be emitted at the end of each repetition.


export declare const repeatEither: {
  <B, R2>(
    schedule: Schedule.Schedule<B, unknown, R2>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<Either.Either<A, B>, E, R2 | R>
  <A, E, R, B, R2>(
    self: Stream<A, E, R>,
    schedule: Schedule.Schedule<B, unknown, R2>
  ): Stream<Either.Either<A, B>, E, R | R2>

Added in v2.0.0


Repeats each element of the stream using the provided schedule. Repetitions are done in addition to the first execution, which means using Schedule.recurs(1) actually results in the original effect, plus an additional recurrence, for a total of two repetitions of each value in the stream.


export declare const repeatElements: {
  <B, R2>(schedule: Schedule.Schedule<B, unknown, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, B, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<B, unknown, R2>): Stream<A, E, R | R2>

Added in v2.0.0


Repeats each element of the stream using the provided schedule. When the schedule is finished, then the output of the schedule will be emitted into the stream. Repetitions are done in addition to the first execution, which means using Schedule.recurs(1) actually results in the original effect, plus an additional recurrence, for a total of two repetitions of each value in the stream.

This function accepts two conversion functions, which allow the output of this stream and the output of the provided schedule to be unified into a single type. For example, Either or similar data type.


export declare const repeatElementsWith: {
  <B, R2, A, C>(
    schedule: Schedule.Schedule<B, unknown, R2>,
    options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
  ): <E, R>(self: Stream<A, E, R>) => Stream<C, E, R2 | R>
  <A, E, R, B, R2, C>(
    self: Stream<A, E, R>,
    schedule: Schedule.Schedule<B, unknown, R2>,
    options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
  ): Stream<C, E, R | R2>

Added in v2.0.0


Repeats the entire stream using the specified schedule. The stream will execute normally, and then repeat again according to the provided schedule. The schedule output will be emitted at the end of each repetition and can be unified with the stream elements using the provided functions.


export declare const repeatWith: {
  <B, R2, A, C>(
    schedule: Schedule.Schedule<B, unknown, R2>,
    options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
  ): <E, R>(self: Stream<A, E, R>) => Stream<C, E, R2 | R>
  <A, E, R, B, R2, C>(
    self: Stream<A, E, R>,
    schedule: Schedule.Schedule<B, unknown, R2>,
    options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
  ): Stream<C, E, R | R2>

Added in v2.0.0


When the stream fails, retry it according to the given schedule.

This retries the entire stream, so will re-execute all of the stream’s acquire operations.

The schedule is reset as soon as the first element passes through the stream again.


export declare const retry: {
  <E0 extends E, R2, E, X>(
    schedule: Schedule.Schedule<X, E0, R2>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, X, E0 extends E, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<X, E0, R2>): Stream<A, E, R | R2>

Added in v2.0.0


Statefully maps over the elements of this stream to produce all intermediate results of type S given an initial S.


export declare const scan: {
  <S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Stream<S, E, R>
  <A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Stream<S, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.range(1, 6).pipe(Stream.scan(0, (a, b) => a + b))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0,  1,  3, 6, 10, 15, 21 ] }

Added in v2.0.0


Statefully and effectfully maps over the elements of this stream to produce all intermediate results of type S given an initial S.


export declare const scanEffect: {
  <S, A, E2, R2>(
    s: S,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<S, E2 | E, R2 | R>
  <A, E, R, S, E2, R2>(
    self: Stream<A, E, R>,
    s: S,
    f: (s: S, a: A) => Effect.Effect<S, E2, R2>
  ): Stream<S, E | E2, R | R2>

Added in v2.0.0


Statefully maps over the elements of this stream to produce all intermediate results.

See also Stream.scan.


export declare const scanReduce: {
  <A2, A>(f: (a2: A2 | A, a: A) => A2): <E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E, R>
  <A, E, R, A2>(self: Stream<A, E, R>, f: (a2: A | A2, a: A) => A2): Stream<A | A2, E, R>

Added in v2.0.0


Statefully and effectfully maps over the elements of this stream to produce all intermediate results.

See also Stream.scanEffect.


export declare const scanReduceEffect: {
  <A2, A, E2, R2>(
    f: (a2: A2 | A, a: A) => Effect.Effect<A2 | A, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    f: (a2: A | A2, a: A) => Effect.Effect<A | A2, E2, R2>
  ): Stream<A | A2, E | E2, R | R2>

Added in v2.0.0


Schedules the output of the stream using the provided schedule.


export declare const schedule: {
  <X, A0 extends A, R2, A>(
    schedule: Schedule.Schedule<X, A0, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R2 | R>
  <A, E, R, X, A0 extends A, R2>(self: Stream<A, E, R>, schedule: Schedule.Schedule<X, A0, R2>): Stream<A, E, R | R2>

Added in v2.0.0


Schedules the output of the stream using the provided schedule and emits its output at the end (if schedule is finite). Uses the provided function to align the stream and schedule outputs on the same type.


export declare const scheduleWith: {
  <B, A0 extends A, R2, A, C>(
    schedule: Schedule.Schedule<B, A0, R2>,
    options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
  ): <E, R>(self: Stream<A, E, R>) => Stream<C, E, R2 | R>
  <A, E, R, B, A0 extends A, R2, C>(
    self: Stream<A, E, R>,
    schedule: Schedule.Schedule<B, A0, R2>,
    options: { readonly onElement: (a: A) => C; readonly onSchedule: (b: B) => C }
  ): Stream<C, E, R | R2>

Added in v2.0.0


Returns a new Stream that multicasts the original Stream, subscribing to it as soon as the first consumer subscribes. As long as there is at least one consumer, the upstream will continue running and emitting data. When all consumers have exited, the upstream will be finalized.


export declare const share: {
  <A, E>(
      | {
          readonly capacity: "unbounded"
          readonly replay?: number | undefined
          readonly idleTimeToLive?: Duration.DurationInput | undefined
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
          readonly idleTimeToLive?: Duration.DurationInput | undefined
  ): <R>(self: Stream<A, E, R>) => Effect.Effect<Stream<A, E>, never, R | Scope.Scope>
  <A, E, R>(
    self: Stream<A, E, R>,
      | {
          readonly capacity: "unbounded"
          readonly replay?: number | undefined
          readonly idleTimeToLive?: Duration.DurationInput | undefined
      | {
          readonly capacity: number
          readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
          readonly replay?: number | undefined
          readonly idleTimeToLive?: Duration.DurationInput | undefined
  ): Effect.Effect<Stream<A, E>, never, R | Scope.Scope>

Added in v3.8.0


Emits a sliding window of n elements.

import * as Stream from "./Stream"
import { pipe } from "./Function"

pipe(Stream.make(1, 2, 3, 4), Stream.sliding(2), Stream.runCollect)
// => Chunk(Chunk(1, 2), Chunk(2, 3), Chunk(3, 4))


export declare const sliding: {
  (chunkSize: number): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
  <A, E, R>(self: Stream<A, E, R>, chunkSize: number): Stream<Chunk.Chunk<A>, E, R>

Added in v2.0.0


Like sliding, but with a configurable stepSize parameter.


export declare const slidingSize: {
  (chunkSize: number, stepSize: number): <A, E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
  <A, E, R>(self: Stream<A, E, R>, chunkSize: number, stepSize: number): Stream<Chunk.Chunk<A>, E, R>

Added in v2.0.0


Converts an option on values into an option on errors.


export declare const some: <A, E, R>(self: Stream<Option.Option<A>, E, R>) => Stream<A, Option.Option<E>, R>

Added in v2.0.0


Extracts the optional value, or returns the given ‘default’.


export declare const someOrElse: {
  <A2>(fallback: LazyArg<A2>): <A, E, R>(self: Stream<Option.Option<A>, E, R>) => Stream<A2 | A, E, R>
  <A, E, R, A2>(self: Stream<Option.Option<A>, E, R>, fallback: LazyArg<A2>): Stream<A | A2, E, R>

Added in v2.0.0


Extracts the optional value, or fails with the given error ‘e’.


export declare const someOrFail: {
  <E2>(error: LazyArg<E2>): <A, E, R>(self: Stream<Option.Option<A>, E, R>) => Stream<A, E2 | E, R>
  <A, E, R, E2>(self: Stream<Option.Option<A>, E, R>, error: LazyArg<E2>): Stream<A, E | E2, R>

Added in v2.0.0


Splits elements based on a predicate.

import * as Stream from "./Stream"
import { pipe } from "./Function"

pipe(
  Stream.range(1, 10),
  Stream.split((n) => n % 4 === 0),
  Stream.runCollect
)
// => Chunk(Chunk(1, 2, 3), Chunk(5, 6, 7), Chunk(9))


export declare const split: {
  <A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<Chunk.Chunk<A>, E, R>

Added in v2.0.0


Splits elements on a delimiter and transforms the splits into desired output.


export declare const splitOnChunk: {
  <A>(delimiter: Chunk.Chunk<A>): <E, R>(self: Stream<A, E, R>) => Stream<Chunk.Chunk<A>, E, R>
  <A, E, R>(self: Stream<A, E, R>, delimiter: Chunk.Chunk<A>): Stream<Chunk.Chunk<A>, E, R>

Added in v2.0.0


Takes the specified number of elements from this stream.


export declare const take: {
  (n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.take(
  Stream.iterate(0, (n) => n + 1),
  5
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }

Added in v2.0.0


Takes the last specified number of elements from this stream.


export declare const takeRight: {
  (n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, n: number): Stream<A, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.takeRight(Stream.make(1, 2, 3, 4, 5, 6), 3)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 4, 5, 6 ] }

Added in v2.0.0


Takes all elements of the stream until the specified predicate evaluates to true.


export declare const takeUntil: {
  <A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.takeUntil(
  Stream.iterate(0, (n) => n + 1),
  (n) => n === 4

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }

Added in v2.0.0


Takes all elements of the stream until the specified effectual predicate evaluates to true.


export declare const takeUntilEffect: {
  <A, E2, R2>(
    predicate: (a: NoInfer<A>) => Effect.Effect<boolean, E2, R2>
  ): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    predicate: (a: A) => Effect.Effect<boolean, E2, R2>
  ): Stream<A, E | E2, R | R2>

Added in v2.0.0


Takes all elements of the stream for as long as the specified predicate evaluates to true.


export declare const takeWhile: {
  <A, B extends A>(refinement: Refinement<NoInfer<A>, B>): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>
  <A>(predicate: Predicate<NoInfer<A>>): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R, B extends A>(self: Stream<A, E, R>, refinement: Refinement<A, B>): Stream<B, E, R>
  <A, E, R>(self: Stream<A, E, R>, predicate: Predicate<A>): Stream<A, E, R>


import { Effect, Stream } from "effect"

const stream = Stream.takeWhile(
  Stream.iterate(0, (n) => n + 1),
  (n) => n < 5

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }

Added in v2.0.0


Returns a stream that effectfully “peeks” at the cause of failure of the stream.


export declare const tapErrorCause: {
  <E, X, E2, R2>(
    f: (cause: Cause.Cause<NoInfer<E>>) => Effect.Effect<X, E2, R2>
  ): <A, R>(self: Stream<A, E, R>) => Stream<A, E | E2, R2 | R>
  <A, E, R, X, E2, R2>(
    self: Stream<A, E, R>,
    f: (cause: Cause.Cause<E>) => Effect.Effect<X, E2, R2>
  ): Stream<A, E | E2, R | R2>

Added in v2.0.0


Delays the chunks of this stream according to the given bandwidth parameters using the token bucket algorithm. Allows for burst in the processing of elements by allowing the token bucket to accumulate tokens up to a units + burst threshold. The weight of each chunk is determined by the cost function.

If using the “enforce” strategy, chunks that do not meet the bandwidth constraints are dropped. If using the “shape” strategy, chunks are delayed until they can be emitted without exceeding the bandwidth constraints.

Defaults to the “shape” strategy.


export declare const throttle: {
  <A>(options: {
    readonly cost: (chunk: Chunk.Chunk<A>) => number
    readonly units: number
    readonly duration: Duration.DurationInput
    readonly burst?: number | undefined
    readonly strategy?: "enforce" | "shape" | undefined
  }): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(
    self: Stream<A, E, R>,
    options: {
      readonly cost: (chunk: Chunk.Chunk<A>) => number
      readonly units: number
      readonly duration: Duration.DurationInput
      readonly burst?: number | undefined
      readonly strategy?: "enforce" | "shape" | undefined
  ): Stream<A, E, R>


import { Chunk, Effect, Schedule, Stream } from "effect"

let last = Date.now()
const log = (message: string) =>
  Effect.sync(() => {
    const end = Date.now()
    console.log(`${message} after ${end - last}ms`)
    last = end
  })

const stream = Stream.fromSchedule(Schedule.spaced("50 millis")).pipe(
  Stream.take(6),
  Stream.tap((n) => log(`Received ${n}`)),
  Stream.throttle({
    cost: Chunk.size,
    duration: "100 millis",
    units: 1
  }),
  Stream.tap((n) => log(`> Emitted ${n}`))
)

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Received 0 after 56ms
// > Emitted 0 after 0ms
// Received 1 after 52ms
// > Emitted 1 after 48ms
// Received 2 after 52ms
// > Emitted 2 after 49ms
// Received 3 after 52ms
// > Emitted 3 after 48ms
// Received 4 after 52ms
// > Emitted 4 after 47ms
// Received 5 after 52ms
// > Emitted 5 after 49ms
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5 ] }

Added in v2.0.0


Delays the chunks of this stream according to the given bandwidth parameters using the token bucket algorithm. Allows for burst in the processing of elements by allowing the token bucket to accumulate tokens up to a units + burst threshold. The weight of each chunk is determined by the effectful costFn function.

If using the “enforce” strategy, chunks that do not meet the bandwidth constraints are dropped. If using the “shape” strategy, chunks are delayed until they can be emitted without exceeding the bandwidth constraints.

Defaults to the “shape” strategy.


export declare const throttleEffect: {
  <A, E2, R2>(options: {
    readonly cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>
    readonly units: number
    readonly duration: Duration.DurationInput
    readonly burst?: number | undefined
    readonly strategy?: "enforce" | "shape" | undefined
  }): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(
    self: Stream<A, E, R>,
    options: {
      readonly cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>
      readonly units: number
      readonly duration: Duration.DurationInput
      readonly burst?: number | undefined
      readonly strategy?: "enforce" | "shape" | undefined
  ): Stream<A, E | E2, R | R2>

Added in v2.0.0


Ends the stream if it does not produce a value after the specified duration.


export declare const timeout: {
  (duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, duration: Duration.DurationInput): Stream<A, E, R>

Added in v2.0.0


Fails the stream with given error if it does not produce a value after d duration.


export declare const timeoutFail: {
  <E2>(error: LazyArg<E2>, duration: Duration.DurationInput): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
  <A, E, R, E2>(self: Stream<A, E, R>, error: LazyArg<E2>, duration: Duration.DurationInput): Stream<A, E | E2, R>

Added in v2.0.0


Fails the stream with given cause if it does not produce a value after d duration.


export declare const timeoutFailCause: {
  <E2>(
    cause: LazyArg<Cause.Cause<E2>>,
    duration: Duration.DurationInput
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R>
  <A, E, R, E2>(
    self: Stream<A, E, R>,
    cause: LazyArg<Cause.Cause<E2>>,
    duration: Duration.DurationInput
  ): Stream<A, E | E2, R>

Added in v2.0.0


Switches the stream if it does not produce a value after the specified duration.


export declare const timeoutTo: {
  <A2, E2, R2>(
    duration: Duration.DurationInput,
    that: Stream<A2, E2, R2>
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A2 | A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    duration: Duration.DurationInput,
    that: Stream<A2, E2, R2>
  ): Stream<A | A2, E | E2, R | R2>

Added in v2.0.0


Applies the transducer to the stream and emits its outputs.


export declare const transduce: {
  <A2, A, E2, R2>(sink: Sink.Sink<A2, A, A, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, sink: Sink.Sink<A2, A, A, E2, R2>): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Returns the specified stream if the given condition is satisfied, otherwise returns an empty stream.


export declare const when: {
  (test: LazyArg<boolean>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>
  <A, E, R>(self: Stream<A, E, R>, test: LazyArg<boolean>): Stream<A, E, R>

Added in v2.0.0


Returns the stream when the given partial function is defined for the given effectful value, otherwise returns an empty stream.


export declare const whenCaseEffect: {
  <A, A2, E2, R2>(
    pf: (a: A) => Option.Option<Stream<A2, E2, R2>>
  ): <E, R>(self: Effect.Effect<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Effect.Effect<A, E, R>,
    pf: (a: A) => Option.Option<Stream<A2, E2, R2>>
  ): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Returns the stream if the given effectful condition is satisfied, otherwise returns an empty stream.


export declare const whenEffect: {
  <E2, R2>(effect: Effect.Effect<boolean, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, E2, R2>(self: Stream<A, E, R>, effect: Effect.Effect<boolean, E2, R2>): Stream<A, E | E2, R | R2>

Added in v2.0.0



Zips this stream with another point-wise and emits tuples of elements from both streams.

The new stream will end when one of the sides ends.


export declare const zip: {
  <A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<[A, A2], E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>): Stream<[A, A2], E | E2, R | R2>


import { Effect, Stream } from "effect"

// We create two streams and zip them together.
const stream = Stream.zip(Stream.make(1, 2, 3, 4, 5, 6), Stream.make("a", "b", "c"))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ] ] }

Added in v2.0.0


Zips this stream with another point-wise, creating a new stream of pairs of elements from both sides.

The defaults defaultSelf and defaultOther will be used if the streams have different lengths and one of the streams has ended before the other.


export declare const zipAll: {
  <A2, E2, R2, A>(options: {
    readonly other: Stream<A2, E2, R2>
    readonly defaultSelf: A
    readonly defaultOther: A2
  }): <E, R>(self: Stream<A, E, R>) => Stream<[A, A2], E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    options: { readonly other: Stream<A2, E2, R2>; readonly defaultSelf: A; readonly defaultOther: A2 }
  ): Stream<[A, A2], E | E2, R | R2>


import { Effect, Stream } from "effect"

const stream = Stream.zipAll(Stream.make(1, 2, 3, 4, 5, 6), {
  other: Stream.make("a", "b", "c"),
  defaultSelf: 0,
  defaultOther: "x"

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: "Chunk", values: [ [ 1, "a" ], [ 2, "b" ], [ 3, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ] ] }

Added in v2.0.0


Zips this stream with another point-wise, and keeps only elements from this stream.

The provided default value will be used if the other stream ends before this one.


export declare const zipAllLeft: {
  <A2, E2, R2, A>(that: Stream<A2, E2, R2>, defaultLeft: A): <E, R>(self: Stream<A, E, R>) => Stream<A, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>, defaultLeft: A): Stream<A, E | E2, R | R2>

Added in v2.0.0


Zips this stream with another point-wise, and keeps only elements from the other stream.

The provided default value will be used if this stream ends before the other one.


export declare const zipAllRight: {
  <A2, E2, R2>(
    that: Stream<A2, E2, R2>,
    defaultRight: A2
  ): <A, E, R>(self: Stream<A, E, R>) => Stream<A2, E2 | E, R2 | R>
  <A, E, R, A2, E2, R2>(self: Stream<A, E, R>, that: Stream<A2, E2, R2>, defaultRight: A2): Stream<A2, E | E2, R | R2>

Added in v2.0.0


Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Combines values associated with each key into a tuple, using the specified values defaultSelf and defaultOther to fill in missing values.

This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.


export declare const zipAllSortedByKey: {
  <A2, E2, R2, A, K>(options: {
    readonly other: Stream<readonly [K, A2], E2, R2>
    readonly defaultSelf: A
    readonly defaultOther: A2
    readonly order: Order.Order<K>
  }): <E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, [A, A2]], E2 | E, R2 | R>
  <K, A, E, R, A2, E2, R2>(
    self: Stream<readonly [K, A], E, R>,
    options: {
      readonly other: Stream<readonly [K, A2], E2, R2>
      readonly defaultSelf: A
      readonly defaultOther: A2
      readonly order: Order.Order<K>
  ): Stream<[K, [A, A2]], E | E2, R | R2>

Added in v2.0.0


Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Keeps only values from this stream, using the specified value defaultSelf to fill in missing values.

This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.


export declare const zipAllSortedByKeyLeft: {
  <A2, E2, R2, A, K>(options: {
    readonly other: Stream<readonly [K, A2], E2, R2>
    readonly defaultSelf: A
    readonly order: Order.Order<K>
  }): <E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, A], E2 | E, R2 | R>
  <K, A, E, R, A2, E2, R2>(
    self: Stream<readonly [K, A], E, R>,
    options: {
      readonly other: Stream<readonly [K, A2], E2, R2>
      readonly defaultSelf: A
      readonly order: Order.Order<K>
  ): Stream<[K, A], E | E2, R | R2>

Added in v2.0.0


Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Keeps only values from that stream, using the specified value defaultOther to fill in missing values.

This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.


export declare const zipAllSortedByKeyRight: {
  <K, A2, E2, R2>(options: {
    readonly other: Stream<readonly [K, A2], E2, R2>
    readonly defaultOther: A2
    readonly order: Order.Order<K>
  }): <A, E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, A2], E2 | E, R2 | R>
  <A, E, R, K, A2, E2, R2>(
    self: Stream<readonly [K, A], E, R>,
    options: {
      readonly other: Stream<readonly [K, A2], E2, R2>
      readonly defaultOther: A2
      readonly order: Order.Order<K>
  ): Stream<[K, A2], E | E2, R | R2>

Added in v2.0.0


Zips this stream that is sorted by distinct keys and the specified stream that is sorted by distinct keys to produce a new stream that is sorted by distinct keys. Uses the functions onSelf, onOther, and onBoth to handle the cases where a key and value exist in this stream, that stream, or both streams.

This allows zipping potentially unbounded streams of data by key in constant space but the caller is responsible for ensuring that the streams are sorted by distinct keys.


export declare const zipAllSortedByKeyWith: {
  // Data-last overload: takes the options and returns a function that awaits `self`.
  <K, A2, E2, R2, A, A3>(options: {
    // The other key-sorted stream to zip with.
    readonly other: Stream<readonly [K, A2], E2, R2>
    // Maps a value whose key exists only in `self`.
    readonly onSelf: (a: A) => A3
    // Maps a value whose key exists only in `other`.
    readonly onOther: (a2: A2) => A3
    // Combines the two values when the key exists in both streams.
    readonly onBoth: (a: A, a2: A2) => A3
    // Ordering used to compare the keys of both streams.
    readonly order: Order.Order<K>
  }): <E, R>(self: Stream<readonly [K, A], E, R>) => Stream<[K, A3], E2 | E, R2 | R>
  // Data-first overload: takes `self` and the options together.
  <K, A, E, R, A2, E2, R2, A3>(
    self: Stream<readonly [K, A], E, R>,
    options: {
      readonly other: Stream<readonly [K, A2], E2, R2>
      readonly onSelf: (a: A) => A3
      readonly onOther: (a2: A2) => A3
      readonly onBoth: (a: A, a2: A2) => A3
      readonly order: Order.Order<K>
    }
  ): Stream<[K, A3], E | E2, R | R2>
}

Added in v2.0.0


Zips this stream with another point-wise. The provided functions will be used to create elements for the composed stream.

The functions onSelf and onOther will be used if the streams have different lengths and one of the streams has ended before the other.


export declare const zipAllWith: {
  // Data-last overload: takes the options and returns a function that awaits `self`.
  <A2, E2, R2, A, A3>(options: {
    // The other stream to zip with.
    readonly other: Stream<A2, E2, R2>
    // Maps an element of `self` that has no counterpart in `other`.
    readonly onSelf: (a: A) => A3
    // Maps an element of `other` that has no counterpart in `self`.
    readonly onOther: (a2: A2) => A3
    // Combines a pair of elements, one from each stream.
    readonly onBoth: (a: A, a2: A2) => A3
  }): <E, R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R>
  // Data-first overload: takes `self` and the options together.
  <A, E, R, A2, E2, R2, A3>(
    self: Stream<A, E, R>,
    options: {
      readonly other: Stream<A2, E2, R2>
      readonly onSelf: (a: A) => A3
      readonly onOther: (a2: A2) => A3
      readonly onBoth: (a: A, a2: A2) => A3
    }
  ): Stream<A3, E | E2, R | R2>
}


import { Effect, Stream } from "effect"

// `self` emits six numbers while `other` emits only three letters, so the
// first three results come from `onBoth` and the remaining three from `onSelf`.
const stream = Stream.zipAllWith(Stream.make(1, 2, 3, 4, 5, 6), {
  other: Stream.make("a", "b", "c"),
  onSelf: (n) => [n, "x"],
  onOther: (s) => [0, s],
  onBoth: (n, s) => [n - s.length, s]
})

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: "Chunk", values: [ [ 0, "a" ], [ 1, "b" ], [ 2, "c" ], [ 4, "x" ], [ 5, "x" ], [ 6, "x" ] ] }

Added in v2.0.0


Zips this stream with another point-wise and emits tuples of elements from both streams.

The new stream will end when one of the sides ends.


export declare const zipFlatten: {
  // Data-last overload: takes `that` and returns a function that awaits `self`.
  // `self` must emit tuples (`A extends ReadonlyArray<any>`); each result is that
  // tuple with the element from `that` appended (`[...A, A2]`).
  <A2, E2, R2>(
    that: Stream<A2, E2, R2>
  ): <A extends ReadonlyArray<any>, E, R>(self: Stream<A, E, R>) => Stream<[...A, A2], E2 | E, R2 | R>
  // Data-first overload: takes both streams together.
  <A extends ReadonlyArray<any>, E, R, A2, E2, R2>(
    self: Stream<A, E, R>,
    that: Stream<A2, E2, R2>
  ): Stream<[...A, A2], E | E2, R | R2>
}

Added in v2.0.0


Zips the two streams so that when a value is emitted by either of the two streams, it is combined with the latest value from the other stream to produce a result.

Note: tracking the latest value is done on a per-chunk basis. That means that emitted elements that are not the last value in chunks will never be used for zipping.


export declare const zipLatest: {
  // Data-last overload: takes `right` and returns a function that awaits `left`.
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<[AL, AR], EL | ER, RL | RR>
  // Data-first overload: takes both streams together.
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<[AL, AR], EL | ER, RL | RR>
}


import { Effect, Schedule, Stream } from "effect"

// Numbers arrive once per second, letters twice per second.
const everySecond = Schedule.spaced("1 second")
const everyHalfSecond = Schedule.spaced("500 millis")

const s1 = Stream.schedule(Stream.make(1, 2, 3), everySecond)
const s2 = Stream.schedule(Stream.make("a", "b", "c", "d"), everyHalfSecond)

// Pipeable form of `Stream.zipLatest(s1, s2)`.
const stream = s1.pipe(Stream.zipLatest(s2))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: "Chunk", values: [ [ 1, "a" ], [ 1, "b" ], [ 2, "b" ], [ 2, "c" ], [ 2, "d" ], [ 3, "d" ] ] }

Added in v2.0.0


Zips multiple streams so that when a value is emitted by any of the streams, it is combined with the latest values from the other streams to produce a result.

Note: tracking the latest value is done on a per-chunk basis. That means that emitted elements that are not the last value in chunks will never be used for zipping.


export declare const zipLatestAll: <T extends ReadonlyArray<Stream<any, any, any>>>(
  ...streams: T
) => Stream<
  // Success type: a tuple holding, position by position, the success type of each input stream.
  // The `[T[number]] extends [never]` guard resolves to `never` when no streams are passed.
  [T[number]] extends [never]
    ? never
    : { [K in keyof T]: T[K] extends Stream<infer A, infer _E, infer _R> ? A : never },
  // Error type: union of the second type parameter of every input stream.
  [T[number]] extends [never] ? never : T[number] extends Stream<infer _A, infer _E, infer _R> ? _E : never,
  // Third type parameter: union of the third type parameter of every input stream.
  [T[number]] extends [never] ? never : T[number] extends Stream<infer _A, infer _E, infer _R> ? _R : never
>


import { Stream, Schedule, Console, Effect } from "effect"

// Three counters ticking at different rates.
const fast = Stream.fromSchedule(Schedule.spaced("1 millis"))
const medium = Stream.fromSchedule(Schedule.spaced("2 millis"))
const slow = Stream.fromSchedule(Schedule.spaced("4 millis"))

// Keep the first six combined values and log each one as it is emitted.
const stream = Stream.zipLatestAll(fast, medium, slow).pipe(
  Stream.take(6),
  Stream.tap(Console.log)
)

// Effect.runPromise(Stream.runDrain(stream))
// Output:
// [ 0, 0, 0 ]
// [ 1, 0, 0 ]
// [ 1, 1, 0 ]
// [ 2, 1, 0 ]
// [ 3, 1, 0 ]
// [ 3, 1, 1 ]
// .....

Added in v3.3.0


Zips the two streams so that when a value is emitted by either of the two streams, it is combined with the latest value from the other stream using the specified function f to produce a result.

Note: tracking the latest value is done on a per-chunk basis. That means that emitted elements that are not the last value in chunks will never be used for zipping.


export declare const zipLatestWith: {
  // Data-last overload: takes `right` and the combining function `f`, and
  // returns a function that awaits `left`.
  <AR, ER, RR, AL, A>(
    right: Stream<AR, ER, RR>,
    f: (left: AL, right: AR) => A
  ): <EL, RL>(left: Stream<AL, EL, RL>) => Stream<A, EL | ER, RL | RR>
  // Data-first overload: takes both streams and `f` together.
  <AL, EL, RL, AR, ER, RR, A>(
    left: Stream<AL, EL, RL>,
    right: Stream<AR, ER, RR>,
    f: (left: AL, right: AR) => A
  ): Stream<A, EL | ER, RL | RR>
}

Added in v2.0.0


Zips this stream with another point-wise, but keeps only the outputs of the left stream.

The new stream will end when one of the sides ends.


export declare const zipLeft: {
  // Data-last overload: takes `right` and returns a function that awaits `left`.
  // The result carries only the left element type `AL`.
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AL, ER | EL, RR | RL>
  // Data-first overload: takes both streams together.
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AL, EL | ER, RL | RR>
}

Added in v2.0.0


Zips this stream with another point-wise, but keeps only the outputs of the right stream.

The new stream will end when one of the sides ends.


export declare const zipRight: {
  // Data-last overload: takes `right` and returns a function that awaits `left`.
  // The result carries only the right element type `AR`.
  <AR, ER, RR>(right: Stream<AR, ER, RR>): <AL, EL, RL>(left: Stream<AL, EL, RL>) => Stream<AR, ER | EL, RR | RL>
  // Data-first overload: takes both streams together.
  <AL, EL, RL, AR, ER, RR>(left: Stream<AL, EL, RL>, right: Stream<AR, ER, RR>): Stream<AR, EL | ER, RL | RR>
}

Added in v2.0.0


Zips this stream with another point-wise and applies the function to the paired elements.

The new stream will end when one of the sides ends.


export declare const zipWith: {
  // Data-last overload: takes `right` and the combining function `f`, and
  // returns a function that awaits `left`.
  <AR, ER, RR, AL, A>(
    right: Stream<AR, ER, RR>,
    f: (left: AL, right: AR) => A
  ): <EL, RL>(left: Stream<AL, EL, RL>) => Stream<A, EL | ER, RL | RR>
  // Data-first overload: takes both streams and `f` together.
  <AL, EL, RL, AR, ER, RR, A>(
    left: Stream<AL, EL, RL>,
    right: Stream<AR, ER, RR>,
    f: (left: AL, right: AR) => A
  ): Stream<A, EL | ER, RL | RR>
}


import { Effect, Stream } from "effect"

// Two streams of different lengths: six numbers and three letters.
const numbers = Stream.make(1, 2, 3, 4, 5, 6)
const letters = Stream.make("a", "b", "c")

// Zip them with custom logic, using the pipeable form of `Stream.zipWith`.
const stream = numbers.pipe(Stream.zipWith(letters, (n, s) => [n - s.length, s]))

// Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 0, 'a' ], [ 1, 'b' ], [ 2, 'c' ] ] }

Added in v2.0.0


Zips this stream with another, applying the function to paired chunks of elements rather than to individual elements.

The new stream will end when one of the sides ends.


export declare const zipWithChunks: {
  // Data-last overload: takes `that` and the chunk-combining function `f`, and
  // returns a function that awaits `self`.
  <A2, E2, R2, A, A3>(
    that: Stream<A2, E2, R2>,
    // `f` receives one chunk from each side and returns a pair: the chunk of
    // outputs to emit, and an `Either` holding a chunk of `A` or of `A2`.
    // NOTE(review): the `Either` presumably carries the unconsumed leftover of
    // one side — confirm against the implementation.
    f: (
      left: Chunk.Chunk<A>,
      right: Chunk.Chunk<A2>
    ) => readonly [Chunk.Chunk<A3>, Either.Either<Chunk.Chunk<A2>, Chunk.Chunk<A>>]
  ): <E, R>(self: Stream<A, E, R>) => Stream<A3, E2 | E, R2 | R>
  // Data-first overload: takes both streams and `f` together.
  <A, E, R, A2, E2, R2, A3>(
    self: Stream<A, E, R>,
    that: Stream<A2, E2, R2>,
    f: (
      left: Chunk.Chunk<A>,
      right: Chunk.Chunk<A2>
    ) => readonly [Chunk.Chunk<A3>, Either.Either<Chunk.Chunk<A2>, Chunk.Chunk<A>>]
  ): Stream<A3, E | E2, R | R2>
}

Added in v2.0.0


Zips this stream together with the index of elements.


/**
 * Takes a stream of `A` and returns a stream of `[A, number]` tuples, where the
 * number is the element's index. The error and context type parameters (`E`,
 * `R`) are passed through unchanged.
 */
export declare const zipWithIndex: <A, E, R>(self: Stream<A, E, R>) => Stream<[A, number], E, R>


import { Effect, Stream } from "effect"

// Build the stream of names and attach an index to every element in one pipeline.
const indexedStream = Stream.make("Mary", "James", "Robert", "Patricia").pipe(Stream.zipWithIndex)

// Effect.runPromise(Stream.runCollect(indexedStream)).then(console.log)
// {
//   _id: 'Chunk',
//   values: [ [ 'Mary', 0 ], [ 'James', 1 ], [ 'Robert', 2 ], [ 'Patricia', 3 ] ]
// }

Added in v2.0.0


Zips each element with the next element if present.


/**
 * Takes a stream of `A` and returns a stream of `[A, Option<A>]` tuples: each
 * element paired with an `Option` of the element that follows it. The error
 * and context type parameters (`E`, `R`) are passed through unchanged.
 */
export declare const zipWithNext: <A, E, R>(self: Stream<A, E, R>) => Stream<[A, Option.Option<A>], E, R>


import { Chunk, Effect, Stream } from "effect"

// Pair every number with the one that follows it; the last element gets `None`.
const stream = Stream.make(1, 2, 3, 4).pipe(Stream.zipWithNext)

// Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
// [
//   [ 1, { _id: 'Option', _tag: 'Some', value: 2 } ],
//   [ 2, { _id: 'Option', _tag: 'Some', value: 3 } ],
//   [ 3, { _id: 'Option', _tag: 'Some', value: 4 } ],
//   [ 4, { _id: 'Option', _tag: 'None' } ]
// ]

Added in v2.0.0


Zips each element with the previous element. Initially accompanied by None.


/**
 * Takes a stream of `A` and returns a stream of `[Option<A>, A]` tuples: an
 * `Option` of the preceding element paired with the current element. The error
 * and context type parameters (`E`, `R`) are passed through unchanged.
 */
export declare const zipWithPrevious: <A, E, R>(self: Stream<A, E, R>) => Stream<[Option.Option<A>, A], E, R>


import { Chunk, Effect, Stream } from "effect"

// Pair every number with the one that preceded it; the first element gets `None`.
const stream = Stream.make(1, 2, 3, 4).pipe(Stream.zipWithPrevious)

// Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
// [
//   [ { _id: 'Option', _tag: 'None' }, 1 ],
//   [ { _id: 'Option', _tag: 'Some', value: 1 }, 2 ],
//   [ { _id: 'Option', _tag: 'Some', value: 2 }, 3 ],
//   [ { _id: 'Option', _tag: 'Some', value: 3 }, 4 ]
// ]

Added in v2.0.0


Zips each element with both the previous and next element.


/**
 * Takes a stream of `A` and returns a stream of
 * `[Option<A>, A, Option<A>]` triples: an `Option` of the preceding element,
 * the current element, and an `Option` of the following element. The error
 * and context type parameters (`E`, `R`) are passed through unchanged.
 */
export declare const zipWithPreviousAndNext: <A, E, R>(
  self: Stream<A, E, R>
) => Stream<[Option.Option<A>, A, Option.Option<A>], E, R>


import { Chunk, Effect, Stream } from "effect"

// Surround every number with its neighbours; the two ends get `None` on the open side.
const stream = Stream.make(1, 2, 3, 4).pipe(Stream.zipWithPreviousAndNext)

// Effect.runPromise(Stream.runCollect(stream)).then((chunk) => console.log(Chunk.toArray(chunk)))
// [
//   [
//     { _id: 'Option', _tag: 'None' },
//     1,
//     { _id: 'Option', _tag: 'Some', value: 2 }
//   ],
//   [
//     { _id: 'Option', _tag: 'Some', value: 1 },
//     2,
//     { _id: 'Option', _tag: 'Some', value: 3 }
//   ],
//   [
//     { _id: 'Option', _tag: 'Some', value: 2 },
//     3,
//     { _id: 'Option', _tag: 'Some', value: 4 }
//   ],
//   [
//     { _id: 'Option', _tag: 'Some', value: 3 },
//     4,
//     { _id: 'Option', _tag: 'None' }
//   ]
// ]

Added in v2.0.0