feat: task manager — фоновые задачи без таймаута через /task

Новый модуль task-manager.ts: очередь задач с файловым хранением,
worker loop каждые 5 сек, Claude CLI с --max-turns 50 без таймаута,
Telegram-уведомления по завершении. Команды /task, /task list,
/task N, /task cancel N. HTTP API /tasks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
root
2026-02-19 17:03:32 +00:00
parent 18fcc06626
commit b3fdd383ac
3 changed files with 514 additions and 5 deletions

View File

@@ -73,6 +73,15 @@ export async function handleChatbotMessage(
try { await sendTyping(); } catch { /* ignore */ } try { await sendTyping(); } catch { /* ignore */ }
}, 4000); }, 4000);
// Send "working on it" notification if Claude takes longer than 15 seconds
let longTaskNotified = false;
const longTaskTimer = setTimeout(async () => {
longTaskNotified = true;
try {
await safeSendReply(sendReply, '⏳ Задача принята, работаю... Ответ может занять несколько минут.');
} catch { /* ignore */ }
}, 15000);
try { try {
await sendTyping(); await sendTyping();
@@ -81,6 +90,7 @@ export async function handleChatbotMessage(
const resumeId = session && !isExpired ? session.sessionId : undefined; const resumeId = session && !isExpired ? session.sessionId : undefined;
const response = await executeWithRetry(chatId, message, resumeId); const response = await executeWithRetry(chatId, message, resumeId);
clearTimeout(longTaskTimer);
chatbotSessions.set(chatId, { chatbotSessions.set(chatId, {
sessionId: response.session_id, sessionId: response.session_id,
@@ -102,8 +112,11 @@ export async function handleChatbotMessage(
await safeSendReply(sendReply, chunk); await safeSendReply(sendReply, chunk);
} }
} catch (error: any) { } catch (error: any) {
clearTimeout(longTaskTimer);
console.error('[Chatbot] Error:', error.message); console.error('[Chatbot] Error:', error.message);
await safeSendReply(sendReply, `❌ Ошибка: ${error.message}`); const isTimeout = error.message?.includes('exit code 143') || error.message?.includes('Timeout');
const hint = isTimeout ? '\n\n💡 Используйте /task для долгих задач без таймаута.' : '';
await safeSendReply(sendReply, `❌ Ошибка: ${error.message}${hint}`);
} finally { } finally {
clearInterval(typingInterval); clearInterval(typingInterval);
activeLocks.delete(chatId); activeLocks.delete(chatId);
@@ -237,7 +250,7 @@ async function executeWithRetry(
} }
} }
function parseClaudeResponse(stdout: string): ClaudeJsonResponse { export function parseClaudeResponse(stdout: string): ClaudeJsonResponse {
const trimmed = stdout.trim(); const trimmed = stdout.trim();
// Try the whole output as a single JSON // Try the whole output as a single JSON
@@ -267,7 +280,7 @@ function parseClaudeResponse(stdout: string): ClaudeJsonResponse {
throw new Error(`No valid JSON result in output (${trimmed.substring(0, 150)})`); throw new Error(`No valid JSON result in output (${trimmed.substring(0, 150)})`);
} }
function splitMessage(text: string, maxLength: number = 4000): string[] { export function splitMessage(text: string, maxLength: number = 4000): string[] {
if (text.length <= maxLength) return [text]; if (text.length <= maxLength) return [text];
const chunks: string[] = []; const chunks: string[] = [];

View File

@@ -28,8 +28,19 @@ import {
initChatbot, initChatbot,
isChatbotEnabled, isChatbotEnabled,
resetChatbotSession, resetChatbotSession,
getChatbotStatus getChatbotStatus,
splitMessage
} from './chatbot.js'; } from './chatbot.js';
import {
initTaskManager,
createTask,
getTask,
listTasks,
cancelTask,
stopTaskManager,
formatTaskShort,
getTaskStatusEmoji
} from './task-manager.js';
dotenv.config(); dotenv.config();
@@ -90,6 +101,22 @@ setInterval(() => {
// Initialize chatbot module // Initialize chatbot module
initChatbot(); initChatbot();
// Initialize task manager with Telegram notification callback
initTaskManager(async (targetChatId: string, text: string) => {
const chunks = splitMessage(text);
for (const chunk of chunks) {
try {
await bot.telegram.sendMessage(targetChatId, chunk, { parse_mode: 'Markdown' });
} catch {
try {
await bot.telegram.sendMessage(targetChatId, chunk);
} catch (err) {
console.error('[TaskManager] Failed to send notification:', err);
}
}
}
});
app.use(express.json()); app.use(express.json());
// Save chat ID to .env file // Save chat ID to .env file
@@ -138,9 +165,14 @@ bot.command('help', async (ctx) => {
await ctx.reply( await ctx.reply(
'*Claude Telegram Bridge - Commands*\n\n' + '*Claude Telegram Bridge - Commands*\n\n' +
'*Chatbot:*\n' + '*Chatbot:*\n' +
'• Send any message — Claude ответит как чат-бот\n' + '• @mention или reply — Claude ответит как чат-бот\n' +
'`/chatbot` - Статус чат-бота\n' + '`/chatbot` - Статус чат-бота\n' +
'`/chatreset` - Сброс диалога\n\n' + '`/chatreset` - Сброс диалога\n\n' +
'*Background Tasks:*\n' +
'`/task <message>` - Создать фоновую задачу (без таймаута)\n' +
'`/task list` - Список задач\n' +
'`/task N` - Статус задачи #N\n' +
'`/task cancel N` - Отменить задачу\n\n' +
'*Session Management:*\n' + '*Session Management:*\n' +
'`/sessions` - List active Claude sessions\n' + '`/sessions` - List active Claude sessions\n' +
'`/queue` - View queued messages\n\n' + '`/queue` - View queued messages\n\n' +
@@ -193,6 +225,72 @@ bot.command('chatreset', async (ctx) => {
} }
}); });
bot.command('task', async (ctx) => {
const text = ctx.message.text.replace(/^\/task\s*/, '').trim();
const userChatId = ctx.chat.id.toString();
// /task list
if (text === 'list' || text === '') {
const tasks = await listTasks();
if (tasks.length === 0) {
await ctx.reply('📭 Нет задач.\n\nСоздать: `/task ваше сообщение`', { parse_mode: 'Markdown' });
return;
}
const lines = tasks.map(t => formatTaskShort(t));
await ctx.reply(`*Задачи* (последние ${tasks.length})\n\n${lines.join('\n')}`, { parse_mode: 'Markdown' });
return;
}
// /task cancel N
const cancelMatch = text.match(/^cancel\s+(\d+)$/);
if (cancelMatch) {
const id = parseInt(cancelMatch[1]);
const success = await cancelTask(id);
if (success) {
await ctx.reply(`🚫 Задача #${id} отменена.`);
} else {
await ctx.reply(`Не удалось отменить задачу #${id}. Возможно, она уже завершена.`);
}
return;
}
// /task N — show task details
const idMatch = text.match(/^(\d+)$/);
if (idMatch) {
const id = parseInt(idMatch[1]);
const task = await getTask(id);
if (!task) {
await ctx.reply(`❌ Задача #${id} не найдена.`);
return;
}
const emoji = getTaskStatusEmoji(task.status);
let detail = `${emoji} *Задача #${task.id}*\n\n`;
detail += `*Статус:* ${task.status}\n`;
detail += `*Создана:* ${new Date(task.createdAt).toLocaleString('ru')}\n`;
if (task.startedAt) detail += `*Начата:* ${new Date(task.startedAt).toLocaleString('ru')}\n`;
if (task.completedAt) detail += `*Завершена:* ${new Date(task.completedAt).toLocaleString('ru')}\n`;
if (task.durationMs) detail += `*Длительность:* ${Math.round(task.durationMs / 1000)} сек\n`;
if (task.numTurns) detail += `*Ходы:* ${task.numTurns}\n`;
detail += `\n*Запрос:*\n${task.message.substring(0, 200)}${task.message.length > 200 ? '...' : ''}\n`;
if (task.result) {
const preview = task.result.substring(0, 500);
detail += `\n*Результат:*\n${preview}${task.result.length > 500 ? '...' : ''}`;
}
if (task.error) detail += `\n*Ошибка:* ${task.error}`;
try {
await ctx.reply(detail, { parse_mode: 'Markdown' });
} catch {
await ctx.reply(detail);
}
return;
}
// /task <message> — create new task
const task = await createTask(userChatId, text);
await ctx.reply(`📋 *Задача #${task.id} создана*\n\nВыполняется в фоне. Уведомлю по завершении.`, { parse_mode: 'Markdown' });
});
bot.command('sessions', async (ctx) => { bot.command('sessions', async (ctx) => {
const sessions = Array.from(activeSessions.values()); const sessions = Array.from(activeSessions.values());
@@ -775,6 +873,55 @@ app.get('/queue/summary', async (req, res) => {
} }
}); });
// Task manager endpoints
app.post('/tasks', async (req, res) => {
const { chatId: taskChatId, message } = req.body;
if (!message) {
return res.status(400).json({ error: 'message is required' });
}
try {
const task = await createTask(taskChatId || chatId || 'http', message);
res.json({ success: true, task });
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
app.get('/tasks', async (req, res) => {
try {
const tasks = await listTasks();
res.json({ tasks, count: tasks.length });
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
app.get('/tasks/:id', async (req, res) => {
try {
const task = await getTask(parseInt(req.params.id));
if (task) {
res.json({ task });
} else {
res.status(404).json({ error: 'Task not found' });
}
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
app.post('/tasks/:id/cancel', async (req, res) => {
try {
const success = await cancelTask(parseInt(req.params.id));
if (success) {
res.json({ success: true });
} else {
res.status(400).json({ error: 'Cannot cancel task (already completed or not found)' });
}
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
// HTTP endpoint for sending notifications // HTTP endpoint for sending notifications
app.post('/notify', async (req, res) => { app.post('/notify', async (req, res) => {
if (!ENABLED) { if (!ENABLED) {
@@ -1107,11 +1254,13 @@ app.listen(PORT, HOST, () => {
// Graceful shutdown // Graceful shutdown
process.once('SIGINT', () => { process.once('SIGINT', () => {
console.log('\n👋 Shutting down...'); console.log('\n👋 Shutting down...');
stopTaskManager();
bot.stop('SIGINT'); bot.stop('SIGINT');
process.exit(0); process.exit(0);
}); });
process.once('SIGTERM', () => { process.once('SIGTERM', () => {
stopTaskManager();
bot.stop('SIGTERM'); bot.stop('SIGTERM');
process.exit(0); process.exit(0);
}); });

347
src/task-manager.ts Normal file
View File

@@ -0,0 +1,347 @@
import { spawn, ChildProcess } from 'child_process';
import fs from 'fs/promises';
import path from 'path';
import { existsSync } from 'fs';
import { parseClaudeResponse, splitMessage } from './chatbot.js';
// --- Interfaces ---
export interface Task {
id: number;
chatId: string;
message: string;
status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
createdAt: string;
startedAt?: string;
completedAt?: string;
result?: string;
error?: string;
sessionId?: string;
numTurns?: number;
costUsd?: number;
durationMs?: number;
}
interface TaskStore {
nextId: number;
tasks: Task[];
}
// --- Configuration ---
const TASK_MAX_TURNS = () => process.env.TASK_MAX_TURNS || '50';
const TASK_CWD = () => process.env.TASK_CWD || process.env.CHATBOT_CWD || '/home/fitcrm';
const TASKS_FILE = path.join(process.env.HOME || '~', '.innervoice', 'tasks.json');
const MAX_STORED_TASKS = 50;
// --- State ---
let workerTimer: NodeJS.Timeout | null = null;
let currentProcess: ChildProcess | null = null;
let sendNotification: ((chatId: string, text: string) => Promise<void>) | null = null;
// --- Persistence ---
async function ensureDir(): Promise<void> {
const dir = path.dirname(TASKS_FILE);
if (!existsSync(dir)) {
await fs.mkdir(dir, { recursive: true });
}
}
async function loadStore(): Promise<TaskStore> {
await ensureDir();
if (!existsSync(TASKS_FILE)) {
return { nextId: 1, tasks: [] };
}
try {
const content = await fs.readFile(TASKS_FILE, 'utf-8');
return JSON.parse(content);
} catch {
return { nextId: 1, tasks: [] };
}
}
async function saveStore(store: TaskStore): Promise<void> {
await ensureDir();
// Keep only last MAX_STORED_TASKS
if (store.tasks.length > MAX_STORED_TASKS) {
const active = store.tasks.filter(t => t.status === 'pending' || t.status === 'running');
const finished = store.tasks
.filter(t => t.status !== 'pending' && t.status !== 'running')
.slice(-MAX_STORED_TASKS);
store.tasks = [...active, ...finished].slice(-MAX_STORED_TASKS);
}
await fs.writeFile(TASKS_FILE, JSON.stringify(store, null, 2));
}
// --- Public API ---
export function initTaskManager(
notifyFn: (chatId: string, text: string) => Promise<void>
): void {
sendNotification = notifyFn;
// Recovery: mark stuck 'running' tasks as failed
loadStore().then(async (store) => {
let changed = false;
for (const task of store.tasks) {
if (task.status === 'running') {
task.status = 'failed';
task.error = 'Process restarted';
task.completedAt = new Date().toISOString();
changed = true;
console.log(`[TaskManager] Recovered stuck task #${task.id}`);
}
}
if (changed) await saveStore(store);
});
// Start worker loop
if (workerTimer) clearInterval(workerTimer);
workerTimer = setInterval(workerLoop, 5000);
console.log(`[TaskManager] Initialized (maxTurns: ${TASK_MAX_TURNS()}, cwd: ${TASK_CWD()})`);
}
export async function createTask(chatId: string, message: string): Promise<Task> {
const store = await loadStore();
const task: Task = {
id: store.nextId++,
chatId,
message,
status: 'pending',
createdAt: new Date().toISOString(),
};
store.tasks.push(task);
await saveStore(store);
console.log(`[TaskManager] Created task #${task.id}: "${message.substring(0, 50)}..."`);
return task;
}
export async function getTask(id: number): Promise<Task | undefined> {
const store = await loadStore();
return store.tasks.find(t => t.id === id);
}
export async function listTasks(): Promise<Task[]> {
const store = await loadStore();
return store.tasks.slice(-10).reverse();
}
export async function cancelTask(id: number): Promise<boolean> {
const store = await loadStore();
const task = store.tasks.find(t => t.id === id);
if (!task) return false;
if (task.status === 'pending') {
task.status = 'cancelled';
task.completedAt = new Date().toISOString();
await saveStore(store);
return true;
}
if (task.status === 'running' && currentProcess) {
currentProcess.kill('SIGTERM');
setTimeout(() => { currentProcess?.kill('SIGKILL'); }, 5000);
task.status = 'cancelled';
task.completedAt = new Date().toISOString();
await saveStore(store);
return true;
}
return false;
}
export function stopTaskManager(): void {
if (workerTimer) {
clearInterval(workerTimer);
workerTimer = null;
}
if (currentProcess) {
currentProcess.kill('SIGTERM');
currentProcess = null;
}
console.log('[TaskManager] Stopped');
}
// --- Worker ---
let workerBusy = false;
async function workerLoop(): Promise<void> {
if (workerBusy) return;
const store = await loadStore();
const pending = store.tasks.find(t => t.status === 'pending');
if (!pending) return;
workerBusy = true;
console.log(`[TaskManager] Starting task #${pending.id}: "${pending.message.substring(0, 50)}..."`);
// Mark as running
pending.status = 'running';
pending.startedAt = new Date().toISOString();
await saveStore(store);
try {
const response = await executeTaskCli(pending.message);
// Update task
const freshStore = await loadStore();
const task = freshStore.tasks.find(t => t.id === pending.id);
if (task) {
task.status = 'completed';
task.completedAt = new Date().toISOString();
task.sessionId = response.session_id;
task.numTurns = response.num_turns;
task.costUsd = response.cost_usd;
task.durationMs = response.duration_ms;
let resultText = response.result?.trim();
if (!resultText) {
if (response.subtype === 'error_max_turns') {
resultText = `Выполнил ${response.num_turns} действий, но не уложился в лимит ходов.`;
} else {
resultText = 'Задача выполнена (без текстового ответа).';
}
}
task.result = resultText;
await saveStore(freshStore);
// Notify
const duration = formatDuration(task.durationMs || 0);
const header = `*Задача #${task.id} завершена* (${duration})`;
const body = resultText.length > 3500 ? resultText.substring(0, 3500) + '...' : resultText;
await notify(task.chatId, `${header}\n\n${body}`);
}
} catch (error: any) {
const freshStore = await loadStore();
const task = freshStore.tasks.find(t => t.id === pending.id);
if (task && task.status === 'running') {
task.status = 'failed';
task.completedAt = new Date().toISOString();
task.error = error.message;
await saveStore(freshStore);
await notify(task.chatId, `*Задача #${task.id} провалилась*\n\n${error.message}`);
}
} finally {
workerBusy = false;
currentProcess = null;
}
}
// --- CLI Execution ---
function executeTaskCli(message: string): Promise<any> {
return new Promise((resolve, reject) => {
const args: string[] = [
'-p', message,
'--output-format', 'json',
'--max-turns', TASK_MAX_TURNS(),
];
const env = { ...process.env };
delete env.CLAUDECODE;
delete env.CLAUDE_CODE_ENTRYPOINT;
delete env.CLAUDE_SPAWNED;
delete env.INNERVOICE_SPAWNED;
console.log(`[TaskManager] Spawning: claude ${args.map((a, i) => i === 1 ? `"${a.substring(0, 40)}..."` : a).join(' ')}`);
const child = spawn('claude', args, {
cwd: TASK_CWD(),
env,
stdio: ['ignore', 'pipe', 'pipe'],
});
currentProcess = child;
let stdout = '';
let stderr = '';
child.stdout.on('data', (data: Buffer) => {
stdout += data.toString();
});
child.stderr.on('data', (data: Buffer) => {
stderr += data.toString();
});
// No timeout — tasks run until completion
child.on('error', (error) => {
console.error(`[TaskManager] Spawn error: ${error.message}`);
reject(new Error(`Failed to start claude: ${error.message}`));
});
child.on('close', (code) => {
currentProcess = null;
console.log(`[TaskManager] Process exited with code ${code}`);
if (stdout.trim()) {
try {
const parsed = parseClaudeResponse(stdout);
if (parsed.is_error) {
reject(new Error(parsed.result || 'Claude returned an error'));
} else {
resolve(parsed);
}
return;
} catch (parseErr: any) {
console.error(`[TaskManager] Parse error: ${parseErr.message}`);
}
}
if (code !== 0) {
const errDetail = stderr.trim() || stdout.trim() || `exit code ${code}`;
reject(new Error(`Claude CLI failed: ${errDetail.substring(0, 200)}`));
} else {
reject(new Error('Empty response from Claude CLI'));
}
});
});
}
// --- Helpers ---
function formatDuration(ms: number): string {
if (ms < 1000) return `${ms}ms`;
const sec = Math.floor(ms / 1000);
if (sec < 60) return `${sec} сек`;
const min = Math.floor(sec / 60);
const remainSec = sec % 60;
return `${min} мин ${remainSec} сек`;
}
async function notify(chatId: string, text: string): Promise<void> {
if (!sendNotification) return;
try {
await sendNotification(chatId, text);
} catch (error) {
console.error('[TaskManager] Notification failed:', error);
}
}
export function getTaskStatusEmoji(status: Task['status']): string {
switch (status) {
case 'pending': return '⏳';
case 'running': return '🔄';
case 'completed': return '✅';
case 'failed': return '❌';
case 'cancelled': return '🚫';
}
}
export function formatTaskShort(task: Task): string {
const emoji = getTaskStatusEmoji(task.status);
const msg = task.message.length > 40 ? task.message.substring(0, 40) + '...' : task.message;
let line = `${emoji} #${task.id} ${msg}`;
if (task.status === 'completed' && task.durationMs) {
line += ` (${formatDuration(task.durationMs)})`;
}
if (task.status === 'failed' && task.error) {
line += `${task.error.substring(0, 50)}`;
}
return line;
}