Batch processing
When ingestion volume is high—such as recording analytical clickstream data, processing application log streams, or importing massive product catalog updates—sending messages one-by-one creates unnecessary CPU and network overhead.
To handle high-throughput workloads efficiently, Layeron Queue provides two key optimizations:
- Bulk Ingestion (
sendBatch): Publish up to 100 messages into the queue in a single API call. - High-Performance Consumer Tuning: Configure the consumer’s
batchSizeandconcurrencyparameters to lease and process multiple messages concurrently.
Code Implementation
Section titled “Code Implementation”This example demonstrates receiving a batch of client-side tracking events in a single HTTP request, pushing them immediately to the queue using .sendBatch(), and tuning the consumer to process events concurrently in batches of 50.
import { backend } from "@layeron/core"import { queue } from "@layeron/modules"
const app = backend()
// 1. Declare the queue with high-throughput consumer tuningconst telemetryQueue = queue({ name: "telemetry-events", // Tune consumer batching and concurrency consumer: { batchSize: 50, // Fetch up to 50 messages in a single lease batch concurrency: 10, // Run up to 10 batch-handling worker instances concurrently visibilityTimeoutSeconds: 60, // Lock messages for 60 seconds during processing }, retry: { maxAttempts: 3, backoff: "fixed", initialDelaySeconds: 5, },})
app.use(telemetryQueue)
// 2. High-Throughput Bulk Ingestion Routeapp.post("/api/telemetry", async (request) => { const body = await request.json() as { events: Array<{ type: string; url: string; userId: string }> }
if (!body.events || !Array.isArray(body.events)) { return new Response("Invalid events list", { status: 400 }) }
// Map events to the QueueSendInput format. // We can attach unique idempotency keys or custom headers per message if needed. const messagesToSend = body.events.map((event) => ({ payload: { type: event.type, url: event.url, userId: event.userId, ip: request.headers.get("cf-connecting-ip") || "unknown", timestamp: new Date().toISOString(), }, // Optional: deduplicate identical clicks sent in the same window idempotencyKey: `${event.userId}_${event.type}_${Date.now()}`, }))
// Send up to 100 messages in a single highly-efficient batch call const result = await telemetryQueue.sendBatch(messagesToSend)
return Response.json({ status: "accepted", messagesEnqueued: result.messages.length, }, { status: 202 })})
// 3. Batch Consumer Handler// Because batchSize is 50, Layeron can deliver up to 50 messages per batch.// The consumer handler runs concurrently up to 10 times.telemetryQueue.consume(async (message) => { const event = message.payload
// Process the individual message (e.g. record metrics, write to Layeron Database, push to Storage) await saveTelemetryEvent(event)})
async function saveTelemetryEvent(event: any) { // Save to persistent storage...}How It Works
Section titled “How It Works”- Reduced Network Roundtrips: By calling
telemetryQueue.sendBatch(...), your app writes many items with one queue call. This dramatically reduces latency compared to invokingsend()100 times in a loop. - Batch Delivery: The consumer is configured with
batchSize: 50, so Layeron can deliver up to 50 messages to the consumer at a time. This reduces per-message overhead in high-volume workloads. - Concurrency Control: With
concurrency: 10, Layeron allows up to 10 consumer executions to run simultaneously. This helps drain the queue quickly during peak traffic times. - Visibility Timeout Safety: Since the consumer processes up to 50 messages per batch, the
visibilityTimeoutSecondsis set to60seconds. This ensures that even if database operations are slightly slow under load, the messages will not unlock and get re-leased by another worker before the consumer finishes.