feat(phase2): add message queue system for offline projects

Phase 2 Complete: Message queuing for offline/inactive projects

Queue Storage:
- File-based queue in ~/.innervoice/queues/
- Separate JSON file per project
- Persistent storage survives restarts
- Auto-cleanup of old delivered tasks (7 days)

Queue Manager (queue-manager.ts):
- enqueueTask() - Add task to project queue
- getPendingTasks() - Get undelivered tasks
- markTaskDelivered() - Mark task as complete
- getQueueSummary() - Get overview of all queues
- cleanupOldTasks() - Remove old delivered tasks

API Endpoints:
- POST /queue/add - Queue message for project
- GET /queue/:projectName - Get pending tasks
- POST /queue/:projectName/mark-delivered - Mark delivered
- GET /queue/summary - Get all project summaries

Telegram Bot Features:
- /queue command - Show all queued messages
- Project-targeted messages: "ProjectName: message"
- Auto-detect if project is online or offline
- Queue for offline, deliver immediately if online

MCP Tool:
- telegram_check_queue - Check for queued messages on startup
- Shows pending messages with timestamps
- Perfect for checking what happened while offline

Usage Scenarios:

1. Send to offline project:
   You: "ESO-MCP: Continue with roadmap"
   Bot: "📥 Message queued for ESO-MCP (offline)"

2. Open Claude in ESO-MCP:
   Claude auto-checks queue on startup
   Shows: "📬 You have 1 queued message: Continue with roadmap"

3. Check queue status:
   You: "/queue"
   Bot: Shows all projects with pending messages

This solves the "no one listening" problem - messages are stored
and delivered when Claude opens in that project.

Next: Phase 3 (remote Claude spawner)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
RichardDillman
2025-11-23 17:27:24 -05:00
parent 82f46c4569
commit fd750d9b50
4 changed files with 381 additions and 2 deletions

View File

