Create complete Recorder service bot.js implementation
This commit is contained in:
473
services/recorder/src/bot.js
Normal file
473
services/recorder/src/bot.js
Normal file
@ -0,0 +1,473 @@
|
||||
import { Client, GatewayIntentBits, Partials, REST, Routes } from 'discord.js';
|
||||
import { joinVoiceChannel, getVoiceConnection, EndBehaviorType } from '@discordjs/voice';
|
||||
import { createClient } from 'redis';
|
||||
import { Client as PgClient } from 'pg';
|
||||
import prism from 'prism-media';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { EventEmitter } from 'events';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import winston from 'winston';
|
||||
|
||||
// Increase max listeners for audio streams
|
||||
EventEmitter.defaultMaxListeners = 50;
|
||||
|
||||
// Environment variables
|
||||
const TOKEN = process.env.DISCORD_TOKEN;
|
||||
const CLIENT_ID = process.env.CLIENT_ID;
|
||||
const GUILD_ID = process.env.GUILD_ID;
|
||||
const POSTGRES_URL = process.env.POSTGRES_URL;
|
||||
const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379';
|
||||
|
||||
// Logging configuration
|
||||
const logger = winston.createLogger({
|
||||
level: process.env.LOG_LEVEL || 'info',
|
||||
format: winston.format.combine(
|
||||
winston.format.timestamp(),
|
||||
winston.format.errors({ stack: true }),
|
||||
winston.format.json()
|
||||
),
|
||||
transports: [
|
||||
new winston.transports.Console(),
|
||||
new winston.transports.File({ filename: '/app/logs/recorder-error.log', level: 'error' }),
|
||||
new winston.transports.File({ filename: '/app/logs/recorder.log' })
|
||||
]
|
||||
});
|
||||
|
||||
// Database client
|
||||
const pgClient = new PgClient({ connectionString: POSTGRES_URL });
|
||||
|
||||
// Redis client
|
||||
const redisClient = createClient({ url: REDIS_URL });
|
||||
|
||||
// Discord client
|
||||
const client = new Client({
|
||||
intents: [
|
||||
GatewayIntentBits.Guilds,
|
||||
GatewayIntentBits.GuildVoiceStates,
|
||||
GatewayIntentBits.GuildMembers
|
||||
],
|
||||
partials: [Partials.Channel],
|
||||
});
|
||||
|
||||
// State management
|
||||
const recordingSessions = new Map();
|
||||
let currentConnectionId = null;
|
||||
|
||||
// Initialize connections
|
||||
async function initialize() {
|
||||
try {
|
||||
await pgClient.connect();
|
||||
await redisClient.connect();
|
||||
logger.info('Connected to PostgreSQL and Redis');
|
||||
} catch (error) {
|
||||
logger.error('Failed to initialize connections:', error);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
// Register slash commands
|
||||
async function registerCommands() {
|
||||
const rest = new REST({ version: '10' }).setToken(TOKEN);
|
||||
|
||||
try {
|
||||
const commands = [
|
||||
{
|
||||
name: 'join',
|
||||
description: 'Join your voice channel and start recording'
|
||||
},
|
||||
{
|
||||
name: 'leave',
|
||||
description: 'Leave the voice channel and stop recording'
|
||||
},
|
||||
{
|
||||
name: 'status',
|
||||
description: 'Show current recording status'
|
||||
}
|
||||
];
|
||||
|
||||
await rest.put(
|
||||
Routes.applicationGuildCommands(CLIENT_ID, GUILD_ID),
|
||||
{ body: commands }
|
||||
);
|
||||
|
||||
logger.info('Successfully registered slash commands');
|
||||
} catch (error) {
|
||||
logger.error('Failed to register slash commands:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Create database connection record
|
||||
async function createConnection(channel, inviter) {
|
||||
try {
|
||||
const query = `
|
||||
INSERT INTO connections (guild_id, guild_name, channel_id, channel_name, inviter_id, inviter_name)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id, uuid
|
||||
`;
|
||||
|
||||
const values = [
|
||||
channel.guild.id,
|
||||
channel.guild.name,
|
||||
channel.id,
|
||||
channel.name,
|
||||
inviter.id,
|
||||
inviter.username
|
||||
];
|
||||
|
||||
const result = await pgClient.query(query, values);
|
||||
const connection = result.rows[0];
|
||||
|
||||
logger.info(`Created connection record:`, {
|
||||
connectionId: connection.id,
|
||||
channelName: channel.name,
|
||||
inviterName: inviter.username
|
||||
});
|
||||
|
||||
return connection;
|
||||
} catch (error) {
|
||||
logger.error('Failed to create connection record:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Create recording record
|
||||
async function createRecording(userId, nickname, username, connectionId) {
|
||||
try {
|
||||
const recordingUuid = uuidv4();
|
||||
const fileName = `${recordingUuid}.pcm`;
|
||||
const filePath = `/app/audio/raw/${fileName}`;
|
||||
|
||||
const query = `
|
||||
INSERT INTO recordings (uuid, connection_id, speaker_id, speaker_nickname, speaker_username, file_name, file_path, status)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
RETURNING id, uuid
|
||||
`;
|
||||
|
||||
const values = [
|
||||
recordingUuid,
|
||||
connectionId,
|
||||
userId,
|
||||
nickname,
|
||||
username,
|
||||
fileName,
|
||||
filePath,
|
||||
'recording'
|
||||
];
|
||||
|
||||
const result = await pgClient.query(query, values);
|
||||
const recording = result.rows[0];
|
||||
|
||||
logger.info(`Created recording record:`, {
|
||||
recordingId: recording.id,
|
||||
speakerNickname: nickname,
|
||||
fileName
|
||||
});
|
||||
|
||||
return { ...recording, fileName, filePath };
|
||||
} catch (error) {
|
||||
logger.error('Failed to create recording record:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Update recording when finished
|
||||
async function finishRecording(recordingId, endTime, duration) {
|
||||
try {
|
||||
const query = `
|
||||
UPDATE recordings
|
||||
SET ended_at = $1, duration_seconds = $2, status = $3, updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = $4
|
||||
`;
|
||||
|
||||
await pgClient.query(query, [endTime, duration, 'queued', recordingId]);
|
||||
|
||||
logger.info(`Finished recording:`, { recordingId, duration });
|
||||
} catch (error) {
|
||||
logger.error('Failed to update recording record:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Publish message to Redis
|
||||
async function publishMessage(event, data) {
|
||||
try {
|
||||
const message = JSON.stringify({
|
||||
event,
|
||||
data,
|
||||
timestamp: new Date().toISOString(),
|
||||
service: 'recorder'
|
||||
});
|
||||
|
||||
await redisClient.publish('voice-translator-events', message);
|
||||
logger.debug(`Published event: ${event}`, data);
|
||||
} catch (error) {
|
||||
logger.error('Failed to publish message:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Start recording for a user
|
||||
async function startUserRecording(userId, member, channel) {
|
||||
try {
|
||||
if (recordingSessions.has(userId)) {
|
||||
logger.warn(`User ${userId} is already being recorded`);
|
||||
return;
|
||||
}
|
||||
|
||||
const nickname = member.nickname || member.user.username;
|
||||
const username = member.user.username;
|
||||
|
||||
// Create recording record
|
||||
const recording = await createRecording(userId, nickname, username, currentConnectionId);
|
||||
|
||||
// Set up audio streams
|
||||
const connection = getVoiceConnection(channel.guild.id);
|
||||
const audioStream = connection.receiver.subscribe(userId, {
|
||||
end: { behavior: EndBehaviorType.AfterSilence, duration: 3000 }
|
||||
});
|
||||
|
||||
// Create output file stream
|
||||
const outputStream = fs.createWriteStream(recording.filePath);
|
||||
const pcmStream = new prism.opus.Decoder({ channels: 2, rate: 48000, frameSize: 960 });
|
||||
|
||||
// Pipe audio data
|
||||
audioStream.pipe(pcmStream).pipe(outputStream);
|
||||
|
||||
// Track recording session
|
||||
const startTime = new Date();
|
||||
recordingSessions.set(userId, {
|
||||
recordingId: recording.id,
|
||||
nickname,
|
||||
audioStream,
|
||||
outputStream,
|
||||
startTime,
|
||||
filePath: recording.filePath
|
||||
});
|
||||
|
||||
logger.info(`Started recording for ${nickname} (${userId})`);
|
||||
|
||||
// Handle recording end
|
||||
audioStream.once('end', async () => {
|
||||
try {
|
||||
const session = recordingSessions.get(userId);
|
||||
if (!session) return;
|
||||
|
||||
const endTime = new Date();
|
||||
const duration = (endTime - session.startTime) / 1000; // seconds
|
||||
|
||||
// Close streams
|
||||
outputStream.end();
|
||||
|
||||
// Update database
|
||||
await finishRecording(session.recordingId, endTime, duration);
|
||||
|
||||
// Publish to message queue
|
||||
await publishMessage('recording_completed', {
|
||||
recordingId: session.recordingId,
|
||||
userId,
|
||||
nickname: session.nickname,
|
||||
filePath: session.filePath,
|
||||
duration,
|
||||
startTime: session.startTime,
|
||||
endTime
|
||||
});
|
||||
|
||||
// Clean up
|
||||
recordingSessions.delete(userId);
|
||||
|
||||
logger.info(`Recording completed for ${session.nickname}: ${duration}s`);
|
||||
} catch (error) {
|
||||
logger.error(`Error finishing recording for user ${userId}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
logger.error(`Failed to start recording for user ${userId}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
// Start recording session
|
||||
async function startRecording(channel, inviter) {
|
||||
try {
|
||||
// Join voice channel
|
||||
const connection = joinVoiceChannel({
|
||||
channelId: channel.id,
|
||||
guildId: channel.guild.id,
|
||||
adapterCreator: channel.guild.voiceAdapterCreator,
|
||||
selfDeaf: false,
|
||||
selfMute: true,
|
||||
});
|
||||
|
||||
// Create connection record
|
||||
const connectionRecord = await createConnection(channel, inviter);
|
||||
currentConnectionId = connectionRecord.id;
|
||||
|
||||
// Publish connection started event
|
||||
await publishMessage('connection_started', {
|
||||
connectionId: connectionRecord.id,
|
||||
channelId: channel.id,
|
||||
channelName: channel.name,
|
||||
guildId: channel.guild.id,
|
||||
guildName: channel.guild.name
|
||||
});
|
||||
|
||||
// Set up speaking detection
|
||||
connection.receiver.speaking.on('start', async (userId) => {
|
||||
try {
|
||||
const member = channel.guild.members.cache.get(userId);
|
||||
if (!member || member.user.bot) return; // Skip bots
|
||||
|
||||
await startUserRecording(userId, member, channel);
|
||||
} catch (error) {
|
||||
logger.error(`Error handling speaking start for user ${userId}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info(`Started recording session in ${channel.name} (${channel.guild.name})`);
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Failed to start recording session:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Stop all recordings
|
||||
async function stopAllRecordings() {
|
||||
try {
|
||||
for (const [userId, session] of recordingSessions.entries()) {
|
||||
try {
|
||||
// Force end the streams
|
||||
if (session.audioStream && !session.audioStream.destroyed) {
|
||||
session.audioStream.destroy();
|
||||
}
|
||||
if (session.outputStream && !session.outputStream.destroyed) {
|
||||
session.outputStream.end();
|
||||
}
|
||||
|
||||
// Update database record
|
||||
const endTime = new Date();
|
||||
const duration = (endTime - session.startTime) / 1000;
|
||||
await finishRecording(session.recordingId, endTime, duration);
|
||||
|
||||
logger.info(`Force-stopped recording for ${session.nickname}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error stopping recording for user ${userId}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
recordingSessions.clear();
|
||||
|
||||
// Update connection record
|
||||
if (currentConnectionId) {
|
||||
await pgClient.query(
|
||||
'UPDATE connections SET ended_at = CURRENT_TIMESTAMP WHERE id = $1',
|
||||
[currentConnectionId]
|
||||
);
|
||||
|
||||
await publishMessage('connection_ended', {
|
||||
connectionId: currentConnectionId
|
||||
});
|
||||
|
||||
currentConnectionId = null;
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Error stopping recordings:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Discord event handlers
|
||||
client.once('ready', async () => {
|
||||
logger.info(`Bot logged in as ${client.user.tag}`);
|
||||
await registerCommands();
|
||||
});
|
||||
|
||||
client.on('interactionCreate', async (interaction) => {
|
||||
if (!interaction.isCommand()) return;
|
||||
|
||||
const { commandName } = interaction;
|
||||
|
||||
try {
|
||||
if (commandName === 'join') {
|
||||
const member = interaction.guild.members.cache.get(interaction.user.id);
|
||||
if (!member || !member.voice.channel) {
|
||||
await interaction.reply('❗ You need to join a voice channel first.');
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if already connected
|
||||
const existingConnection = getVoiceConnection(interaction.guild.id);
|
||||
if (existingConnection) {
|
||||
await interaction.reply('🔴 Bot is already recording in a voice channel. Use `/leave` first.');
|
||||
return;
|
||||
}
|
||||
|
||||
await startRecording(member.voice.channel, interaction.user);
|
||||
await interaction.reply(`🟢 Joined **${member.voice.channel.name}** and started recording.`);
|
||||
|
||||
} else if (commandName === 'leave') {
|
||||
const connection = getVoiceConnection(interaction.guild.id);
|
||||
if (!connection) {
|
||||
await interaction.reply('❗ Bot is not currently in a voice channel.');
|
||||
return;
|
||||
}
|
||||
|
||||
await stopAllRecordings();
|
||||
connection.destroy();
|
||||
|
||||
await interaction.reply('🔻 Left the voice channel and stopped all recordings.');
|
||||
|
||||
} else if (commandName === 'status') {
|
||||
const connection = getVoiceConnection(interaction.guild.id);
|
||||
const activeRecordings = recordingSessions.size;
|
||||
|
||||
if (!connection) {
|
||||
await interaction.reply('📊 **Status**: Not connected to any voice channel');
|
||||
} else {
|
||||
await interaction.reply(
|
||||
`📊 **Status**: Recording in voice channel\n` +
|
||||
`🎤 **Active recordings**: ${activeRecordings} users\n` +
|
||||
`⏱️ **Session started**: <t:${Math.floor(Date.now() / 1000)}:R>`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
logger.error(`Error handling command ${commandName}:`, error);
|
||||
await interaction.reply('❌ An error occurred while processing your command.');
|
||||
}
|
||||
});
|
||||
|
||||
// Error handling
|
||||
client.on('error', (error) => {
|
||||
logger.error('Discord client error:', error);
|
||||
});
|
||||
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
logger.error('Unhandled Rejection at:', promise, 'reason:', reason);
|
||||
});
|
||||
|
||||
process.on('uncaughtException', (error) => {
|
||||
logger.error('Uncaught Exception:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Graceful shutdown
|
||||
process.on('SIGINT', async () => {
|
||||
logger.info('Received SIGINT, shutting down gracefully...');
|
||||
await stopAllRecordings();
|
||||
await pgClient.end();
|
||||
await redisClient.quit();
|
||||
client.destroy();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
// Start the application
|
||||
async function start() {
|
||||
await initialize();
|
||||
await client.login(TOKEN);
|
||||
}
|
||||
|
||||
start().catch((error) => {
|
||||
logger.error('Failed to start application:', error);
|
||||
process.exit(1);
|
||||
});
|
Reference in New Issue
Block a user