diff --git a/src/chatbot.ts b/src/chatbot.ts new file mode 100644 index 0000000..2b3fb9d --- /dev/null +++ b/src/chatbot.ts @@ -0,0 +1,306 @@ +import { spawn } from 'child_process'; + +// --- Interfaces --- + +interface ChatbotSession { + sessionId: string; + lastActivity: number; + messageCount: number; +} + +interface ClaudeJsonResponse { + type: string; + subtype: string; + cost_usd: number; + is_error: boolean; + duration_ms: number; + num_turns: number; + result: string; + session_id: string; +} + +// --- Configuration --- + +const CHATBOT_ENABLED = () => process.env.CHATBOT_ENABLED !== 'false'; +const CHATBOT_CWD = () => process.env.CHATBOT_CWD || '/home/fitcrm'; +const CHATBOT_MAX_TURNS = () => process.env.CHATBOT_MAX_TURNS || '3'; +const CHATBOT_TIMEOUT = () => parseInt(process.env.CHATBOT_TIMEOUT || '120000'); +const CHATBOT_SESSION_TIMEOUT = () => parseInt(process.env.CHATBOT_SESSION_TIMEOUT || '1800000'); + +// --- State --- + +const chatbotSessions = new Map(); +const activeLocks = new Set(); +let cleanupTimer: NodeJS.Timeout | null = null; + +// --- Public API --- + +export function isChatbotEnabled(): boolean { + return CHATBOT_ENABLED(); +} + +export function initChatbot(): void { + if (cleanupTimer) clearInterval(cleanupTimer); + + cleanupTimer = setInterval(() => { + const now = Date.now(); + const timeout = CHATBOT_SESSION_TIMEOUT(); + for (const [chatId, session] of chatbotSessions.entries()) { + if (now - session.lastActivity > timeout) { + chatbotSessions.delete(chatId); + console.log(`[Chatbot] Expired session for chat ${chatId}`); + } + } + }, 5 * 60 * 1000); + + console.log(`[Chatbot] Initialized (enabled: ${CHATBOT_ENABLED()}, cwd: ${CHATBOT_CWD()}, maxTurns: ${CHATBOT_MAX_TURNS()})`); +} + +export async function handleChatbotMessage( + chatId: string, + message: string, + sendTyping: () => Promise, + sendReply: (text: string, parseMode?: string) => Promise +): Promise { + if (activeLocks.has(chatId)) { + await sendReply('⏳ Подождите, обрабатываю предыдущее сообщение...'); + return; + } + + activeLocks.add(chatId); + + const typingInterval = setInterval(async () => { + try { await sendTyping(); } catch { /* ignore */ } + }, 4000); + + try { + await sendTyping(); + + const session = chatbotSessions.get(chatId); + const isExpired = session && (Date.now() - session.lastActivity > CHATBOT_SESSION_TIMEOUT()); + const resumeId = session && !isExpired ? session.sessionId : undefined; + + const response = await executeWithRetry(chatId, message, resumeId); + + chatbotSessions.set(chatId, { + sessionId: response.session_id, + lastActivity: Date.now(), + messageCount: (session && !isExpired ? session.messageCount : 0) + 1, + }); + + const resultText = response.result || '(пустой ответ)'; + const chunks = splitMessage(resultText); + + for (const chunk of chunks) { + await safeSendReply(sendReply, chunk); + } + } catch (error: any) { + console.error('[Chatbot] Error:', error.message); + await safeSendReply(sendReply, `❌ Ошибка: ${error.message}`); + } finally { + clearInterval(typingInterval); + activeLocks.delete(chatId); + } +} + +export function resetChatbotSession(chatId: string): boolean { + return chatbotSessions.delete(chatId); +} + +export function getChatbotStatus(chatId: string): { + enabled: boolean; + hasSession: boolean; + messageCount: number; + sessionAge: number | null; +} { + const session = chatbotSessions.get(chatId); + return { + enabled: CHATBOT_ENABLED(), + hasSession: !!session, + messageCount: session?.messageCount || 0, + sessionAge: session ? Math.floor((Date.now() - session.lastActivity) / 60000) : null, + }; +} + +// --- Internal --- + +function executeClaudeCli(message: string, resumeSessionId?: string): Promise { + return new Promise((resolve, reject) => { + const args: string[] = [ + '-p', message, + '--output-format', 'json', + '--max-turns', CHATBOT_MAX_TURNS(), + ]; + + if (resumeSessionId) { + args.push('--resume', resumeSessionId); + } + + // Clean env: remove all Claude Code session vars + const env = { ...process.env }; + delete env.CLAUDECODE; + delete env.CLAUDE_CODE_ENTRYPOINT; + delete env.CLAUDE_SPAWNED; + delete env.INNERVOICE_SPAWNED; + + console.log(`[Chatbot] Spawning: claude ${args.map((a, i) => i === 1 ? `"${a.substring(0, 40)}..."` : a).join(' ')}`); + + const child = spawn('claude', args, { + cwd: CHATBOT_CWD(), + env, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + let stdout = ''; + let stderr = ''; + + child.stdout.on('data', (data: Buffer) => { + stdout += data.toString(); + }); + + child.stderr.on('data', (data: Buffer) => { + stderr += data.toString(); + }); + + // Timeout handler + const timer = setTimeout(() => { + console.error(`[Chatbot] Timeout after ${CHATBOT_TIMEOUT()}ms, killing process`); + child.kill('SIGTERM'); + setTimeout(() => child.kill('SIGKILL'), 5000); + }, CHATBOT_TIMEOUT()); + + child.on('error', (error) => { + clearTimeout(timer); + console.error(`[Chatbot] Spawn error: ${error.message}`); + reject(new Error(`Failed to start claude: ${error.message}`)); + }); + + child.on('close', (code) => { + clearTimeout(timer); + + console.log(`[Chatbot] Process exited with code ${code}`); + if (stderr.trim()) { + console.error(`[Chatbot] stderr: ${stderr.substring(0, 500)}`); + } + if (stdout.trim()) { + console.log(`[Chatbot] stdout (first 300): ${stdout.substring(0, 300)}`); + } else { + console.log(`[Chatbot] stdout: (empty)`); + } + + // Try to parse response regardless of exit code + if (stdout.trim()) { + try { + const parsed = parseClaudeResponse(stdout); + if (parsed.is_error) { + reject(new Error(parsed.result || 'Claude returned an error')); + } else { + resolve(parsed); + } + return; + } catch (parseErr: any) { + console.error(`[Chatbot] Parse error: ${parseErr.message}`); + } + } + + if (code !== 0) { + const errDetail = stderr.trim() || stdout.trim() || `exit code ${code}`; + reject(new Error(`Claude CLI failed: ${errDetail.substring(0, 200)}`)); + } else { + reject(new Error('Empty response from Claude CLI')); + } + }); + }); +} + +async function executeWithRetry( + chatId: string, + message: string, + resumeId?: string +): Promise { + try { + return await executeClaudeCli(message, resumeId); + } catch (error) { + if (resumeId) { + console.log(`[Chatbot] Resume failed for ${chatId}, retrying fresh`); + chatbotSessions.delete(chatId); + return await executeClaudeCli(message, undefined); + } + throw error; + } +} + +function parseClaudeResponse(stdout: string): ClaudeJsonResponse { + const trimmed = stdout.trim(); + + // Try the whole output as a single JSON + try { + return JSON.parse(trimmed); + } catch { /* continue */ } + + // Try each line (may have streaming JSON lines) + const lines = trimmed.split('\n'); + for (let i = lines.length - 1; i >= 0; i--) { + const line = lines[i].trim(); + if (!line) continue; + try { + const parsed = JSON.parse(line); + if (parsed.type === 'result') return parsed; + } catch { continue; } + } + + // Last resort: find result JSON anywhere in output + const match = trimmed.match(/\{[^]*?"type"\s*:\s*"result"[^]*?\}/); + if (match) { + try { + return JSON.parse(match[0]); + } catch { /* continue */ } + } + + throw new Error(`No valid JSON result in output (${trimmed.substring(0, 150)})`); +} + +function splitMessage(text: string, maxLength: number = 4000): string[] { + if (text.length <= maxLength) return [text]; + + const chunks: string[] = []; + let remaining = text; + + while (remaining.length > 0) { + if (remaining.length <= maxLength) { + chunks.push(remaining); + break; + } + + let splitIdx = remaining.lastIndexOf('\n\n', maxLength); + if (splitIdx === -1 || splitIdx < maxLength * 0.3) { + splitIdx = remaining.lastIndexOf('\n', maxLength); + } + if (splitIdx === -1 || splitIdx < maxLength * 0.3) { + splitIdx = remaining.lastIndexOf(' ', maxLength); + } + if (splitIdx === -1 || splitIdx < maxLength * 0.3) { + splitIdx = maxLength; + } + + chunks.push(remaining.substring(0, splitIdx)); + remaining = remaining.substring(splitIdx).trimStart(); + } + + return chunks.slice(0, 5); +} + +async function safeSendReply( + sendReply: (text: string, parseMode?: string) => Promise, + text: string +): Promise { + try { + await sendReply(text, 'Markdown'); + } catch { + try { + await sendReply(text); + } catch (innerError) { + console.error('[Chatbot] Failed to send reply:', innerError); + } + } +} diff --git a/src/index.ts b/src/index.ts index db4d965..6be0d46 100644 --- a/src/index.ts +++ b/src/index.ts @@ -23,10 +23,19 @@ import { loadProjects, validateProjectPath } from './project-registry.js'; +import { + handleChatbotMessage, + initChatbot, + isChatbotEnabled, + resetChatbotSession, + getChatbotStatus +} from './chatbot.js'; dotenv.config(); -const bot = new Telegraf(process.env.TELEGRAM_BOT_TOKEN!); +const bot = new Telegraf(process.env.TELEGRAM_BOT_TOKEN!, { + handlerTimeout: 300_000, // 5 min for chatbot responses +}); const app = express(); const PORT = parseInt(process.env.PORT || '3456'); const HOST = process.env.HOST || 'localhost'; @@ -78,6 +87,9 @@ setInterval(() => { } }, 5 * 60 * 1000); // Check every 5 minutes +// Initialize chatbot module +initChatbot(); + app.use(express.json()); // Save chat ID to .env file @@ -125,6 +137,10 @@ bot.command('status', async (ctx) => { bot.command('help', async (ctx) => { await ctx.reply( '*Claude Telegram Bridge - Commands*\n\n' + + '*Chatbot:*\n' + + '• Send any message — Claude ответит как чат-бот\n' + + '`/chatbot` - Статус чат-бота\n' + + '`/chatreset` - Сброс диалога\n\n' + '*Session Management:*\n' + '`/sessions` - List active Claude sessions\n' + '`/queue` - View queued messages\n\n' + @@ -139,12 +155,11 @@ bot.command('help', async (ctx) => { '`/status` - Check bridge status\n' + '`/test` - Send test notification\n\n' + '*How it works:*\n' + - '• Send any message - forwards to active Claude\n' + + '• Send any message — chatbot responds via Claude CLI\n' + '• Target specific project: `ProjectName: message`\n' + '• Messages show context: 📁 ProjectName [#abc1234]\n' + '• Register projects for remote spawning\n' + - '• Messages queue when projects are offline\n\n' + - 'More info: See README in bridge folder', + '• Messages queue when projects are offline', { parse_mode: 'Markdown' } ); }); @@ -153,6 +168,31 @@ bot.command('test', async (ctx) => { await ctx.reply('✅ Test notification received! Bridge is working.'); }); +bot.command('chatbot', async (ctx) => { + const status = getChatbotStatus(ctx.chat.id.toString()); + const enabledText = status.enabled ? '✅ Включен' : '⛔ Выключен'; + const sessionText = status.hasSession + ? `🗣 Активная сессия: ${status.messageCount} сообщений, последнее ${status.sessionAge} мин назад` + : '📭 Нет активной сессии'; + + await ctx.reply( + `*Chatbot Status*\n\n` + + `${enabledText}\n` + + `${sessionText}\n\n` + + `Сброс диалога: /chatreset`, + { parse_mode: 'Markdown' } + ); +}); + +bot.command('chatreset', async (ctx) => { + const deleted = resetChatbotSession(ctx.chat.id.toString()); + if (deleted) { + await ctx.reply('🔄 Диалог сброшен. Следующее сообщение начнёт новую сессию.'); + } else { + await ctx.reply('📭 Нет активной сессии для сброса.'); + } +}); + bot.command('sessions', async (ctx) => { const sessions = Array.from(activeSessions.values()); @@ -533,24 +573,52 @@ bot.on('text', async (ctx) => { // No project specified - check if we should auto-route to last session if (lastMessageSession && activeSessions.has(lastMessageSession)) { const session = activeSessions.get(lastMessageSession)!; - messageQueue.push({ - from, + const claudeRunning = isClaudeRunning(session.projectName); + + if (claudeRunning) { + messageQueue.push({ + from, + message, + timestamp: new Date(), + read: false, + sessionId: lastMessageSession + }); + await ctx.reply(`💬 Auto-routed to: 📁 *${session.projectName}* [#${lastMessageSession.substring(0, 7)}]`, { parse_mode: 'Markdown' }); + console.log(`📥 Auto-routed to ${session.projectName}`); + return; + } + + // Claude not actually running — clean up stale session + console.log(`[CLEANUP] Removing stale lastMessageSession for ${session.projectName}`); + activeSessions.delete(lastMessageSession); + lastMessageSession = null; + } + + // No active project session — use chatbot if enabled + if (isChatbotEnabled()) { + const userChatId = ctx.chat.id.toString(); + console.log(`🤖 Chatbot handling message from ${from}: "${message.substring(0, 50)}..."`); + + // Fire-and-forget: don't block Telegraf's handler + handleChatbotMessage( + userChatId, message, - timestamp: new Date(), - read: false, - sessionId: lastMessageSession - }); - await ctx.reply(`💬 Auto-routed to: 📁 *${session.projectName}* [#${lastMessageSession.substring(0, 7)}]`, { parse_mode: 'Markdown' }); - console.log(`📥 Auto-routed to ${session.projectName}`); + async () => { + await ctx.sendChatAction('typing'); + }, + async (text: string, parseMode?: string) => { + await ctx.reply(text, parseMode ? { parse_mode: parseMode as any } : {}); + } + ).catch(err => console.error('[Chatbot] Async error:', err)); } else { - // No recent session - add to general message queue + // Fallback: queue the message (original behavior) messageQueue.push({ from, message, timestamp: new Date(), read: false }); - await ctx.reply('💬 Message received - responding...'); + await ctx.reply('💬 Message received - queued for processing.'); console.log('📥 Queued for Claude to process'); } });