DurableQueue.ts overview
Since v1.0.0
Exports Grouped by Category
Constructors
make
A DurableQueue wraps a PersistedQueue, providing a way to wait for items to finish processing using a DurableDeferred.
import { DurableQueue, Workflow } from "@effect/workflow"
import { Effect, Schema } from "effect"
// Define a DurableQueue that can be used to derive workers and offer items for
// processing.
const ApiQueue = DurableQueue.make({
name: "ApiQueue",
payload: {
id: Schema.String
},
success: Schema.Void,
error: Schema.Never,
idempotencyKey(payload) {
return payload.id
}
})
const MyWorkflow = Workflow.make({
name: "MyWorkflow",
payload: {
id: Schema.String
},
idempotencyKey: ({ id }) => id
})
const MyWorkflowLayer = MyWorkflow.toLayer(
Effect.fn(function* () {
// Add an item to the DurableQueue defined above.
//
// When the worker has finished processing the item, the workflow will
// resume.
//
yield* DurableQueue.process(ApiQueue, { id: "api-call-1" })
yield* Effect.log("Workflow succeeded!")
})
)
// Define a worker layer that can process items from the DurableQueue.
const ApiWorker = DurableQueue.worker(
ApiQueue,
Effect.fn(function* ({ id }) {
yield* Effect.log(`Worker processing API call with id: ${id}`)
}),
{ concurrency: 5 } // Process up to 5 items concurrently
)
Signature
declare const make: <
Payload extends Schema.Schema.Any | Schema.Struct.Fields,
Success extends Schema.Schema.Any = typeof Schema.Void,
Error extends Schema.Schema.All = typeof Schema.Never
>(options: {
readonly name: string
readonly payload: Payload
readonly idempotencyKey: (
payload: Payload extends Schema.Struct.Fields ? Schema.Struct<Payload>["Type"] : Payload["Type"]
) => string
readonly success?: Success | undefined
readonly error?: Error | undefined
}) => DurableQueue<Payload extends Schema.Struct.Fields ? Schema.Struct<Payload> : Payload, Success, Error>
Since v1.0.0
Models
DurableQueue (interface)
Signature
export interface DurableQueue<
Payload extends Schema.Schema.Any,
Success extends Schema.Schema.Any = typeof Schema.Void,
Error extends Schema.Schema.All = typeof Schema.Never
> {
readonly [TypeId]: TypeId
readonly name: string
readonly payloadSchema: Payload
readonly idempotencyKey: (payload: Payload["Type"]) => string
readonly deferred: DurableDeferred.DurableDeferred<Success, Error>
}
Since v1.0.0
Processing
process
Signature
declare const process: <
Payload extends Schema.Schema.Any,
Success extends Schema.Schema.Any,
Error extends Schema.Schema.All
>(
self: DurableQueue<Payload, Success, Error>,
payload: Payload["Type"],
options?: { readonly retrySchedule?: Schedule.Schedule<any, PersistedQueue.PersistedQueueError> | undefined }
) => Effect.Effect<
Success["Type"],
Error["Type"],
| WorkflowEngine.WorkflowEngine
| WorkflowEngine.WorkflowInstance
| PersistedQueue.PersistedQueueFactory
| Success["Context"]
| Error["Context"]
| Payload["Context"]
>
Since v1.0.0
Type IDs
TypeId
Signature
declare const TypeId: "~@effect/workflow/DurableQueue"
Since v1.0.0
TypeId (type alias)
Signature
type TypeId = "~@effect/workflow/DurableQueue"
Since v1.0.0
Worker
makeWorker
Signature
declare const makeWorker: <
Payload extends Schema.Schema.Any,
Success extends Schema.Schema.Any,
Error extends Schema.Schema.All,
R
>(
self: DurableQueue<Payload, Success, Error>,
f: (payload: Payload["Type"]) => Effect.Effect<Success["Type"], Error["Type"], R>,
options?: { readonly concurrency?: number | undefined } | undefined
) => Effect.Effect<
never,
never,
| WorkflowEngine.WorkflowEngine
| PersistedQueue.PersistedQueueFactory
| R
| Payload["Context"]
| Success["Context"]
| Error["Context"]
>
Since v1.0.0
worker
Signature
declare const worker: <
Payload extends Schema.Schema.Any,
Success extends Schema.Schema.Any,
Error extends Schema.Schema.All,
R
>(
self: DurableQueue<Payload, Success, Error>,
f: (payload: Payload["Type"]) => Effect.Effect<Success["Type"], Error["Type"], R>,
options?: { readonly concurrency?: number | undefined } | undefined
) => Layer.Layer<
never,
never,
| WorkflowEngine.WorkflowEngine
| PersistedQueue.PersistedQueueFactory
| R
| Payload["Context"]
| Success["Context"]
| Error["Context"]
>
Since v1.0.0