Fire Queue
A scalable queue system built on Google Cloud Firestore. It relies on Firestore's native indexing of timestamp fields, so each consumer can query for messages newer than its last processed timestamp.
🚀 Key Features
- Time-Based Processing: Consumers resume from their last processed timestamp
- Updatable Messages: Unlike traditional queues, messages can be updated while in queue
- TTL Support: Automatic cleanup of expired queue items
- Multi-Consumer: Multiple consumers can process the same queue independently
- Configuration-Driven: A queue needs only a project ID, a connection config, and a topic name
- Firestore Native: Leverages Firestore's indexing, real-time updates, and ACID transactions
- Batch Processing: Efficient batch operations for high throughput
- Built-in Monitoring: Queue metrics and consumer health tracking
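The time-based processing, TTL, and multi-consumer features above can be pictured with a small in-memory model. This is only a sketch: the `QueueItem` fields and the `pollSince` and `advanceCursor` helpers are hypothetical names chosen for illustration, not the fire-queue schema or API, and a real implementation would run the equivalent query against Firestore.

```typescript
// Hypothetical in-memory model; field names are illustrative, not the fire-queue schema.
interface QueueItem {
  id: string;
  updatedAt: number; // epoch ms; stands in for the indexed timestamp
  expiresAt: number; // epoch ms; items at or past this time count as expired
  payload: unknown;
}

// Return live items strictly newer than the consumer's cursor, oldest first.
function pollSince(source: QueueItem[], cursor: number, now: number, limit: number): QueueItem[] {
  return source
    .filter((item) => item.updatedAt > cursor && item.expiresAt > now)
    .sort((a, b) => a.updatedAt - b.updatedAt)
    .slice(0, limit);
}

// Advance the cursor to the newest timestamp in the batch (unchanged if the batch is empty).
function advanceCursor(cursor: number, batch: QueueItem[]): number {
  return batch.reduce((max, item) => Math.max(max, item.updatedAt), cursor);
}

const items: QueueItem[] = [
  { id: 'a', updatedAt: 100, expiresAt: 10_000, payload: 1 },
  { id: 'b', updatedAt: 200, expiresAt: 150, payload: 2 }, // expired when polled at now = 1000
  { id: 'c', updatedAt: 300, expiresAt: 10_000, payload: 3 },
];

// Two consumers keep independent cursors over the same items.
let worker1Cursor = 0;
const worker2Cursor = 100;

const batch1 = pollSince(items, worker1Cursor, 1_000, 10);
worker1Cursor = advanceCursor(worker1Cursor, batch1);
const batch2 = pollSince(items, worker2Cursor, 1_000, 10);

console.log(batch1.map((item) => item.id), worker1Cursor, batch2.map((item) => item.id));
```

Note that a strict `>` comparison can skip items that share a timestamp with the cursor, so a production design would need a tie-breaker such as the document ID.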
🏗️ Architecture
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│    Producers    │───▶│ Queue Collection │◀───│    Consumers    │
│                 │    │                  │    │                 │
│ • Add messages  │    │ • Time-indexed   │    │ • Track progress│
│ • Update msgs   │    │ • TTL cleanup    │    │ • Resume from   │
│ • Set priority  │    │ • Versioning     │    │   timestamp     │
└─────────────────┘    └──────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌──────────────────┐
                       │ Consumer Tracking│
                       │                  │
                       │ • Last processed │
                       │ • Health status  │
                       │ • Error tracking │
                       └──────────────────┘
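The Consumer Tracking box lists three pieces of state: last processed position, health status, and error tracking. One way such a record could be shaped and updated is sketched below. The `ConsumerState` fields, the helper names, and the thresholds (60 seconds of silence, 3 consecutive errors) are all assumptions made for illustration; the actual tracking document may look different.

```typescript
// Hypothetical shape of a consumer tracking record; not the actual fire-queue schema.
type Health = 'healthy' | 'degraded' | 'stalled';

interface ConsumerState {
  consumerId: string;
  lastProcessedAt: number; // timestamp of the newest processed message
  lastHeartbeatAt: number; // last time the consumer reported anything
  consecutiveErrors: number;
  lastError: string | null;
}

// A successful batch advances the position and clears the error streak.
function recordSuccess(state: ConsumerState, processedAt: number, now: number): ConsumerState {
  return {
    ...state,
    lastProcessedAt: Math.max(state.lastProcessedAt, processedAt),
    lastHeartbeatAt: now,
    consecutiveErrors: 0,
    lastError: null,
  };
}

// A failed batch leaves the position untouched so the consumer can retry from it.
function recordFailure(state: ConsumerState, error: string, now: number): ConsumerState {
  return {
    ...state,
    lastHeartbeatAt: now,
    consecutiveErrors: state.consecutiveErrors + 1,
    lastError: error,
  };
}

// Thresholds are illustrative defaults, not library settings.
function healthOf(state: ConsumerState, now: number, stallAfterMs = 60_000, maxErrors = 3): Health {
  if (now - state.lastHeartbeatAt > stallAfterMs) return 'stalled';
  return state.consecutiveErrors >= maxErrors ? 'degraded' : 'healthy';
}

let tracked: ConsumerState = {
  consumerId: 'my-worker',
  lastProcessedAt: 0,
  lastHeartbeatAt: 0,
  consecutiveErrors: 0,
  lastError: null,
};

tracked = recordSuccess(tracked, 500, 1_000);
const afterSuccess = healthOf(tracked, 2_000);
for (let i = 0; i < 3; i++) tracked = recordFailure(tracked, 'timeout', 3_000);
const afterFailures = healthOf(tracked, 4_000);
const afterSilence = healthOf(tracked, 100_000);

console.log(afterSuccess, afterFailures, afterSilence, tracked.lastProcessedAt);
```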
📦 Installation
npm install fire-queue
🔧 Super Simple Setup
Only 3 things needed: projectId, connection config, and topic name!
import { createReadyQueue } from 'fire-queue';

// ✨ Method 1: Minimal setup with service account
const { enqueue, consume, shutdown } = await createReadyQueue({
  projectId: 'your-project-id',
  serviceAccountPath: './firebase-service-account.json',
  topic: 'my_queue', // This becomes your queue name
  // dbId: 'custom-db', // Optional - defaults to "firequeue" (auto-created)
});

// ✨ Method 2: Using a pre-configured Firestore instance
// (an alternative to Method 1; use one or the other, since both declare the same names)
import * as admin from 'firebase-admin';

const firestoreInstance = new admin.firestore.Firestore({
  projectId: 'your-project-id',
  databaseId: 'firequeue',
});
const { enqueue, consume, shutdown } = await createReadyQueue({
  projectId: 'your-project-id',
  firestoreInstance, // Reuse existing connection
  topic: 'my_queue',
});

// 📝 Add messages
await enqueue('user_signup', {
  userId: 'user123',
  email: 'john@example.com',
});
await enqueue('send_email', {
  to: 'john@example.com',
  subject: 'Welcome!',
}, {
  priority: 1, // High priority
  tags: ['email', 'welcome'],
});

// 👤 Process messages
await consume('my-worker', async (messages) => {
  for (const message of messages) {
    console.log(`Processing: ${message.type}`);
    await processMessage(message.payload); // processMessage is your own handler function
    await message.ack(); // Mark as completed
  }
});

// 🧹 Cleanup when done
await shutdown();
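The `priority: 1, // High priority` option above suggests that lower numbers are served first. How priority interacts with time ordering is not stated here, so the following is only a sketch of one plausible rule: within a batch, sort by ascending priority number and break ties by enqueue time. The `Pending` type, the `DEFAULT_PRIORITY` value, and the comparator name are hypothetical.

```typescript
// Hypothetical ordering rule: lower priority number first, then oldest first.
interface Pending {
  id: string;
  priority: number;
  enqueuedAt: number; // epoch ms
}

function byPriorityThenTime(a: Pending, b: Pending): number {
  return a.priority - b.priority || a.enqueuedAt - b.enqueuedAt;
}

const DEFAULT_PRIORITY = 5; // assumed default for messages enqueued without a priority

const pending: Pending[] = [
  { id: 'user_signup', priority: DEFAULT_PRIORITY, enqueuedAt: 1 },
  { id: 'send_email', priority: 1, enqueuedAt: 2 },
  { id: 'cleanup', priority: DEFAULT_PRIORITY, enqueuedAt: 0 },
];

// Copy before sorting so the original array is left untouched.
const ordered = [...pending].sort(byPriorityThenTime).map((p) => p.id);
console.log(ordered);
```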
Using FireQueueManager for Reusable Configurations
import { setupFireQueue } from 'fire-queue';

// ✨ Configure once, use everywhere
const manager = await setupFireQueue({
  projectId: 'your-project-id',
  serviceAccountPath: './firebase-service-account.json',
  dbId: 'firequeue',
});

// Create multiple queues with shared configuration
const emailQueue = await manager.createQueue('email_notifications');
const analyticsQueue = await manager.createQueue('analytics_events');
const backupQueue = await manager.createQueue('daily_backups');

// Reuse the same Firestore connection for other operations
const sharedFirestore = manager.getFirestoreInstance();

// Shutdown all queues at once
await manager.shutdown();
Even Simpler with Auto-Consumer
import { createQueueWithConsumer } from 'fire-queue';

// Creates queue + consumer in one call!
const { enqueue, shutdown } = await createQueueWithConsumer(
  {
    projectId: 'your-project-id',
    serviceAccountPath: './firebase-service-account.json',
    topic: 'simple_queue',
  },
  'auto-worker',
  async (messages) => {
    // Process messages - ack/nack is handled automatically
    for (const message of messages) {
      console.log(`Auto-processing: ${message.type}`);
      // A thrown error nacks automatically; a normal return acks
    }
  }
);

await enqueue('task', { data: 'hello world' });
// ... messages processed automatically
await shutdown();
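The auto-ack behaviour described in the comments above (a thrown error nacks, a normal return acks) can be pictured as a wrapper around the handler. This is a sketch under stated assumptions: the `Ackable` interface, the `nack(reason)` signature, and the `withAutoAck` name are hypothetical, and it settles the whole batch at once, which may not match how the library treats individual messages.

```typescript
// Hypothetical message shape; only ack() appears in this README, nack(reason) is assumed.
interface Ackable {
  id: string;
  ack(): Promise<void>;
  nack(reason: string): Promise<void>;
}

type Handler<M> = (messages: M[]) => Promise<void>;

// Wrap a handler so the batch is acked when it resolves and nacked when it throws.
function withAutoAck<M extends Ackable>(handler: Handler<M>): Handler<M> {
  return async (messages) => {
    try {
      await handler(messages);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      await Promise.all(messages.map((m) => m.nack(reason)));
      return;
    }
    await Promise.all(messages.map((m) => m.ack()));
  };
}

// Fake messages that record what happened to them.
const log: string[] = [];
function fakeMessage(id: string): Ackable {
  return {
    id,
    ack: async () => { log.push(`ack:${id}`); },
    nack: async (reason) => { log.push(`nack:${id}:${reason}`); },
  };
}

async function demo(): Promise<string[]> {
  await withAutoAck(async () => {})([fakeMessage('m1')]);
  await withAutoAck(async () => { throw new Error('boom'); })([fakeMessage('m2')]);
  return log;
}

demo().then((entries) => console.log(entries));
```

Settling per batch keeps the wrapper simple, at the cost of nacking messages that were already handled before the error; a per-message variant would call the handler once per message instead.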
📝 Multiple Ways to Write Data
Fire Queue supports several ways to write messages to a queue:
1. Direct Firestore Writing
import { FirestoreWriter } from 'fire-queue';

const writer = new FirestoreWriter({
  projectId: 'your-project',
  queueName: 'my_queue',
  serviceAccountPath: './firebase-service-account.json',
  enableBatching: true,
  batchSize: 50,
});

await writer.write({
  type: 'data_processing',
  payload: { userId: '123', action: 'process' },
});
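The `enableBatching` and `batchSize` options above imply that writes are buffered and sent in groups. A minimal sketch of size-triggered batching is shown below. The `BatchBuffer` class and its `sink` callback are hypothetical stand-ins for whatever the writer does internally, and time-based flushing is left out for brevity.

```typescript
// Hypothetical size-triggered buffer; the sink stands in for a batched Firestore write.
class BatchBuffer<T> {
  private pending: T[] = [];
  private readonly batchSize: number;
  private readonly sink: (batch: T[]) => void;

  constructor(batchSize: number, sink: (batch: T[]) => void) {
    this.batchSize = batchSize;
    this.sink = sink;
  }

  // Buffer one item and flush as soon as the batch is full.
  add(item: T): void {
    this.pending.push(item);
    if (this.pending.length >= this.batchSize) this.flush();
  }

  // Send whatever is buffered; a no-op when the buffer is empty.
  flush(): void {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.sink(batch);
  }
}

const flushedSizes: number[] = [];
const buffer = new BatchBuffer<number>(50, (batch) => { flushedSizes.push(batch.length); });

for (let i = 0; i < 120; i++) buffer.add(i);
buffer.flush(); // send the final partial batch

console.log(flushedSizes);
```

The explicit `flush()` at the end matters: without it, the last partial batch would stay in memory until another trigger fired.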
2. HTTP API Writing
import { HTTPWriter, HTTPQueueServer } from 'fire-queue';

// Start HTTP server
const server = new HTTPQueueServer(3000);
server.registerQueue('my_queue', queue); // `queue` is a queue instance you created earlier
await server.start();

// Write via HTTP
const httpWriter = new HTTPWriter({
  endpoint: 'http://localhost:3000',
  queueName: 'my_queue',
  projectId: 'your-project',
});

await httpWriter.write({
  type: 'api_request',
  payload: { endpoint: '/users', data: { name: 'John' } },
});
3. Bulk/Stream Writing
import { BulkWriter } from 'fire-queue';

const bulkWriter = new BulkWriter({
  projectId: 'your-project',
  queueName: 'bulk_queue',
  batchSize: 100,
  flushIntervalMs: 5000,
});

// Add messages (auto-batched)
for (let i = 0; i < 1000; i++) {
  await bulkWriter.add({
    type: 'analytics',
    payload: { event: 'page_view', userId: `user${i}` },
  });
}

// Import from CSV: each key is a CSV column, each value is the message field it maps to
// (csvData is CSV input you have loaded elsewhere)
await bulkWriter.addFromCSV(csvData, {
  'userId': 'payload.userId',
  'event': 'payload.event',
});
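The CSV mapping above pairs a column name with a dotted field path such as `payload.userId`. To illustrate what such a mapping could mean, the sketch below expands dotted paths into nested objects. The `setPath` and `rowsFromCsv` helpers are hypothetical, the parser is deliberately naive (no quoted fields or escaped commas), and the real `addFromCSV` may behave differently.

```typescript
// Hypothetical helpers showing how a column-to-path mapping could be applied.
type Mapping = Record<string, string>;

// Write `value` at a dotted path, creating intermediate objects as needed.
function setPath(target: Record<string, unknown>, path: string, value: unknown): void {
  const keys = path.split('.');
  const last = keys.pop();
  if (last === undefined) return;
  let node = target;
  for (const key of keys) {
    const next = node[key];
    if (typeof next !== 'object' || next === null) {
      node[key] = {};
    }
    node = node[key] as Record<string, unknown>;
  }
  node[last] = value;
}

// Naive CSV reader: first line is the header, cells are split on commas.
function rowsFromCsv(csv: string, mapping: Mapping): Record<string, unknown>[] {
  const lines = csv.trim().split(/\r?\n/);
  const header = (lines[0] ?? '').split(',').map((h) => h.trim());
  return lines.slice(1).map((line) => {
    const cells = line.split(',');
    const message: Record<string, unknown> = {};
    header.forEach((column, index) => {
      const path = mapping[column];
      // Columns without a mapping entry are ignored.
      if (path !== undefined) setPath(message, path, (cells[index] ?? '').trim());
    });
    return message;
  });
}

const sampleCsv = 'userId,event,ignored\nu1,page_view,x\nu2,click,y';
const mapped = rowsFromCsv(sampleCsv, {
  userId: 'payload.userId',
  event: 'payload.event',
});

console.log(JSON.stringify(mapped));
```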
4. Cloud Function Generation
import { CloudFunctionWriter } from 'fire-queue';

const cfWriter = new CloudFunctionWriter({
  projectId: 'your-project',
  queueName: 'cf_queue',
  functionName: 'autoQueueWriter',
  triggerType: 'firestore', // or 'http', 'schedule'
});

// Generate Cloud Function code
const functionCode = cfWriter.generateCloudFunction();
console.log(functionCode); // Ready-to-deploy function!
📋 Project Status
This project is under active development. See TODO.md for planned features and progress.
🤝 Contributing
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
📄 License
MIT License - see LICENSE for details.