diff --git a/services/recorder/src/bot.js b/services/recorder/src/bot.js new file mode 100644 index 0000000..bbba7fe --- /dev/null +++ b/services/recorder/src/bot.js @@ -0,0 +1,473 @@ +import { Client, GatewayIntentBits, Partials, REST, Routes } from 'discord.js'; +import { joinVoiceChannel, getVoiceConnection, EndBehaviorType } from '@discordjs/voice'; +import { createClient } from 'redis'; +import { Client as PgClient } from 'pg'; +import prism from 'prism-media'; +import fs from 'fs'; +import path from 'path'; +import { EventEmitter } from 'events'; +import { v4 as uuidv4 } from 'uuid'; +import winston from 'winston'; + +// Increase max listeners for audio streams +EventEmitter.defaultMaxListeners = 50; + +// Environment variables +const TOKEN = process.env.DISCORD_TOKEN; +const CLIENT_ID = process.env.CLIENT_ID; +const GUILD_ID = process.env.GUILD_ID; +const POSTGRES_URL = process.env.POSTGRES_URL; +const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379'; + +// Logging configuration +const logger = winston.createLogger({ + level: process.env.LOG_LEVEL || 'info', + format: winston.format.combine( + winston.format.timestamp(), + winston.format.errors({ stack: true }), + winston.format.json() + ), + transports: [ + new winston.transports.Console(), + new winston.transports.File({ filename: '/app/logs/recorder-error.log', level: 'error' }), + new winston.transports.File({ filename: '/app/logs/recorder.log' }) + ] +}); + +// Database client +const pgClient = new PgClient({ connectionString: POSTGRES_URL }); + +// Redis client +const redisClient = createClient({ url: REDIS_URL }); + +// Discord client +const client = new Client({ + intents: [ + GatewayIntentBits.Guilds, + GatewayIntentBits.GuildVoiceStates, + GatewayIntentBits.GuildMembers + ], + partials: [Partials.Channel], +}); + +// State management +const recordingSessions = new Map(); +let currentConnectionId = null; + +// Initialize connections +async function initialize() { + try { + await pgClient.connect(); + await redisClient.connect(); + logger.info('Connected to PostgreSQL and Redis'); + } catch (error) { + logger.error('Failed to initialize connections:', error); + process.exit(1); + } +} + +// Register slash commands +async function registerCommands() { + const rest = new REST({ version: '10' }).setToken(TOKEN); + + try { + const commands = [ + { + name: 'join', + description: 'Join your voice channel and start recording' + }, + { + name: 'leave', + description: 'Leave the voice channel and stop recording' + }, + { + name: 'status', + description: 'Show current recording status' + } + ]; + + await rest.put( + Routes.applicationGuildCommands(CLIENT_ID, GUILD_ID), + { body: commands } + ); + + logger.info('Successfully registered slash commands'); + } catch (error) { + logger.error('Failed to register slash commands:', error); + } +} + +// Create database connection record +async function createConnection(channel, inviter) { + try { + const query = ` + INSERT INTO connections (guild_id, guild_name, channel_id, channel_name, inviter_id, inviter_name) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, uuid + `; + + const values = [ + channel.guild.id, + channel.guild.name, + channel.id, + channel.name, + inviter.id, + inviter.username + ]; + + const result = await pgClient.query(query, values); + const connection = result.rows[0]; + + logger.info(`Created connection record:`, { + connectionId: connection.id, + channelName: channel.name, + inviterName: inviter.username + }); + + return connection; + } catch (error) { + logger.error('Failed to create connection record:', error); + throw error; + } +} + +// Create recording record +async function createRecording(userId, nickname, username, connectionId) { + try { + const recordingUuid = uuidv4(); + const fileName = `${recordingUuid}.pcm`; + const filePath = `/app/audio/raw/${fileName}`; + + const query = ` + INSERT INTO recordings (uuid, connection_id, speaker_id, speaker_nickname, speaker_username, file_name, file_path, status) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING id, uuid + `; + + const values = [ + recordingUuid, + connectionId, + userId, + nickname, + username, + fileName, + filePath, + 'recording' + ]; + + const result = await pgClient.query(query, values); + const recording = result.rows[0]; + + logger.info(`Created recording record:`, { + recordingId: recording.id, + speakerNickname: nickname, + fileName + }); + + return { ...recording, fileName, filePath }; + } catch (error) { + logger.error('Failed to create recording record:', error); + throw error; + } +} + +// Update recording when finished +async function finishRecording(recordingId, endTime, duration) { + try { + const query = ` + UPDATE recordings + SET ended_at = $1, duration_seconds = $2, status = $3, updated_at = CURRENT_TIMESTAMP + WHERE id = $4 + `; + + await pgClient.query(query, [endTime, duration, 'queued', recordingId]); + + logger.info(`Finished recording:`, { recordingId, duration }); + } catch (error) { + logger.error('Failed to update recording record:', error); + } +} + +// Publish message to Redis +async function publishMessage(event, data) { + try { + const message = JSON.stringify({ + event, + data, + timestamp: new Date().toISOString(), + service: 'recorder' + }); + + await redisClient.publish('voice-translator-events', message); + logger.debug(`Published event: ${event}`, data); + } catch (error) { + logger.error('Failed to publish message:', error); + } +} + +// Start recording for a user +async function startUserRecording(userId, member, channel) { + try { + if (recordingSessions.has(userId)) { + logger.warn(`User ${userId} is already being recorded`); + return; + } + + const nickname = member.nickname || member.user.username; + const username = member.user.username; + + // Create recording record + const recording = await createRecording(userId, nickname, username, currentConnectionId); + + // Set up audio streams + const connection = getVoiceConnection(channel.guild.id); + const audioStream = connection.receiver.subscribe(userId, { + end: { behavior: EndBehaviorType.AfterSilence, duration: 3000 } + }); + + // Create output file stream + const outputStream = fs.createWriteStream(recording.filePath); + const pcmStream = new prism.opus.Decoder({ channels: 2, rate: 48000, frameSize: 960 }); + + // Pipe audio data + audioStream.pipe(pcmStream).pipe(outputStream); + + // Track recording session + const startTime = new Date(); + recordingSessions.set(userId, { + recordingId: recording.id, + nickname, + audioStream, + outputStream, + startTime, + filePath: recording.filePath + }); + + logger.info(`Started recording for ${nickname} (${userId})`); + + // Handle recording end + audioStream.once('end', async () => { + try { + const session = recordingSessions.get(userId); + if (!session) return; + + const endTime = new Date(); + const duration = (endTime - session.startTime) / 1000; // seconds + + // Close streams + outputStream.end(); + + // Update database + await finishRecording(session.recordingId, endTime, duration); + + // Publish to message queue + await publishMessage('recording_completed', { + recordingId: session.recordingId, + userId, + nickname: session.nickname, + filePath: session.filePath, + duration, + startTime: session.startTime, + endTime + }); + + // Clean up + recordingSessions.delete(userId); + + logger.info(`Recording completed for ${session.nickname}: ${duration}s`); + } catch (error) { + logger.error(`Error finishing recording for user ${userId}:`, error); + } + }); + + } catch (error) { + logger.error(`Failed to start recording for user ${userId}:`, error); + } +} + +// Start recording session +async function startRecording(channel, inviter) { + try { + // Join voice channel + const connection = joinVoiceChannel({ + channelId: channel.id, + guildId: channel.guild.id, + adapterCreator: channel.guild.voiceAdapterCreator, + selfDeaf: false, + selfMute: true, + }); + + // Create connection record + const connectionRecord = await createConnection(channel, inviter); + currentConnectionId = connectionRecord.id; + + // Publish connection started event + await publishMessage('connection_started', { + connectionId: connectionRecord.id, + channelId: channel.id, + channelName: channel.name, + guildId: channel.guild.id, + guildName: channel.guild.name + }); + + // Set up speaking detection + connection.receiver.speaking.on('start', async (userId) => { + try { + const member = channel.guild.members.cache.get(userId); + if (!member || member.user.bot) return; // Skip bots + + await startUserRecording(userId, member, channel); + } catch (error) { + logger.error(`Error handling speaking start for user ${userId}:`, error); + } + }); + + logger.info(`Started recording session in ${channel.name} (${channel.guild.name})`); + + } catch (error) { + logger.error('Failed to start recording session:', error); + throw error; + } +} + +// Stop all recordings +async function stopAllRecordings() { + try { + for (const [userId, session] of recordingSessions.entries()) { + try { + // Force end the streams + if (session.audioStream && !session.audioStream.destroyed) { + session.audioStream.destroy(); + } + if (session.outputStream && !session.outputStream.destroyed) { + session.outputStream.end(); + } + + // Update database record + const endTime = new Date(); + const duration = (endTime - session.startTime) / 1000; + await finishRecording(session.recordingId, endTime, duration); + + logger.info(`Force-stopped recording for ${session.nickname}`); + } catch (error) { + logger.error(`Error stopping recording for user ${userId}:`, error); + } + } + + recordingSessions.clear(); + + // Update connection record + if (currentConnectionId) { + await pgClient.query( + 'UPDATE connections SET ended_at = CURRENT_TIMESTAMP WHERE id = $1', + [currentConnectionId] + ); + + await publishMessage('connection_ended', { + connectionId: currentConnectionId + }); + + currentConnectionId = null; + } + + } catch (error) { + logger.error('Error stopping recordings:', error); + } +} + +// Discord event handlers +client.once('ready', async () => { + logger.info(`Bot logged in as ${client.user.tag}`); + await registerCommands(); +}); + +client.on('interactionCreate', async (interaction) => { + if (!interaction.isCommand()) return; + + const { commandName } = interaction; + + try { + if (commandName === 'join') { + const member = interaction.guild.members.cache.get(interaction.user.id); + if (!member || !member.voice.channel) { + await interaction.reply('❗ You need to join a voice channel first.'); + return; + } + + // Check if already connected + const existingConnection = getVoiceConnection(interaction.guild.id); + if (existingConnection) { + await interaction.reply('🔴 Bot is already recording in a voice channel. Use `/leave` first.'); + return; + } + + await startRecording(member.voice.channel, interaction.user); + await interaction.reply(`🟢 Joined **${member.voice.channel.name}** and started recording.`); + + } else if (commandName === 'leave') { + const connection = getVoiceConnection(interaction.guild.id); + if (!connection) { + await interaction.reply('❗ Bot is not currently in a voice channel.'); + return; + } + + await stopAllRecordings(); + connection.destroy(); + + await interaction.reply('🔻 Left the voice channel and stopped all recordings.'); + + } else if (commandName === 'status') { + const connection = getVoiceConnection(interaction.guild.id); + const activeRecordings = recordingSessions.size; + + if (!connection) { + await interaction.reply('📊 **Status**: Not connected to any voice channel'); + } else { + await interaction.reply( + `📊 **Status**: Recording in voice channel\n` + + `🎤 **Active recordings**: ${activeRecordings} users\n` + + `⏱️ **Session started**: ` + ); + } + } + + } catch (error) { + logger.error(`Error handling command ${commandName}:`, error); + await interaction.reply('❌ An error occurred while processing your command.'); + } +}); + +// Error handling +client.on('error', (error) => { + logger.error('Discord client error:', error); +}); + +process.on('unhandledRejection', (reason, promise) => { + logger.error('Unhandled Rejection at:', promise, 'reason:', reason); +}); + +process.on('uncaughtException', (error) => { + logger.error('Uncaught Exception:', error); + process.exit(1); +}); + +// Graceful shutdown +process.on('SIGINT', async () => { + logger.info('Received SIGINT, shutting down gracefully...'); + await stopAllRecordings(); + await pgClient.end(); + await redisClient.quit(); + client.destroy(); + process.exit(0); +}); + +// Start the application +async function start() { + await initialize(); + await client.login(TOKEN); +} + +start().catch((error) => { + logger.error('Failed to start application:', error); + process.exit(1); +});