AI agents fail more often than you might expect. OpenAI’s API returns errors at a low but non-trivial rate that fluctuates with load, and Claude’s API shows similar behavior. On its own, that doesn’t sound alarming. But once a system is making hundreds or thousands of calls, those small percentages turn into a steady stream of failures. A task queue turns those inevitable failures from silent data loss into recoverable work.
The real issue isn’t retry logic by itself. AI agents tend to fan out work: a single user request can trigger multiple LLM calls, database writes, and external API requests. Without orchestration, this quickly leads to race conditions, duplicate processing, and very little visibility into what actually broke. A queue gives you ordering, observability, and the ability to resume or replay operations from any point in the chain.
In this article, we’ll look at why AI-driven workloads need a task queue, how queues help preserve context and prevent duplication, and how to build a simple, practical queue for an AI agent.
AI operations consume tokens at variable rates and unpredictable costs. A single prompt might use 500 tokens or 50,000, depending on context size. Standard worker pools assume roughly uniform task duration, but LLM calls range from 200ms to more than 30 seconds. That variance makes naive parallel processing a poor fit: a few slow calls can monopolize every worker while quick, user-facing tasks sit behind them.
Rate limits make the problem worse. Most LLM APIs enforce both requests-per-minute and tokens-per-minute limits. Hit either one and you’ll start seeing 429 errors that ripple through the rest of the system. A task queue gives you a place to apply adaptive throttling so both limits are respected at the same time, instead of reacting to failures after the fact.
Context loss is the most expensive failure mode. When an AI agent drops its conversation history mid-operation, rebuilding that context costs both tokens and time. Worse, the reconstructed context may not be identical, which can lead to inconsistent decisions. By carrying full context with each queued operation, retries stay deterministic and predictable.
The queue becomes your source of truth. Each task stores the complete context needed to execute independently: the conversation history, the user’s original request, intermediate results from prior operations, and metadata about what’s already been attempted. When a failure occurs, the retry pulls this context directly from the queue rather than trying to reconstruct it.
Consider an agent that needs to analyze a document, extract key points, and draft a summary. Without a queue, a failure during summary generation means restarting from scratch, re-analyzing the document, and burning tokens on duplicate work. With a queue, you store the extracted key points in the task payload. The retry skips straight to summary generation using the cached analysis.
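To make that concrete, here is roughly what a queued summary task might carry after the analysis step has finished. The field names mirror the implementation we build below, but the values are purely illustrative:

// Illustrative only: a summary task whose context already contains the
// cached analysis, so a retry never re-reads or re-analyzes the document.
const summaryTask = {
  id: 'task_1714000000000_0.42',
  operation: 'summarize',
  priority: 'normal',
  retries: 0,
  status: 'pending',
  context: {
    userId: 'user_1',
    documentId: 'doc_8f2a',                  // hypothetical document identifier
    conversationHistory: [
      { role: 'user', content: 'Analyze this document and extract the key points: ...' },
      { role: 'assistant', content: '1. ... 2. ... 3. ...' }   // cached key points
    ]
  },
  payload: {
    prompt: 'Write a concise two-paragraph summary based on those key points.'
  }
};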
Deduplication happens at the queue level. Before enqueuing a task, you check whether an identical operation is already in flight. For AI workloads, “identical” usually means the same conversation context and the same requested action. This avoids the common failure mode where a user repeatedly hits “retry” and unknowingly triggers five identical LLM calls, each burning tokens and money for the same result.
We’ll build a queue that supports the patterns AI agents actually need: priority levels, adaptive rate limiting, dead letter handling, and context preservation. The implementation uses in-memory storage to keep things simple, but the same structure maps cleanly to Redis or PostgreSQL when you’re ready to persist state and run multiple workers.
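If you are curious what that mapping looks like, here is a rough sketch, not part of this article's implementation, of enqueueing and claiming work from a hypothetical tasks table using node-postgres. It assumes columns id (primary key), priority (numeric, lower runs first), status, context, payload, retries, and created_at; SKIP LOCKED is what lets several workers share the queue without double-processing:

// Sketch only: a PostgreSQL-backed enqueue/claim under the assumptions above.
const { Pool } = require('pg');
const pool = new Pool();

async function enqueue(task) {
  await pool.query(
    `INSERT INTO tasks (id, priority, status, context, payload, retries)
     VALUES ($1, $2, 'pending', $3, $4, 0)
     ON CONFLICT (id) DO NOTHING`,            // dedup via the primary key
    [task.id, task.priority, JSON.stringify(task.context), JSON.stringify(task.payload)]
  );
}

async function claimNext() {
  // SKIP LOCKED skips rows another worker has already claimed.
  const { rows } = await pool.query(
    `UPDATE tasks
     SET status = 'processing'
     WHERE id = (
       SELECT id FROM tasks
       WHERE status = 'pending'
       ORDER BY priority, created_at
       LIMIT 1
       FOR UPDATE SKIP LOCKED
     )
     RETURNING *`
  );
  return rows[0] || null;
}

For the rest of this article, we'll stick with the in-memory version.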
Create a file called task-queue.js in your project root:
class TaskQueue {
  constructor(options = {}) {
    this.tasks = new Map();
    this.processing = new Set();
    this.deadLetter = new Map();
    this.maxRetries = options.maxRetries || 3;
    this.rateLimitPerMinute = options.rateLimitPerMinute || 60;
    this.tokenLimitPerMinute = options.tokenLimitPerMinute || 90000;
    this.recentRequests = [];
    this.recentTokens = [];
    this.priorities = { high: [], normal: [], low: [] };
  }

  async add(task) {
    const taskId = task.id || `task_${Date.now()}_${Math.random()}`;
    const taskData = {
      id: taskId,
      priority: task.priority || 'normal',
      context: task.context,
      operation: task.operation,
      payload: task.payload,
      retries: 0,
      createdAt: Date.now(),
      status: 'pending'
    };
    const contextHash = this._hashContext(task.context);
    const duplicate = this._findDuplicate(contextHash, task.operation);
    if (duplicate) {
      return { taskId: duplicate.id, isDuplicate: true };
    }
    this.tasks.set(taskId, taskData);
    this.priorities[taskData.priority].push(taskId);
    return { taskId, isDuplicate: false };
  }
  async process(handler) {
    while (true) {
      await this._waitForRateLimit();
      const taskId = this._getNextTask();
      if (!taskId) {
        await new Promise(resolve => setTimeout(resolve, 100));
        continue;
      }
      const task = this.tasks.get(taskId);
      if (!task || this.processing.has(taskId)) continue;
      this.processing.add(taskId);
      task.status = 'processing';
      try {
        const result = await handler(task);
        this._recordUsage(result.tokensUsed || 1000);
        this.tasks.delete(taskId);
        this.processing.delete(taskId);
      } catch (error) {
        this._handleFailure(task, error);
      }
    }
  }

  _getNextTask() {
    for (const priority of ['high', 'normal', 'low']) {
      const queue = this.priorities[priority];
      while (queue.length > 0) {
        const taskId = queue.shift();
        const task = this.tasks.get(taskId);
        if (task && task.status === 'pending') {
          return taskId;
        }
      }
    }
    return null;
  }
  async _waitForRateLimit() {
    const now = Date.now();
    const oneMinuteAgo = now - 60000;
    this.recentRequests = this.recentRequests.filter(t => t > oneMinuteAgo);
    this.recentTokens = this.recentTokens.filter(t => t.timestamp > oneMinuteAgo);
    const currentRequests = this.recentRequests.length;
    const currentTokens = this.recentTokens.reduce((sum, t) => sum + t.count, 0);
    if (currentRequests >= this.rateLimitPerMinute ||
        currentTokens >= this.tokenLimitPerMinute) {
      // Wait until the oldest entry in whichever window is saturated ages out.
      // An empty window falls back to oneMinuteAgo so it contributes zero wait
      // instead of producing a NaN delay.
      const oldestRequest = this.recentRequests[0] ?? oneMinuteAgo;
      const oldestToken = this.recentTokens[0]?.timestamp ?? oneMinuteAgo;
      const waitTime = Math.max(
        oldestRequest + 60000 - now,
        oldestToken + 60000 - now,
        0
      );
      await new Promise(resolve => setTimeout(resolve, waitTime + 100));
    }
    this.recentRequests.push(now);
  }
  _recordUsage(tokenCount) {
    this.recentTokens.push({ timestamp: Date.now(), count: tokenCount });
  }

  _handleFailure(task, error) {
    this.processing.delete(task.id);
    task.retries++;
    if (task.retries >= this.maxRetries) {
      task.status = 'failed';
      this.deadLetter.set(task.id, { ...task, error: error.message });
      this.tasks.delete(task.id);
    } else {
      task.status = 'pending';
      const delay = Math.min(1000 * Math.pow(2, task.retries), 30000);
      setTimeout(() => {
        this.priorities[task.priority].push(task.id);
      }, delay);
    }
  }

  _hashContext(context) {
    return JSON.stringify(context).split('').reduce(
      (hash, char) => ((hash << 5) - hash) + char.charCodeAt(0), 0
    );
  }

  _findDuplicate(contextHash, operation) {
    for (const [id, task] of this.tasks) {
      const taskHash = this._hashContext(task.context);
      if (taskHash === contextHash && task.operation === operation) {
        return task;
      }
    }
    return null;
  }

  getDeadLetterTasks() {
    return Array.from(this.deadLetter.values());
  }

  async retryDeadLetter(taskId) {
    const task = this.deadLetter.get(taskId);
    if (!task) return false;
    task.retries = 0;
    task.status = 'pending';
    this.tasks.set(taskId, task);
    this.priorities[task.priority].push(taskId);
    this.deadLetter.delete(taskId);
    return true;
  }
}
module.exports = TaskQueue;
The queue maintains three priority levels because AI agents mix user-facing work with background processing. Requests that directly impact the user run at high priority, while analysis tasks use normal priority, and optional enhancements run at low priority. Processing always drains high-priority tasks first, so background work never degrades the user experience.
Rate limiting tracks both request count and token usage. The _waitForRateLimit method determines when the next task can safely run without exceeding either limit. This avoids the cascade of 429 errors you get from naive parallel execution.
Context hashing enables deduplication without expensive string comparisons. The hash function is intentionally simple but effective at detecting identical conversation states. In a production system, you’d likely replace this with a more robust approach, such as content-addressed storage or embedding-based similarity.
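If you want something sturdier than the toy hash without reaching for embeddings, a cryptographic digest of the normalized context works well. Here’s a sketch using Node’s built-in crypto module; which fields you fold into the fingerprint is your call and defines what counts as a duplicate:

// Sketch: a collision-resistant context fingerprint using Node's crypto module.
const crypto = require('crypto');

function hashContext(context, operation) {
  // Only the fields that define "the same request" go into the hash.
  const normalized = {
    operation,
    userId: context.userId,
    conversationHistory: context.conversationHistory
  };
  return crypto
    .createHash('sha256')
    .update(JSON.stringify(normalized))
    .digest('hex');
}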
Create a file called ai-agent.js that demonstrates practical usage:
const Anthropic = require('@anthropic-ai/sdk');
const TaskQueue = require('./task-queue');
class AIAgent {
  constructor(apiKey) {
    this.client = new Anthropic({ apiKey });
    this.queue = new TaskQueue({
      maxRetries: 3,
      rateLimitPerMinute: 50,
      tokenLimitPerMinute: 80000
    });
    this.queue.process(this.executeTask.bind(this));
  }
  async executeTask(task) {
    const { context, operation, payload } = task;
    // Copy the history instead of mutating it, so a retried task
    // doesn't append the same user message twice.
    const messages = [
      ...(context.conversationHistory || []),
      { role: 'user', content: payload.prompt }
    ];
    const response = await this.client.messages.create({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 1024,
      messages: messages
    });
    const tokensUsed = response.usage.input_tokens + response.usage.output_tokens;
    if (payload.onComplete) {
      payload.onComplete({
        content: response.content[0].text,
        tokensUsed,
        conversationHistory: [...messages, {
          role: 'assistant',
          content: response.content[0].text
        }]
      });
    }
    return { tokensUsed };
  }
  async analyzeDocument(documentText, userId) {
    const context = {
      userId,
      conversationHistory: [],
      documentId: this._hashString(documentText)
    };
    const analysisTask = await this.queue.add({
      priority: 'high',
      context,
      operation: 'analyze',
      payload: {
        prompt: `Analyze this document and extract the 5 most important points:\n\n${documentText}`,
        onComplete: (result) => {
          this._generateSummary(result, context, userId);
        }
      }
    });
    return analysisTask.taskId;
  }

  async _generateSummary(analysisResult, context, userId) {
    const summaryContext = {
      ...context,
      conversationHistory: analysisResult.conversationHistory
    };
    await this.queue.add({
      priority: 'normal',
      context: summaryContext,
      operation: 'summarize',
      payload: {
        prompt: 'Now write a concise 2-paragraph summary based on those key points.',
        onComplete: (result) => {
          console.log(`Summary completed for user ${userId}:`, result.content);
        }
      }
    });
  }

  _hashString(str) {
    return str.split('').reduce(
      (hash, char) => ((hash << 5) - hash) + char.charCodeAt(0), 0
    );
  }

  getFailedTasks() {
    return this.queue.getDeadLetterTasks();
  }

  async retryFailed(taskId) {
    return this.queue.retryDeadLetter(taskId);
  }
}
module.exports = AIAgent;
The agent chains operations by storing completion callbacks in the task payload. When the analysis completes, the callback automatically queues the summary task with the accumulated conversation history. This pattern prevents context loss because each task carries its complete dependency chain.
Priority assignment happens based on user-facing impact. Document analysis requested by a waiting user gets high priority. The follow-up summary, which occurs in the background, is given normal priority. This allows you to handle a new user request immediately rather than waiting for background summarization to complete.
Here’s a complete example showing the agent handling concurrent requests in example.js:
const AIAgent = require('./ai-agent');
async function main() {
  const agent = new AIAgent(process.env.ANTHROPIC_API_KEY);

  const doc1 = "Quantum computing uses qubits that can exist in superposition...";
  const doc2 = "Machine learning models require vast amounts of training data...";
  const doc3 = "Blockchain technology provides distributed consensus mechanisms...";

  const task1 = await agent.analyzeDocument(doc1, 'user_1');
  const task2 = await agent.analyzeDocument(doc2, 'user_2');
  const task3 = await agent.analyzeDocument(doc3, 'user_1');
  console.log('Queued tasks:', task1, task2, task3);

  setTimeout(async () => {
    const failed = agent.getFailedTasks();
    console.log(`Failed tasks: ${failed.length}`);
    for (const task of failed) {
      console.log(`Retrying task ${task.id}`);
      await agent.retryFailed(task.id);
    }
  }, 60000);
main().catch(console.error);
The example queues three document analyses from two different users. The queue ensures that even if Claude’s API returns errors for some requests, each operation retries automatically with its full context intact. After 60 seconds, the code checks for any tasks that have exhausted their retries and explicitly requeues them for another attempt.
Tasks land in the dead letter queue after exhausting all retry attempts. These represent operations that failed for reasons beyond transient API errors: malformed prompts, context size exceeding limits, content policy violations, or systemic issues like invalid API keys.
The dead letter queue becomes your debugging surface. Each failed task preserves the exact context and payload that caused it to fail, making it easy to inspect and spot patterns. If ten tasks fail with “context too large” errors, that’s a clear signal the agent needs better context pruning.
Manual retries from the dead letter queue make sense once the root cause is fixed. If a bug was generating malformed prompts, you can drain the dead letter queue and reprocess those tasks. If you switch API providers, you can retry everything that failed due to authentication errors. The queue keeps enough information to make these retries safe and predictable.
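As a rough sketch of that workflow, you can group the dead letter queue by error message to spot patterns, then requeue only the tasks that match the cause you just fixed. This uses the getDeadLetterTasks and retryDeadLetter methods from the queue above; the authentication filter is just an example:

// Sketch: triage the dead letter queue, then retry one class of failure.
async function retryFailuresMatching(queue, pattern) {
  const failed = queue.getDeadLetterTasks();

  // Group by error message to see systemic issues at a glance.
  const byError = {};
  for (const task of failed) {
    byError[task.error] = (byError[task.error] || 0) + 1;
  }
  console.table(byError);

  // Requeue only the tasks whose failure matches the fixed root cause.
  for (const task of failed) {
    if (pattern.test(task.error)) {
      await queue.retryDeadLetter(task.id);
    }
  }
}

// e.g. after rotating a bad API key:
// retryFailuresMatching(agent.queue, /authentication|invalid api key/i);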
Whether a queue is worth building comes down to three factors: failure rate, operation cost, and the impact of data loss. At a 1% failure rate, processing 1,000 operations means 10 failures. If each operation costs $0.20 in API fees, that’s $2 in wasted spend per thousand operations. More importantly, those 10 failures might represent lost customer data or incomplete transactions.
The cost calculation changes for multi-step operations. An agent that makes five LLM calls per user request has roughly a 5% chance of at least one step failing when each call fails 1% of the time. Processing 1,000 user requests therefore means around 50 incomplete requests. A queue reduces this to near zero because failed steps retry without reprocessing successful ones.
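The arithmetic behind that compound rate: the chance that all five calls succeed is 0.99⁵ ≈ 0.951, so about 4.9% of requests lose at least one step. A one-liner if you want to plug in your own numbers:

// Probability that a request with `steps` LLM calls loses at least one step,
// given a per-call failure rate p.
const compoundFailureRate = (p, steps) => 1 - Math.pow(1 - p, steps);

console.log(compoundFailureRate(0.01, 5)); // ≈ 0.049, i.e. ~49 incomplete requests per 1,000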
Implementation overhead matters less than you’d expect. The queue code above represents roughly 200 lines. Compare that to the debugging time spent tracking down “why did the agent stop responding” issues in production.
A queue-based architecture turns AI agents from fragile scripts into resilient systems. Operations that would otherwise fail silently can retry with their full context intact. Rate limits that once caused cascading errors become managed constraints instead of surprise outages.
The dead letter queue surfaces systemic issues before they affect large numbers of users. For most production AI agents, this kind of infrastructure is something you want from day one, not something you bolt on after the first major incident.