@@ -3,6 +3,13 @@ import express from 'express';
import dotenv from 'dotenv';
import fs from 'fs/promises';
import path from 'path';
import {
enqueueTask,
getPendingTasks,
markTaskDelivered,
getQueueSummary,
cleanupOldTasks
} from './queue-manager.js';
dotenv.config();
@@ -144,11 +151,33 @@ bot.command('sessions', async (ctx) => {
}).join('\n\n');
await ctx.reply(
`*Active Claude Sessions* (${sessions.length})\n\n${sessionList}\n\n_Reply with #sessionId to send a message to a specific session_`,
`*Active Claude Sessions* (${sessions.length})\n\n${sessionList}\n\n_To send message to specific project: ProjectName: your message_`,
{ parse_mode: 'Markdown' }
);
});
bot.command('queue', async (ctx) => {
try {
const summary = await getQueueSummary();
if (summary.length === 0) {
await ctx.reply('📭 No queued messages');
return;
}
const queueList = summary.map((s, i) => {
return `${i + 1}. *${s.projectName}*\n 📥 ${s.pending} pending (${s.total} total)`;
}).join('\n\n');
await ctx.reply(
`*Queued Messages* (${summary.length} projects)\n\n${queueList}`,
{ parse_mode: 'Markdown' }
);
} catch (error: any) {
await ctx.reply(`❌ Error: ${error.message}`);
}
});
// Listen for any text messages from user
bot.on('text', async (ctx) => {
const message = ctx.message.text;
@@ -167,7 +196,45 @@ bot.on('text', async (ctx) => {
return;
}
// Add to message queue for processing
// Check if message is targeted to a specific project: "ProjectName: message"
const projectMatch = message.match(/^([a-zA-Z0-9-_]+):\s*(.+)/);
if (projectMatch) {
const [, targetProject, actualMessage] = projectMatch;
// Check if project has an active session
const activeSession = Array.from(activeSessions.values())
.find(s => s.projectName.toLowerCase() === targetProject.toLowerCase());
if (activeSession) {
// Add to message queue with session ID
messageQueue.push({
from,
message: actualMessage,
timestamp: new Date(),
read: false,
sessionId: activeSession.id
});
await ctx.reply(`💬 Message sent to active session: *${activeSession.projectName}*`, { parse_mode: 'Markdown' });
} else {
// Queue for when project becomes active
try {
await enqueueTask({
projectName: targetProject,
projectPath: '/unknown',
message: actualMessage,
from,
priority: 'normal',
timestamp: new Date()
});
await ctx.reply(`📥 Message queued for *${targetProject}* (offline)\n\nIt will be delivered when Claude starts in that project.`, { parse_mode: 'Markdown' });
} catch (error: any) {
await ctx.reply(`❌ Failed to queue message: ${error.message}`);
}
}
return;
}
// No project specified - add to general message queue
messageQueue.push({
from,
message,
@@ -245,6 +312,66 @@ app.get('/sessions', (req, res) => {
res.json({ sessions, count: sessions.length });
});
// Queue management endpoints
app.post('/queue/add', async (req, res) => {
const { projectName, projectPath, message, from, priority = 'normal' } = req.body;
if (!projectName || !message || !from) {
return res.status(400).json({ error: 'projectName, message, and from are required' });
}
try {
const task = await enqueueTask({
projectName,
projectPath: projectPath || '/unknown',
message,
from,
priority,
timestamp: new Date()
});
res.json({ success: true, taskId: task.id, task });
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
app.get('/queue/:projectName', async (req, res) => {
const { projectName } = req.params;
try {
const tasks = await getPendingTasks(projectName);
res.json({ projectName, tasks, count: tasks.length });
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
app.post('/queue/:projectName/mark-delivered', async (req, res) => {
const { projectName } = req.params;
const { taskId } = req.body;
if (!taskId) {
return res.status(400).json({ error: 'taskId is required' });
}
try {
await markTaskDelivered(projectName, taskId);
res.json({ success: true });
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
app.get('/queue/summary', async (req, res) => {
try {
const summary = await getQueueSummary();
res.json({ summary, totalProjects: summary.length });
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
// HTTP endpoint for sending notifications
app.post('/notify', async (req, res) => {
if (!ENABLED) {

View File

@@ -218,6 +218,14 @@ const TOOLS: Tool[] = [
properties: {},
},
},
{
name: 'telegram_check_queue',
description: 'Check if there are any queued messages for this project. Use this on startup to see if the user sent any messages while Claude was offline.',
inputSchema: {
type: 'object',
properties: {},
},
},
];
// Create the MCP server
@@ -400,6 +408,44 @@ server.setRequestHandler(CallToolRequestSchema, async (request) => {
};
}
case 'telegram_check_queue': {
const { name: projectName } = getProjectInfo();
const response = await fetch(`${BRIDGE_URL}/queue/${encodeURIComponent(projectName)}`);
if (!response.ok) {
throw new Error('Failed to check queue');
}
const result: any = await response.json();
const tasks = result.tasks || [];
if (tasks.length === 0) {
return {
content: [
{
type: 'text',
text: '📭 No queued messages for this project',
},
],
};
}
const taskList = tasks.map((t: any, i: number) => {
const timestamp = new Date(t.timestamp).toLocaleString();
return `${i + 1}. From *${t.from}* (${timestamp})\n ${t.message}`;
}).join('\n\n');
return {
content: [
{
type: 'text',
text: `📬 You have ${tasks.length} queued message(s):\n\n${taskList}\n\n_These messages were sent while you were offline._`,
},
],
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}

145
src/queue-manager.ts Normal file
View File

@@ -0,0 +1,145 @@
// Message Queue Manager for InnerVoice
// Stores messages for offline/inactive projects
import fs from 'fs/promises';
import path from 'path';
import { existsSync } from 'fs';
const QUEUE_DIR = path.join(process.env.HOME || '~', '.innervoice', 'queues');
export interface QueuedTask {
id: string;
projectName: string;
projectPath: string;
message: string;
from: string;
timestamp: Date;
priority: 'low' | 'normal' | 'high';
status: 'pending' | 'delivered' | 'expired';
}
// Ensure queue directory exists
async function ensureQueueDir(): Promise<void> {
if (!existsSync(QUEUE_DIR)) {
await fs.mkdir(QUEUE_DIR, { recursive: true });
}
}
// Get queue file path for a project
function getQueuePath(projectName: string): string {
const safeName = projectName.replace(/[^a-z0-9-_]/gi, '_').toLowerCase();
return path.join(QUEUE_DIR, `${safeName}.json`);
}
// Load queue for a project
export async function loadQueue(projectName: string): Promise<QueuedTask[]> {
await ensureQueueDir();
const queuePath = getQueuePath(projectName);
if (!existsSync(queuePath)) {
return [];
}
try {
const content = await fs.readFile(queuePath, 'utf-8');
const tasks = JSON.parse(content);
// Convert timestamp strings back to Date objects
return tasks.map((t: any) => ({
...t,
timestamp: new Date(t.timestamp)
}));
} catch (error) {
console.error(`Error loading queue for ${projectName}:`, error);
return [];
}
}
// Save queue for a project
export async function saveQueue(projectName: string, tasks: QueuedTask[]): Promise<void> {
await ensureQueueDir();
const queuePath = getQueuePath(projectName);
await fs.writeFile(queuePath, JSON.stringify(tasks, null, 2));
}
// Add a task to the queue
export async function enqueueTask(task: Omit<QueuedTask, 'id' | 'status'>): Promise<QueuedTask> {
const queue = await loadQueue(task.projectName);
const newTask: QueuedTask = {
...task,
id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
status: 'pending'
};
queue.push(newTask);
await saveQueue(task.projectName, queue);
console.log(`📥 Queued task for ${task.projectName}: ${task.message.substring(0, 50)}...`);
return newTask;
}
// Get pending tasks for a project
export async function getPendingTasks(projectName: string): Promise<QueuedTask[]> {
const queue = await loadQueue(projectName);
return queue.filter(t => t.status === 'pending');
}
// Mark a task as delivered
export async function markTaskDelivered(projectName: string, taskId: string): Promise<void> {
const queue = await loadQueue(projectName);
const task = queue.find(t => t.id === taskId);
if (task) {
task.status = 'delivered';
await saveQueue(projectName, queue);
console.log(`✅ Task delivered: ${taskId}`);
}
}
// Clear delivered tasks older than N days
export async function cleanupOldTasks(projectName: string, daysOld: number = 7): Promise<number> {
const queue = await loadQueue(projectName);
const cutoff = new Date();
cutoff.setDate(cutoff.getDate() - daysOld);
const filtered = queue.filter(t => {
if (t.status === 'delivered' && t.timestamp < cutoff) {
return false; // Remove old delivered tasks
}
return true;
});
const removed = queue.length - filtered.length;
if (removed > 0) {
await saveQueue(projectName, filtered);
console.log(`🧹 Cleaned up ${removed} old tasks for ${projectName}`);
}
return removed;
}
// List all projects with queued tasks
export async function listProjectsWithQueues(): Promise<string[]> {
await ensureQueueDir();
const files = await fs.readdir(QUEUE_DIR);
return files
.filter(f => f.endsWith('.json'))
.map(f => f.replace('.json', '').replace(/_/g, '-'));
}
// Get queue summary for all projects
export async function getQueueSummary(): Promise<{ projectName: string; pending: number; total: number }[]> {
const projects = await listProjectsWithQueues();
const summaries = await Promise.all(
projects.map(async (projectName) => {
const queue = await loadQueue(projectName);
return {
projectName,
pending: queue.filter(t => t.status === 'pending').length,
total: queue.length
};
})
);
return summaries.filter(s => s.total > 0);
}