Skip to main content Link Search Menu Expand Document (external link)

DurableQueue.ts overview

Since v1.0.0


Exports Grouped by Category


Constructors

make

A DurableQueue wraps a PersistedQueue, providing a way to wait for items to finish processing using a DurableDeferred.

import { DurableQueue, Workflow } from "@effect/workflow"
import { Effect, Schema } from "effect"

// A DurableQueue definition: workers are derived from it and items are
// offered to it for processing.
const ApiQueue = DurableQueue.make({
  name: "ApiQueue",
  payload: { id: Schema.String },
  success: Schema.Void,
  error: Schema.Never,
  // The payload's `id` doubles as the idempotency key.
  idempotencyKey: ({ id }) => id
})

// Workflow definition whose payload is a single `id` string; that `id` is
// also returned as the idempotency key.
const MyWorkflow = Workflow.make({
  name: "MyWorkflow",
  payload: { id: Schema.String },
  idempotencyKey: (payload) => payload.id
})

// Implementation of MyWorkflow, supplied through `MyWorkflow.toLayer`.
const MyWorkflowLayer = MyWorkflow.toLayer(
  Effect.fn(function* () {
    // Add an item to the DurableQueue defined above.
    //
    // When the worker has finished processing the item, the workflow will
    // resume.
    yield* DurableQueue.process(ApiQueue, { id: "api-call-1" })

    // Runs only after `DurableQueue.process` has yielded its result.
    yield* Effect.log("Workflow succeeded!")
  })
)

// Worker layer for ApiQueue: the handler below is invoked with each item's
// payload.
const ApiWorker = DurableQueue.worker(
  ApiQueue,
  Effect.fn(function* (payload) {
    const { id } = payload
    yield* Effect.log(`Worker processing API call with id: ${id}`)
  }),
  // Process up to 5 items concurrently
  { concurrency: 5 }
)

Signature

/**
 * Creates a `DurableQueue` definition.
 *
 * `payload` may be either a schema or a record of struct fields; when struct
 * fields are given, the resulting queue's payload type is
 * `Schema.Struct<Payload>`. `success` and `error` are optional, and their type
 * parameters default to `typeof Schema.Void` and `typeof Schema.Never`.
 *
 * @param options.name - name of the queue
 * @param options.payload - schema (or struct fields) describing an item
 * @param options.idempotencyKey - derives a string key from the payload's `Type`
 * @param options.success - schema for the value a worker succeeds with
 * @param options.error - schema for the error a worker fails with
 * @returns a `DurableQueue` typed with the payload, success and error schemas
 */
declare const make: <
  Payload extends Schema.Schema.Any | Schema.Struct.Fields,
  Success extends Schema.Schema.Any = typeof Schema.Void,
  Error extends Schema.Schema.All = typeof Schema.Never
>(options: {
  readonly name: string
  readonly payload: Payload
  readonly idempotencyKey: (
    payload: Payload extends Schema.Struct.Fields ? Schema.Struct<Payload>["Type"] : Payload["Type"]
  ) => string
  readonly success?: Success | undefined
  readonly error?: Error | undefined
}) => DurableQueue<Payload extends Schema.Struct.Fields ? Schema.Struct<Payload> : Payload, Success, Error>

Source

Since v1.0.0

Models

DurableQueue (interface)

Signature

/**
 * A queue definition parameterised by the schemas of its payload, its
 * successful result and its error.
 *
 * `Success` defaults to `typeof Schema.Void` and `Error` to
 * `typeof Schema.Never`.
 */
export interface DurableQueue<
  Payload extends Schema.Schema.Any,
  Success extends Schema.Schema.Any = typeof Schema.Void,
  Error extends Schema.Schema.All = typeof Schema.Never
> {
  /** Brand identifying a value as a `DurableQueue`. */
  readonly [TypeId]: TypeId
  /** Name of the queue. */
  readonly name: string
  /** Schema describing an item's payload. */
  readonly payloadSchema: Payload
  /** Derives a string key from a payload value. */
  readonly idempotencyKey: (payload: Payload["Type"]) => string
  /**
   * `DurableDeferred` typed with the success and error schemas.
   * NOTE(review): presumably the deferred used to wait for an item to finish
   * processing — confirm against the implementation.
   */
  readonly deferred: DurableDeferred.DurableDeferred<Success, Error>
}

