diff --git a/src/chatbot.ts b/src/chatbot.ts index 1b5cb38..c6b9859 100644 --- a/src/chatbot.ts +++ b/src/chatbot.ts @@ -73,6 +73,15 @@ export async function handleChatbotMessage( try { await sendTyping(); } catch { /* ignore */ } }, 4000); + // Send "working on it" notification if Claude takes longer than 15 seconds + let longTaskNotified = false; + const longTaskTimer = setTimeout(async () => { + longTaskNotified = true; + try { + await safeSendReply(sendReply, '⏳ Задача принята, работаю... Ответ может занять несколько минут.'); + } catch { /* ignore */ } + }, 15000); + try { await sendTyping(); @@ -81,6 +90,7 @@ export async function handleChatbotMessage( const resumeId = session && !isExpired ? session.sessionId : undefined; const response = await executeWithRetry(chatId, message, resumeId); + clearTimeout(longTaskTimer); chatbotSessions.set(chatId, { sessionId: response.session_id, @@ -102,8 +112,11 @@ export async function handleChatbotMessage( await safeSendReply(sendReply, chunk); } } catch (error: any) { + clearTimeout(longTaskTimer); console.error('[Chatbot] Error:', error.message); - await safeSendReply(sendReply, `❌ Ошибка: ${error.message}`); + const isTimeout = error.message?.includes('exit code 143') || error.message?.includes('Timeout'); + const hint = isTimeout ? '\n\n💡 Используйте /task для долгих задач без таймаута.' : ''; + await safeSendReply(sendReply, `❌ Ошибка: ${error.message}${hint}`); } finally { clearInterval(typingInterval); activeLocks.delete(chatId); @@ -237,7 +250,7 @@ async function executeWithRetry( } } -function parseClaudeResponse(stdout: string): ClaudeJsonResponse { +export function parseClaudeResponse(stdout: string): ClaudeJsonResponse { const trimmed = stdout.trim(); // Try the whole output as a single JSON @@ -267,7 +280,7 @@ function parseClaudeResponse(stdout: string): ClaudeJsonResponse { throw new Error(`No valid JSON result in output (${trimmed.substring(0, 150)})`); } -function splitMessage(text: string, maxLength: number = 4000): string[] { +export function splitMessage(text: string, maxLength: number = 4000): string[] { if (text.length <= maxLength) return [text]; const chunks: string[] = []; diff --git a/src/index.ts b/src/index.ts index 89f60c5..3a3d727 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,8 +28,19 @@ import { initChatbot, isChatbotEnabled, resetChatbotSession, - getChatbotStatus + getChatbotStatus, + splitMessage } from './chatbot.js'; +import { + initTaskManager, + createTask, + getTask, + listTasks, + cancelTask, + stopTaskManager, + formatTaskShort, + getTaskStatusEmoji +} from './task-manager.js'; dotenv.config(); @@ -90,6 +101,22 @@ setInterval(() => { // Initialize chatbot module initChatbot(); +// Initialize task manager with Telegram notification callback +initTaskManager(async (targetChatId: string, text: string) => { + const chunks = splitMessage(text); + for (const chunk of chunks) { + try { + await bot.telegram.sendMessage(targetChatId, chunk, { parse_mode: 'Markdown' }); + } catch { + try { + await bot.telegram.sendMessage(targetChatId, chunk); + } catch (err) { + console.error('[TaskManager] Failed to send notification:', err); + } + } + } +}); + app.use(express.json()); // Save chat ID to .env file @@ -138,9 +165,14 @@ bot.command('help', async (ctx) => { await ctx.reply( '*Claude Telegram Bridge - Commands*\n\n' + '*Chatbot:*\n' + - '• Send any message — Claude ответит как чат-бот\n' + + '• @mention или reply — Claude ответит как чат-бот\n' + '`/chatbot` - Статус чат-бота\n' + '`/chatreset` - Сброс диалога\n\n' + + '*Background Tasks:*\n' + + '`/task ` - Создать фоновую задачу (без таймаута)\n' + + '`/task list` - Список задач\n' + + '`/task N` - Статус задачи #N\n' + + '`/task cancel N` - Отменить задачу\n\n' + '*Session Management:*\n' + '`/sessions` - List active Claude sessions\n' + '`/queue` - View queued messages\n\n' + @@ -193,6 +225,72 @@ bot.command('chatreset', async (ctx) => { } }); +bot.command('task', async (ctx) => { + const text = ctx.message.text.replace(/^\/task\s*/, '').trim(); + const userChatId = ctx.chat.id.toString(); + + // /task list + if (text === 'list' || text === '') { + const tasks = await listTasks(); + if (tasks.length === 0) { + await ctx.reply('📭 Нет задач.\n\nСоздать: `/task ваше сообщение`', { parse_mode: 'Markdown' }); + return; + } + const lines = tasks.map(t => formatTaskShort(t)); + await ctx.reply(`*Задачи* (последние ${tasks.length})\n\n${lines.join('\n')}`, { parse_mode: 'Markdown' }); + return; + } + + // /task cancel N + const cancelMatch = text.match(/^cancel\s+(\d+)$/); + if (cancelMatch) { + const id = parseInt(cancelMatch[1]); + const success = await cancelTask(id); + if (success) { + await ctx.reply(`🚫 Задача #${id} отменена.`); + } else { + await ctx.reply(`❌ Не удалось отменить задачу #${id}. Возможно, она уже завершена.`); + } + return; + } + + // /task N — show task details + const idMatch = text.match(/^(\d+)$/); + if (idMatch) { + const id = parseInt(idMatch[1]); + const task = await getTask(id); + if (!task) { + await ctx.reply(`❌ Задача #${id} не найдена.`); + return; + } + const emoji = getTaskStatusEmoji(task.status); + let detail = `${emoji} *Задача #${task.id}*\n\n`; + detail += `*Статус:* ${task.status}\n`; + detail += `*Создана:* ${new Date(task.createdAt).toLocaleString('ru')}\n`; + if (task.startedAt) detail += `*Начата:* ${new Date(task.startedAt).toLocaleString('ru')}\n`; + if (task.completedAt) detail += `*Завершена:* ${new Date(task.completedAt).toLocaleString('ru')}\n`; + if (task.durationMs) detail += `*Длительность:* ${Math.round(task.durationMs / 1000)} сек\n`; + if (task.numTurns) detail += `*Ходы:* ${task.numTurns}\n`; + detail += `\n*Запрос:*\n${task.message.substring(0, 200)}${task.message.length > 200 ? '...' : ''}\n`; + if (task.result) { + const preview = task.result.substring(0, 500); + detail += `\n*Результат:*\n${preview}${task.result.length > 500 ? '...' : ''}`; + } + if (task.error) detail += `\n*Ошибка:* ${task.error}`; + + try { + await ctx.reply(detail, { parse_mode: 'Markdown' }); + } catch { + await ctx.reply(detail); + } + return; + } + + // /task — create new task + const task = await createTask(userChatId, text); + await ctx.reply(`📋 *Задача #${task.id} создана*\n\nВыполняется в фоне. Уведомлю по завершении.`, { parse_mode: 'Markdown' }); +}); + bot.command('sessions', async (ctx) => { const sessions = Array.from(activeSessions.values()); @@ -775,6 +873,55 @@ app.get('/queue/summary', async (req, res) => { } }); +// Task manager endpoints +app.post('/tasks', async (req, res) => { + const { chatId: taskChatId, message } = req.body; + if (!message) { + return res.status(400).json({ error: 'message is required' }); + } + try { + const task = await createTask(taskChatId || chatId || 'http', message); + res.json({ success: true, task }); + } catch (error: any) { + res.status(500).json({ error: error.message }); + } +}); + +app.get('/tasks', async (req, res) => { + try { + const tasks = await listTasks(); + res.json({ tasks, count: tasks.length }); + } catch (error: any) { + res.status(500).json({ error: error.message }); + } +}); + +app.get('/tasks/:id', async (req, res) => { + try { + const task = await getTask(parseInt(req.params.id)); + if (task) { + res.json({ task }); + } else { + res.status(404).json({ error: 'Task not found' }); + } + } catch (error: any) { + res.status(500).json({ error: error.message }); + } +}); + +app.post('/tasks/:id/cancel', async (req, res) => { + try { + const success = await cancelTask(parseInt(req.params.id)); + if (success) { + res.json({ success: true }); + } else { + res.status(400).json({ error: 'Cannot cancel task (already completed or not found)' }); + } + } catch (error: any) { + res.status(500).json({ error: error.message }); + } +}); + // HTTP endpoint for sending notifications app.post('/notify', async (req, res) => { if (!ENABLED) { @@ -1107,11 +1254,13 @@ app.listen(PORT, HOST, () => { // Graceful shutdown process.once('SIGINT', () => { console.log('\n👋 Shutting down...'); + stopTaskManager(); bot.stop('SIGINT'); process.exit(0); }); process.once('SIGTERM', () => { + stopTaskManager(); bot.stop('SIGTERM'); process.exit(0); }); diff --git a/src/task-manager.ts b/src/task-manager.ts new file mode 100644 index 0000000..3c09470 --- /dev/null +++ b/src/task-manager.ts @@ -0,0 +1,347 @@ +import { spawn, ChildProcess } from 'child_process'; +import fs from 'fs/promises'; +import path from 'path'; +import { existsSync } from 'fs'; +import { parseClaudeResponse, splitMessage } from './chatbot.js'; + +// --- Interfaces --- + +export interface Task { + id: number; + chatId: string; + message: string; + status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'; + createdAt: string; + startedAt?: string; + completedAt?: string; + result?: string; + error?: string; + sessionId?: string; + numTurns?: number; + costUsd?: number; + durationMs?: number; +} + +interface TaskStore { + nextId: number; + tasks: Task[]; +} + +// --- Configuration --- + +const TASK_MAX_TURNS = () => process.env.TASK_MAX_TURNS || '50'; +const TASK_CWD = () => process.env.TASK_CWD || process.env.CHATBOT_CWD || '/home/fitcrm'; +const TASKS_FILE = path.join(process.env.HOME || '~', '.innervoice', 'tasks.json'); +const MAX_STORED_TASKS = 50; + +// --- State --- + +let workerTimer: NodeJS.Timeout | null = null; +let currentProcess: ChildProcess | null = null; +let sendNotification: ((chatId: string, text: string) => Promise) | null = null; + +// --- Persistence --- + +async function ensureDir(): Promise { + const dir = path.dirname(TASKS_FILE); + if (!existsSync(dir)) { + await fs.mkdir(dir, { recursive: true }); + } +} + +async function loadStore(): Promise { + await ensureDir(); + if (!existsSync(TASKS_FILE)) { + return { nextId: 1, tasks: [] }; + } + try { + const content = await fs.readFile(TASKS_FILE, 'utf-8'); + return JSON.parse(content); + } catch { + return { nextId: 1, tasks: [] }; + } +} + +async function saveStore(store: TaskStore): Promise { + await ensureDir(); + // Keep only last MAX_STORED_TASKS + if (store.tasks.length > MAX_STORED_TASKS) { + const active = store.tasks.filter(t => t.status === 'pending' || t.status === 'running'); + const finished = store.tasks + .filter(t => t.status !== 'pending' && t.status !== 'running') + .slice(-MAX_STORED_TASKS); + store.tasks = [...active, ...finished].slice(-MAX_STORED_TASKS); + } + await fs.writeFile(TASKS_FILE, JSON.stringify(store, null, 2)); +} + +// --- Public API --- + +export function initTaskManager( + notifyFn: (chatId: string, text: string) => Promise +): void { + sendNotification = notifyFn; + + // Recovery: mark stuck 'running' tasks as failed + loadStore().then(async (store) => { + let changed = false; + for (const task of store.tasks) { + if (task.status === 'running') { + task.status = 'failed'; + task.error = 'Process restarted'; + task.completedAt = new Date().toISOString(); + changed = true; + console.log(`[TaskManager] Recovered stuck task #${task.id}`); + } + } + if (changed) await saveStore(store); + }); + + // Start worker loop + if (workerTimer) clearInterval(workerTimer); + workerTimer = setInterval(workerLoop, 5000); + + console.log(`[TaskManager] Initialized (maxTurns: ${TASK_MAX_TURNS()}, cwd: ${TASK_CWD()})`); +} + +export async function createTask(chatId: string, message: string): Promise { + const store = await loadStore(); + const task: Task = { + id: store.nextId++, + chatId, + message, + status: 'pending', + createdAt: new Date().toISOString(), + }; + store.tasks.push(task); + await saveStore(store); + console.log(`[TaskManager] Created task #${task.id}: "${message.substring(0, 50)}..."`); + return task; +} + +export async function getTask(id: number): Promise { + const store = await loadStore(); + return store.tasks.find(t => t.id === id); +} + +export async function listTasks(): Promise { + const store = await loadStore(); + return store.tasks.slice(-10).reverse(); +} + +export async function cancelTask(id: number): Promise { + const store = await loadStore(); + const task = store.tasks.find(t => t.id === id); + if (!task) return false; + + if (task.status === 'pending') { + task.status = 'cancelled'; + task.completedAt = new Date().toISOString(); + await saveStore(store); + return true; + } + + if (task.status === 'running' && currentProcess) { + currentProcess.kill('SIGTERM'); + setTimeout(() => { currentProcess?.kill('SIGKILL'); }, 5000); + task.status = 'cancelled'; + task.completedAt = new Date().toISOString(); + await saveStore(store); + return true; + } + + return false; +} + +export function stopTaskManager(): void { + if (workerTimer) { + clearInterval(workerTimer); + workerTimer = null; + } + if (currentProcess) { + currentProcess.kill('SIGTERM'); + currentProcess = null; + } + console.log('[TaskManager] Stopped'); +} + +// --- Worker --- + +let workerBusy = false; + +async function workerLoop(): Promise { + if (workerBusy) return; + + const store = await loadStore(); + const pending = store.tasks.find(t => t.status === 'pending'); + if (!pending) return; + + workerBusy = true; + console.log(`[TaskManager] Starting task #${pending.id}: "${pending.message.substring(0, 50)}..."`); + + // Mark as running + pending.status = 'running'; + pending.startedAt = new Date().toISOString(); + await saveStore(store); + + try { + const response = await executeTaskCli(pending.message); + + // Update task + const freshStore = await loadStore(); + const task = freshStore.tasks.find(t => t.id === pending.id); + if (task) { + task.status = 'completed'; + task.completedAt = new Date().toISOString(); + task.sessionId = response.session_id; + task.numTurns = response.num_turns; + task.costUsd = response.cost_usd; + task.durationMs = response.duration_ms; + + let resultText = response.result?.trim(); + if (!resultText) { + if (response.subtype === 'error_max_turns') { + resultText = `Выполнил ${response.num_turns} действий, но не уложился в лимит ходов.`; + } else { + resultText = 'Задача выполнена (без текстового ответа).'; + } + } + task.result = resultText; + await saveStore(freshStore); + + // Notify + const duration = formatDuration(task.durationMs || 0); + const header = `*Задача #${task.id} завершена* (${duration})`; + const body = resultText.length > 3500 ? resultText.substring(0, 3500) + '...' : resultText; + await notify(task.chatId, `${header}\n\n${body}`); + } + } catch (error: any) { + const freshStore = await loadStore(); + const task = freshStore.tasks.find(t => t.id === pending.id); + if (task && task.status === 'running') { + task.status = 'failed'; + task.completedAt = new Date().toISOString(); + task.error = error.message; + await saveStore(freshStore); + + await notify(task.chatId, `*Задача #${task.id} провалилась*\n\n${error.message}`); + } + } finally { + workerBusy = false; + currentProcess = null; + } +} + +// --- CLI Execution --- + +function executeTaskCli(message: string): Promise { + return new Promise((resolve, reject) => { + const args: string[] = [ + '-p', message, + '--output-format', 'json', + '--max-turns', TASK_MAX_TURNS(), + ]; + + const env = { ...process.env }; + delete env.CLAUDECODE; + delete env.CLAUDE_CODE_ENTRYPOINT; + delete env.CLAUDE_SPAWNED; + delete env.INNERVOICE_SPAWNED; + + console.log(`[TaskManager] Spawning: claude ${args.map((a, i) => i === 1 ? `"${a.substring(0, 40)}..."` : a).join(' ')}`); + + const child = spawn('claude', args, { + cwd: TASK_CWD(), + env, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + currentProcess = child; + let stdout = ''; + let stderr = ''; + + child.stdout.on('data', (data: Buffer) => { + stdout += data.toString(); + }); + + child.stderr.on('data', (data: Buffer) => { + stderr += data.toString(); + }); + + // No timeout — tasks run until completion + + child.on('error', (error) => { + console.error(`[TaskManager] Spawn error: ${error.message}`); + reject(new Error(`Failed to start claude: ${error.message}`)); + }); + + child.on('close', (code) => { + currentProcess = null; + console.log(`[TaskManager] Process exited with code ${code}`); + + if (stdout.trim()) { + try { + const parsed = parseClaudeResponse(stdout); + if (parsed.is_error) { + reject(new Error(parsed.result || 'Claude returned an error')); + } else { + resolve(parsed); + } + return; + } catch (parseErr: any) { + console.error(`[TaskManager] Parse error: ${parseErr.message}`); + } + } + + if (code !== 0) { + const errDetail = stderr.trim() || stdout.trim() || `exit code ${code}`; + reject(new Error(`Claude CLI failed: ${errDetail.substring(0, 200)}`)); + } else { + reject(new Error('Empty response from Claude CLI')); + } + }); + }); +} + +// --- Helpers --- + +function formatDuration(ms: number): string { + if (ms < 1000) return `${ms}ms`; + const sec = Math.floor(ms / 1000); + if (sec < 60) return `${sec} сек`; + const min = Math.floor(sec / 60); + const remainSec = sec % 60; + return `${min} мин ${remainSec} сек`; +} + +async function notify(chatId: string, text: string): Promise { + if (!sendNotification) return; + try { + await sendNotification(chatId, text); + } catch (error) { + console.error('[TaskManager] Notification failed:', error); + } +} + +export function getTaskStatusEmoji(status: Task['status']): string { + switch (status) { + case 'pending': return '⏳'; + case 'running': return '🔄'; + case 'completed': return '✅'; + case 'failed': return '❌'; + case 'cancelled': return '🚫'; + } +} + +export function formatTaskShort(task: Task): string { + const emoji = getTaskStatusEmoji(task.status); + const msg = task.message.length > 40 ? task.message.substring(0, 40) + '...' : task.message; + let line = `${emoji} #${task.id} ${msg}`; + if (task.status === 'completed' && task.durationMs) { + line += ` (${formatDuration(task.durationMs)})`; + } + if (task.status === 'failed' && task.error) { + line += ` — ${task.error.substring(0, 50)}`; + } + return line; +}