Cloudflare Queues
Status: Production Ready ✅ Last Updated: 2025-10-21 Dependencies: cloudflare-worker-base (for Worker setup) Latest Versions: wrangler@4.43.0, @cloudflare/workers-types@4.20251014.0
Quick Start (10 Minutes)
1. Create a Queue
# Create a new queue
npx wrangler queues create my-queue
# Output:
# ✅ Successfully created queue my-queue
# List all queues
npx wrangler queues list
# Get queue info
npx wrangler queues info my-queue
2. Set Up Producer (Send Messages)
wrangler.jsonc:
{
"name": "my-producer",
"main": "src/index.ts",
"compatibility_date": "2025-10-11",
"queues": {
"producers": [
{
"binding": "MY_QUEUE", // Available as env.MY_QUEUE
"queue": "my-queue" // Queue name from step 1
}
]
}
}
src/index.ts (Producer):
import { Hono } from 'hono';
type Bindings = {
MY_QUEUE: Queue;
};
const app = new Hono<{ Bindings: Bindings }>();
// Send single message
app.post('/send', async (c) => {
const body = await c.req.json();
await c.env.MY_QUEUE.send({
userId: body.userId,
action: 'process-order',
timestamp: Date.now(),
});
return c.json({ status: 'queued' });
});
// Send batch of messages
app.post('/send-batch', async (c) => {
const items = await c.req.json();
await c.env.MY_QUEUE.sendBatch(
items.map((item) => ({
body: { userId: item.userId, action: item.action },
}))
);
return c.json({ status: 'queued', count: items.length });
});
export default app;
3. Set Up Consumer (Process Messages)
Create consumer Worker:
# In a new directory
npm create cloudflare@latest my-consumer -- --type hello-world --ts
cd my-consumer
wrangler.jsonc:
{
"name": "my-consumer",
"main": "src/index.ts",
"compatibility_date": "2025-10-11",
"queues": {
"consumers": [
{
"queue": "my-queue", // Queue to consume from
"max_batch_size": 10, // Process up to 10 messages at once
"max_batch_timeout": 5 // Or wait max 5 seconds
}
]
}
}
src/index.ts (Consumer):
export default {
async queue(
batch: MessageBatch,
env: Env,
ctx: ExecutionContext
): Promise<void> {
console.log(`Processing batch of ${batch.messages.length} messages`);
for (const message of batch.messages) {
console.log('Message:', message.id, message.body, `Attempt: ${message.attempts}`);
// Your processing logic here
await processMessage(message.body);
}
// Implicit acknowledgement: if this function returns without error,
// all messages are automatically acknowledged
},
};
async function processMessage(body: any) {
// Process the message
console.log('Processing:', body);
}
4. Deploy and Test
# Deploy producer
cd my-producer
npm run deploy
# Deploy consumer
cd my-consumer
npm run deploy
# Test by sending a message
curl -X POST https://my-producer.<your-subdomain>.workers.dev/send \
-H "Content-Type: application/json" \
-d '{"userId": "123", "action": "welcome-email"}'
# Watch consumer logs
npx wrangler tail my-consumer
Complete Producer API
send() - Send Single Message
interface QueueSendOptions {
delaySeconds?: number; // Delay delivery (0-43200 seconds / 12 hours)
}
await env.MY_QUEUE.send(body: any, options?: QueueSendOptions);
Examples:
// Simple send
await env.MY_QUEUE.send({ userId: '123', action: 'send-email' });
// Send with delay (10 minutes)
await env.MY_QUEUE.send(
{ userId: '123', action: 'reminder' },
{ delaySeconds: 600 }
);
// Send structured data
await env.MY_QUEUE.send({
type: 'order-confirmation',
orderId: 'ORD-123',
email: 'user@example.com',
items: [{ sku: 'ITEM-1', quantity: 2 }],
total: 49.99,
timestamp: Date.now(),
});
CRITICAL:
- Message body must be JSON serializable (structured clone algorithm)
- Maximum message size: 128 KB (including ~100 bytes internal metadata)
- Messages >128 KB will fail - split them or store in R2 and send reference
sendBatch() - Send Multiple Messages
interface MessageSendRequest<Body = any> {
body: Body;
delaySeconds?: number;
}
interface QueueSendBatchOptions {
delaySeconds?: number; // Default delay for all messages
}
await env.MY_QUEUE.sendBatch(
messages: Iterable<MessageSendRequest>,
options?: QueueSendBatchOptions
);
Examples:
// Send batch of messages
await env.MY_QUEUE.sendBatch([
{ body: { userId: '1', action: 'email' } },
{ body: { userId: '2', action: 'email' } },
{ body: { userId: '3', action: 'email' } },
]);
// Send batch with individual delays
await env.MY_QUEUE.sendBatch([
{ body: { task: 'task1' }, delaySeconds: 60 }, // 1 min
{ body: { task: 'task2' }, delaySeconds: 300 }, // 5 min
{ body: { task: 'task3' }, delaySeconds: 600 }, // 10 min
]);
// Send batch with default delay
await env.MY_QUEUE.sendBatch(
[
{ body: { task: 'task1' } },
{ body: { task: 'task2' } },
],
{ delaySeconds: 3600 } // All delayed by 1 hour
);
// Dynamic batch from array
const tasks = await getTasks();
await env.MY_QUEUE.sendBatch(
tasks.map((task) => ({
body: {
taskId: task.id,
userId: task.userId,
priority: task.priority,
},
}))
);
Limits:
- Maximum 100 messages per batch
- Maximum 256 KB total batch size
- Each message still limited to 128 KB individually
Complete Consumer API
Queue Handler Function
export default {
async queue(
batch: MessageBatch,
env: Env,
ctx: ExecutionContext
): Promise<void> {
// Process messages
},
};
Parameters:
batch- MessageBatch object containing messagesenv- Environment bindings (KV, D1, R2, etc.)ctx- Execution context forwaitUntil(),passThroughOnException()
MessageBatch Interface
interface MessageBatch<Body = unknown> {
readonly queue: string; // Queue name
readonly messages: Message<Body>[]; // Array of messages
ackAll(): void; // Acknowledge all messages
retryAll(options?: QueueRetryOptions): void; // Retry all messages
}
Properties:
-
queue- Name of the queue this batch came from- Useful when one consumer handles multiple queues
-
messages- Array of Message objects- Ordering is best effort, not guaranteed
- Process order should not be assumed
Methods:
-
ackAll()- Mark all messages as successfully delivered- Even if handler throws error, these messages won't retry
- Use when you've safely processed all messages
-
retryAll(options?)- Mark all messages for retry- Messages re-queued immediately (or after delay)
- Counts towards max_retries limit
Message Interface
interface Message<Body = unknown> {
readonly id: string; // Unique message ID
readonly timestamp: Date; // When message was sent
readonly body: Body; // Message content
readonly attempts: number; // Retry count (starts at 1)
ack(): void; // Acknowledge this message
retry(options?: QueueRetryOptions): void; // Retry this message
}
Properties:
id- System-generated unique ID (UUID)timestamp- Date object when message was sent to queuebody- Your message content (any JSON serializable type)attempts- Number of times consumer has processed this message- Starts at 1 on first delivery
- Increments on each retry
- Use for exponential backoff:
delaySeconds: 60 * message.attempts
Methods:
ack()- Mark message as successfully delivered- Message won't be retried even if handler fails later
- Critical for non-idempotent operations (DB writes, API cal