Source

Since v1.0.0

Processing

process

Signature

/**
 * Adds an item to the given `DurableQueue` from within a workflow and yields
 * the outcome of processing it: the effect succeeds with `Success["Type"]` or
 * fails with `Error["Type"]`.
 *
 * The returned effect requires `WorkflowEngine`, `WorkflowInstance` and
 * `PersistedQueueFactory`, plus any context needed by the payload, success
 * and error schemas.
 *
 * @param self - the queue to add the item to
 * @param payload - the item's payload
 * @param options.retrySchedule - optional schedule taking
 *   `PersistedQueueError` as input. NOTE(review): presumably controls retries
 *   when the persisted queue itself fails — confirm.
 */
declare const process: <
  Payload extends Schema.Schema.Any,
  Success extends Schema.Schema.Any,
  Error extends Schema.Schema.All
>(
  self: DurableQueue<Payload, Success, Error>,
  payload: Payload["Type"],
  options?: { readonly retrySchedule?: Schedule.Schedule<any, PersistedQueue.PersistedQueueError> | undefined }
) => Effect.Effect<
  Success["Type"],
  Error["Type"],
  | WorkflowEngine.WorkflowEngine
  | WorkflowEngine.WorkflowInstance
  | PersistedQueue.PersistedQueueFactory
  | Success["Context"]
  | Error["Context"]
  | Payload["Context"]
>

Source

Since v1.0.0

Type IDs

TypeId

Signature

/** Runtime value of the `DurableQueue` type identifier. */
declare const TypeId: "~@effect/workflow/DurableQueue"

Source

Since v1.0.0

TypeId (type alias)

Signature

/** String-literal type of the `DurableQueue` type identifier. */
type TypeId = "~@effect/workflow/DurableQueue"

Source

Since v1.0.0

Worker

makeWorker

Signature

/**
 * Builds a worker for the given `DurableQueue` as an `Effect`.
 *
 * The returned effect has `never` in both its success and error channels. It
 * requires `WorkflowEngine` and `PersistedQueueFactory`, the handler's own
 * requirements `R`, and any context needed by the payload, success and error
 * schemas.
 *
 * @param self - the queue whose items are handled
 * @param f - handler invoked with an item's payload; its success and error
 *   types must match the queue's success and error schemas
 * @param options.concurrency - optional number of items to process
 *   concurrently
 */
declare const makeWorker: <
  Payload extends Schema.Schema.Any,
  Success extends Schema.Schema.Any,
  Error extends Schema.Schema.All,
  R
>(
  self: DurableQueue<Payload, Success, Error>,
  f: (payload: Payload["Type"]) => Effect.Effect<Success["Type"], Error["Type"], R>,
  options?: { readonly concurrency?: number | undefined } | undefined
) => Effect.Effect<
  never,
  never,
  | WorkflowEngine.WorkflowEngine
  | PersistedQueue.PersistedQueueFactory
  | R
  | Payload["Context"]
  | Success["Context"]
  | Error["Context"]
>

Source

Since v1.0.0

worker

Signature

/**
 * Builds a worker layer that processes items from the given `DurableQueue`.
 *
 * Takes the same arguments as `makeWorker` but returns a `Layer` instead of
 * an `Effect`. The layer provides no services (`never`) and has no error
 * (`never`). It requires `WorkflowEngine` and `PersistedQueueFactory`, the
 * handler's own requirements `R`, and any context needed by the payload,
 * success and error schemas.
 *
 * @param self - the queue whose items are handled
 * @param f - handler invoked with an item's payload; its success and error
 *   types must match the queue's success and error schemas
 * @param options.concurrency - optional number of items to process
 *   concurrently
 */
declare const worker: <
  Payload extends Schema.Schema.Any,
  Success extends Schema.Schema.Any,
  Error extends Schema.Schema.All,
  R
>(
  self: DurableQueue<Payload, Success, Error>,
  f: (payload: Payload["Type"]) => Effect.Effect<Success["Type"], Error["Type"], R>,
  options?: { readonly concurrency?: number | undefined } | undefined
) => Layer.Layer<
  never,
  never,
  | WorkflowEngine.WorkflowEngine
  | PersistedQueue.PersistedQueueFactory
  | R
  | Payload["Context"]
  | Success["Context"]
  | Error["Context"]
>

Source

Since v1.0.0