// Transcriber service orchestrator.
//
// Provenance: recovered from patch 1ad31614d6eec7dba38e0da478e414a84b967235
// ("Create Transcriber service orchestrator", MAHaines, 2025-07-14), which
// creates services/transcriber/src/orchestrator.js.
//
// The service subscribes to the Redis `voice-translator-events` pub/sub channel.
// For every `translations_completed` event it loads the transcription (joined
// with its recording and connection rows) from Postgres, renders a text message
// and posts it to the Discord channel stored on the connection row.

import { Client, GatewayIntentBits, EmbedBuilder } from 'discord.js';
import { createClient } from 'redis';
import { Client as PgClient } from 'pg';
import winston from 'winston';
import moment from 'moment';
import _ from 'lodash';

// Environment variables
const DISCORD_TOKEN = process.env.DISCORD_TOKEN;
const POSTGRES_URL = process.env.POSTGRES_URL;
const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379';
const LOG_LEVEL = process.env.LOG_LEVEL || 'info';

// Redis pub/sub channel carrying the events handled by processRedisMessage().
const EVENTS_CHANNEL = 'voice-translator-events';

// Discord rejects messages whose `content` is longer than this many characters.
const DISCORD_MAX_MESSAGE_LENGTH = 2000;

// Flag shown for language codes that are missing from LANGUAGE_INFO.
const UNKNOWN_LANGUAGE_FLAG = '🏳️';

// Language configuration.
// `en` carries both a `flag` and a `country_flag`; resolveLanguage() prefers
// `country_flag` whenever it is present.
const LANGUAGE_INFO = {
  'en': { name: 'English', flag: '🇺🇸', country_flag: '🇬🇧' },
  'de': { name: 'German', flag: '🇩🇪' },
  'ko': { name: 'Korean', flag: '🇰🇷' },
  'es': { name: 'Spanish', flag: '🇪🇸' },
  'fr': { name: 'French', flag: '🇫🇷' },
  'it': { name: 'Italian', flag: '🇮🇹' },
  'pt': { name: 'Portuguese', flag: '🇵🇹' },
  'ru': { name: 'Russian', flag: '🇷🇺' },
  'ja': { name: 'Japanese', flag: '🇯🇵' },
  'zh': { name: 'Chinese', flag: '🇨🇳' },
  'ar': { name: 'Arabic', flag: '🇸🇦' }
};

// Always show these languages in this order
const PRIMARY_LANGUAGES = ['en', 'de', 'ko'];

// Logging configuration
const logger = winston.createLogger({
  level: LOG_LEVEL,
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.json()
  ),
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: '/app/logs/transcriber-error.log', level: 'error' }),
    new winston.transports.File({ filename: '/app/logs/transcriber.log' })
  ]
});

/**
 * Resolve a language code to the name and flag used in Discord messages.
 *
 * @param {string|null|undefined} languageCode - Key of LANGUAGE_INFO, e.g. 'en'.
 * @returns {{name: string, flag: string}} The configured name and flag. Codes
 *   that are not configured yield the upper-cased code with a white flag, and
 *   a missing code yields "Unknown" with a white flag.
 */
function resolveLanguage(languageCode) {
  if (languageCode === null || languageCode === undefined || languageCode === '') {
    return { name: 'Unknown', flag: UNKNOWN_LANGUAGE_FLAG };
  }

  const code = String(languageCode);
  // Own-property check so codes such as "constructor" do not hit Object.prototype.
  if (!Object.hasOwn(LANGUAGE_INFO, code)) {
    return { name: code.toUpperCase(), flag: UNKNOWN_LANGUAGE_FLAG };
  }

  const { name, flag, country_flag: countryFlag } = LANGUAGE_INFO[code];
  return { name, flag: countryFlag || flag };
}

/**
 * Split text into pieces that each fit into a single Discord message.
 *
 * Breaks on the last newline inside the allowed window, otherwise on the last
 * space, and only cuts inside a word when the window contains neither.
 *
 * @param {string} content - Text to split.
 * @param {number} [maxLength=DISCORD_MAX_MESSAGE_LENGTH] - Maximum length of a piece.
 * @returns {string[]} Non-empty pieces in order; empty array for empty input.
 */
function splitMessage(content, maxLength = DISCORD_MAX_MESSAGE_LENGTH) {
  const chunks = [];
  let remaining = content;

  while (remaining.length > maxLength) {
    // Look one character past the limit so a separator sitting exactly at the
    // limit can still be used as the break point.
    const head = remaining.slice(0, maxLength + 1);
    let cut = head.lastIndexOf('\n');
    if (cut <= 0) cut = head.lastIndexOf(' ');
    if (cut <= 0) {
      cut = maxLength;
      // Do not cut between the two halves of a surrogate pair (e.g. an emoji).
      const lastUnit = remaining.charCodeAt(cut - 1);
      if (cut > 1 && lastUnit >= 0xd800 && lastUnit <= 0xdbff) cut -= 1;
    }

    const chunk = remaining.slice(0, cut).trimEnd();
    if (chunk) chunks.push(chunk);
    remaining = remaining.slice(cut).trimStart();
  }

  if (remaining) chunks.push(remaining);
  return chunks;
}

/**
 * Bridges translation events from Redis to Discord text channels, using
 * Postgres to look up the speaker and channel for each transcription.
 */
class TranscriberService {
  constructor() {
    this.discord_client = null;
    this.redis_client = null;
    // Dedicated pub/sub connection created by listenForEvents().
    this.redis_subscriber = null;
    this.pg_client = null;
    // connectionId -> { guildId, channelId, channelName, startedAt }.
    // Maintained by the connection_started / connection_ended handlers;
    // nothing in this file reads it yet.
    this.active_channels = new Map();
  }

  /**
   * Create the Discord, Postgres and Redis clients, connect the two data
   * stores and log in to Discord.
   *
   * @returns {Promise<void>}
   * @throws Rethrows whatever the failing client reported, after logging it.
   */
  async initialize() {
    try {
      // Initialize Discord client
      this.discord_client = new Client({
        intents: [
          GatewayIntentBits.Guilds,
          GatewayIntentBits.GuildMessages,
          GatewayIntentBits.MessageContent
        ]
      });

      // Initialize database connection
      this.pg_client = new PgClient({ connectionString: POSTGRES_URL });
      // An 'error' event without a listener is thrown by the EventEmitter and
      // would take the whole process down.
      this.pg_client.on('error', (error) => {
        logger.error('Postgres client error:', error);
      });
      await this.pg_client.connect();

      // Initialize Redis connection
      this.redis_client = createClient({ url: REDIS_URL });
      this.redis_client.on('error', (error) => {
        logger.error('Redis client error:', error);
      });
      await this.redis_client.connect();

      // Login to Discord
      await this.discord_client.login(DISCORD_TOKEN);

      logger.info('Transcriber service initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize transcriber service:', error);
      throw error;
    }
  }

  /**
   * Close every client that was created. Each close is attempted even when
   * another one fails; failures are logged instead of thrown so shutdown
   * always runs to completion. Safe to call more than once.
   *
   * @returns {Promise<void>}
   */
  async cleanup() {
    const closers = [
      ['Discord client', async () => this.discord_client?.destroy()],
      ['Redis subscriber', async () => this.redis_subscriber?.quit()],
      ['Redis client', async () => this.redis_client?.quit()],
      ['Postgres client', async () => this.pg_client?.end()]
    ];

    const results = await Promise.allSettled(closers.map(([, close]) => close()));

    results.forEach((result, index) => {
      if (result.status === 'rejected') {
        logger.warn(`Failed to close ${closers[index][0]}`, {
          reason: String(result.reason)
        });
      }
    });

    this.discord_client = null;
    this.redis_subscriber = null;
    this.redis_client = null;
    this.pg_client = null;
  }

  /**
   * Load one recording together with the guild/channel of its connection.
   * No caller exists in this file.
   *
   * @param {*} recordingId - Value bound to `recordings.id`.
   * @returns {Promise<object|null>} The row, or null when it does not exist or
   *   the query fails (the failure is logged).
   */
  async getRecordingDetails(recordingId) {
    try {
      const query = `
        SELECT
          r.id, r.speaker_nickname, r.speaker_username, r.duration_seconds,
          r.started_at, r.ended_at,
          c.guild_id, c.channel_id, c.channel_name
        FROM recordings r
        JOIN connections c ON r.connection_id = c.id
        WHERE r.id = $1
      `;

      const result = await this.pg_client.query(query, [recordingId]);
      return result.rows[0] || null;
    } catch (error) {
      logger.error('Failed to get recording details:', error);
      return null;
    }
  }

  /**
   * Load one transcription together with its recording and the guild/channel
   * of the recording's connection.
   *
   * @param {*} transcriptionId - Value bound to `transcriptions.id`.
   * @returns {Promise<object|null>} The row, or null when it does not exist or
   *   the query fails (the failure is logged).
   */
  async getTranscriptionDetails(transcriptionId) {
    try {
      const query = `
        SELECT
          t.id, t.detected_language, t.language_confidence,
          t.transcription_text, t.processing_time_ms,
          r.id as recording_id, r.speaker_nickname, r.speaker_username,
          r.duration_seconds, r.started_at,
          c.guild_id, c.channel_id, c.channel_name
        FROM transcriptions t
        JOIN recordings r ON t.recording_id = r.id
        JOIN connections c ON r.connection_id = c.id
        WHERE t.id = $1
      `;

      const result = await this.pg_client.query(query, [transcriptionId]);
      return result.rows[0] || null;
    } catch (error) {
      logger.error('Failed to get transcription details:', error);
      return null;
    }
  }

  /**
   * Render a duration as "12.3 seconds", "4m 5.6s" or "1h 2m 3.4s".
   *
   * @param {number|string|null|undefined} durationSeconds - Duration in seconds.
   *   Numeric strings are accepted.
   * @returns {string} Formatted duration; '0.0 seconds' for missing,
   *   non-numeric, zero or negative input.
   */
  formatDuration(durationSeconds) {
    const total = Number(durationSeconds);
    if (!Number.isFinite(total) || total <= 0) return '0.0 seconds';

    // Round to tenths once, up front, so a value like 59.96 carries into the
    // next unit instead of printing "60.0".
    const tenths = Math.round(total * 10);
    if (tenths < 600) {
      return `${(tenths / 10).toFixed(1)} seconds`;
    }

    const hours = Math.floor(tenths / 36000);
    const minutes = Math.floor((tenths % 36000) / 600);
    const seconds = ((tenths % 600) / 10).toFixed(1);

    return hours > 0
      ? `${hours}h ${minutes}m ${seconds}s`
      : `${minutes}m ${seconds}s`;
  }

  /**
   * Render a timestamp relative to now using moment's `fromNow()`.
   *
   * @param {*} timestamp - Anything `moment()` accepts.
   * @returns {string} Relative time, or 'unknown' when moment cannot parse the
   *   value (instead of moment's "Invalid date" text).
   */
  formatTimeAgo(timestamp) {
    const when = moment(timestamp);
    return when.isValid() ? when.fromNow() : 'unknown';
  }

  /**
   * Render a language as "<name> <flag>".
   *
   * @param {string|null|undefined} languageCode - Key of LANGUAGE_INFO.
   * @returns {string} e.g. "German 🇩🇪"; the upper-cased code with a white flag
   *   for unconfigured codes; "Unknown" with a white flag for a missing code.
   */
  getLanguageDisplay(languageCode) {
    const { name, flag } = resolveLanguage(languageCode);
    return `${name} ${flag}`;
  }

  /**
   * Build the text posted to Discord for one transcription.
   *
   * @param {object} transcriptionData - Row shaped like the result of
   *   getTranscriptionDetails().
   * @param {Object<string, string>|null} [translations={}] - Translated text
   *   keyed by language code. Only PRIMARY_LANGUAGES other than the detected
   *   language are rendered, in PRIMARY_LANGUAGES order.
   * @returns {Promise<string|null>} Message text, or null when building it
   *   failed (the failure is logged).
   */
  async createDiscordMessage(transcriptionData, translations = {}) {
    try {
      const {
        speaker_nickname,
        speaker_username,
        detected_language,
        transcription_text,
        duration_seconds,
        started_at
      } = transcriptionData;

      // The parameter default only covers `undefined`; events may carry null.
      const available = translations ?? {};

      const header = [
        `🎤 **New Transcription**`,
        `**Speaker:** ${speaker_nickname || speaker_username || 'Unknown speaker'}`,
        `🌍 **Language Detected:** ${this.getLanguageDisplay(detected_language)}`,
        `⏱️ **Duration:** ${this.formatDuration(duration_seconds)}`,
        `🕐 **Time:** ${this.formatTimeAgo(started_at)}`
      ].join('\n');

      const sections = [
        header,
        `📝 **Transcript**\n${transcription_text ?? ''}`
      ];

      // Always show primary languages in order, excluding the detected language
      for (const langCode of PRIMARY_LANGUAGES) {
        if (langCode === detected_language) continue;

        const translation = available[langCode];
        if (!translation) continue;

        const { name, flag } = resolveLanguage(langCode);
        sections.push(`${flag} **${name}**\n${translation}`);
      }

      return sections.join('\n\n').trim();
    } catch (error) {
      logger.error('Error creating Discord message:', error);
      return null;
    }
  }

  /**
   * Post text to a Discord channel, split into as many messages as the
   * Discord length limit requires. Mentions in the text are not resolved, so
   * transcribed speech or nicknames cannot ping anyone.
   *
   * @param {string} guildId - Guild that owns the channel.
   * @param {string} channelId - Target channel.
   * @param {string} messageContent - Text to post.
   * @returns {Promise<boolean>} true when every part was sent; false when the
   *   guild/channel is missing, the channel is not text-based, the text is
   *   empty or Discord rejected a request (all logged).
   */
  async sendToDiscord(guildId, channelId, messageContent) {
    try {
      // Get the guild
      const guild = await this.discord_client.guilds.fetch(guildId);
      if (!guild) {
        logger.error('Guild not found:', { guildId });
        return false;
      }

      // Get the channel
      const channel = await guild.channels.fetch(channelId);
      if (!channel) {
        logger.error('Channel not found:', { channelId });
        return false;
      }

      // Check if it's a text channel
      if (!channel.isTextBased()) {
        logger.error('Channel is not text-based:', { channelId });
        return false;
      }

      const parts = splitMessage(String(messageContent ?? ''));
      if (parts.length === 0) {
        logger.error('Refusing to send empty Discord message', { guildId, channelId });
        return false;
      }

      // Sequential on purpose: the parts must arrive in reading order.
      for (const content of parts) {
        await channel.send({ content, allowedMentions: { parse: [] } });
      }

      logger.info('Message sent to Discord', {
        guildId,
        channelId,
        channelName: channel.name,
        parts: parts.length
      });

      return true;
    } catch (error) {
      logger.error('Failed to send Discord message:', error);
      return false;
    }
  }

  /**
   * Handle a `translations_completed` event: load the transcription, render
   * the message and post it to the channel stored with the connection.
   * Never throws; every failure is logged.
   *
   * @param {object} eventData - Event payload; `transcriptionId`,
   *   `sourceLanguage` and `translations` are read.
   * @returns {Promise<void>}
   */
  async handleTranslationsCompleted(eventData) {
    try {
      const { transcriptionId, sourceLanguage, translations } = eventData ?? {};
      const availableTranslations = translations ?? {};

      logger.info('Processing translations completed event', {
        transcriptionId,
        sourceLanguage,
        translationCount: Object.keys(availableTranslations).length
      });

      // Get transcription details including channel info
      const transcriptionDetails = await this.getTranscriptionDetails(transcriptionId);
      if (!transcriptionDetails) {
        logger.error('Transcription not found:', { transcriptionId });
        return;
      }

      // Create Discord message
      const messageContent = await this.createDiscordMessage(
        transcriptionDetails,
        availableTranslations
      );
      if (!messageContent) {
        logger.error('Failed to create Discord message', { transcriptionId });
        return;
      }

      // Send to Discord
      const success = await this.sendToDiscord(
        transcriptionDetails.guild_id,
        transcriptionDetails.channel_id,
        messageContent
      );

      if (success) {
        logger.info('Successfully sent transcription to Discord', {
          transcriptionId,
          speaker: transcriptionDetails.speaker_nickname,
          language: sourceLanguage,
          channel: transcriptionDetails.channel_name
        });
      } else {
        logger.error('Failed to send transcription to Discord', { transcriptionId });
      }
    } catch (error) {
      logger.error('Error handling translations completed event:', error);
    }
  }

  /**
   * Handle a `connection_started` event by recording the connection in
   * `active_channels`.
   *
   * @param {object} eventData - Payload with `connectionId`, `channelId`,
   *   `channelName` and `guildId`.
   * @returns {Promise<void>}
   */
  async handleConnectionStarted(eventData) {
    try {
      const { connectionId, channelId, channelName, guildId } = eventData;

      // Track this channel as active for transcriptions
      this.active_channels.set(connectionId, {
        guildId,
        channelId,
        channelName,
        startedAt: new Date()
      });

      logger.info('Voice connection started', {
        connectionId,
        channelName,
        guildId
      });
    } catch (error) {
      logger.error('Error handling connection started event:', error);
    }
  }

  /**
   * Handle a `connection_ended` event by dropping the connection from
   * `active_channels`.
   *
   * @param {object} eventData - Payload with `connectionId`.
   * @returns {Promise<void>}
   */
  async handleConnectionEnded(eventData) {
    try {
      const { connectionId } = eventData;

      // Remove from active channels
      this.active_channels.delete(connectionId);

      logger.info('Voice connection ended', { connectionId });
    } catch (error) {
      logger.error('Error handling connection ended event:', error);
    }
  }

  /**
   * Open a dedicated Redis connection and subscribe it to EVENTS_CHANNEL.
   * The connection is kept on `redis_subscriber` so cleanup() can close it.
   *
   * @returns {Promise<void>}
   * @throws When connecting or subscribing fails.
   */
  async listenForEvents() {
    logger.info('Starting to listen for translation events');

    // Subscribe to Redis pub/sub
    const subscriber = this.redis_client.duplicate();
    // The duplicate gets its own listener; without one, an 'error' event on
    // this connection would crash the process.
    subscriber.on('error', (error) => {
      logger.error('Redis subscriber error:', error);
    });
    this.redis_subscriber = subscriber;

    await subscriber.connect();
    await subscriber.subscribe(EVENTS_CHANNEL, (message) => {
      // processRedisMessage() handles its own failures; this catch only keeps
      // an unexpected rejection from becoming an unhandled one.
      this.processRedisMessage(message).catch((error) => {
        logger.error('Error processing Redis message:', error);
      });
    });

    logger.info('Subscribed to voice-translator-events channel');
  }

  /**
   * Decode one pub/sub message and dispatch it by its `event` field.
   * Expected shape: `{ "event": string, "data": object }`.
   * Never throws; malformed or failing messages are logged.
   *
   * @param {string} message - Raw JSON text received from Redis.
   * @returns {Promise<void>}
   */
  async processRedisMessage(message) {
    let payload;
    try {
      payload = JSON.parse(message);
    } catch (error) {
      // `rawMessage`, not `message`: winston reserves the `message` key.
      logger.error('Invalid JSON in Redis message:', { rawMessage: message });
      return;
    }

    try {
      const { event, data: eventData } = payload;

      switch (event) {
        case 'translations_completed':
          await this.handleTranslationsCompleted(eventData);
          break;

        case 'connection_started':
          await this.handleConnectionStarted(eventData);
          break;

        case 'connection_ended':
          await this.handleConnectionEnded(eventData);
          break;

        default:
          logger.debug('Unhandled event type:', { event });
      }
    } catch (error) {
      logger.error('Error processing Redis message:', error);
    }
  }
}

/**
 * Attach logging handlers to the service's Discord client.
 *
 * main() calls this after login, so the client may already be ready; in that
 * case the login line is logged immediately instead of waiting for an event
 * that has already fired.
 *
 * @param {TranscriberService} service - Service whose `discord_client` exists.
 * @returns {Promise<void>}
 */
async function setupDiscordHandlers(service) {
  const client = service.discord_client;

  const logReady = () => {
    logger.info(`Discord client logged in as ${client.user.tag}`);
  };

  if (client.isReady()) {
    logReady();
  } else {
    client.once('ready', logReady);
  }

  client.on('error', (error) => {
    logger.error('Discord client error:', error);
  });

  // Handle reconnection. discord.js v14 reports these per shard.
  client.on('shardReconnecting', (shardId) => {
    logger.info('Discord client reconnecting...', { shardId });
  });

  client.on('shardResume', (shardId, replayedEvents) => {
    logger.info('Discord client resumed connection', { shardId, replayedEvents });
  });
}

/**
 * Start the service: initialize clients, attach Discord handlers, subscribe
 * to events and install signal handlers for graceful shutdown. Exits the
 * process with code 1 when startup fails.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const service = new TranscriberService();

  try {
    await service.initialize();
    await setupDiscordHandlers(service);

    // Start listening for events
    await service.listenForEvents();

    logger.info('Transcriber service started successfully');

    // Graceful shutdown handling
    let shuttingDown = false;
    const shutdown = async (signal) => {
      // A second signal while cleanup is running must not start it again.
      if (shuttingDown) return;
      shuttingDown = true;

      logger.info(`Received ${signal}, shutting down gracefully...`);
      try {
        await service.cleanup();
      } finally {
        process.exit(0);
      }
    };

    process.on('SIGINT', () => shutdown('SIGINT'));
    process.on('SIGTERM', () => shutdown('SIGTERM'));
  } catch (error) {
    logger.error('Failed to start transcriber service:', error);
    await service.cleanup();
    process.exit(1);
  }
}

// Handle unhandled errors
process.on('unhandledRejection', (reason) => {
  // Pass the reason as the single metadata argument; extra positional
  // arguments are not part of the JSON log output.
  logger.error(
    'Unhandled Rejection:',
    reason instanceof Error ? reason : { reason: String(reason) }
  );
});

process.on('uncaughtException', (error) => {
  logger.error('Uncaught Exception:', error);
  process.exit(1);
});

// Start the application
main().catch((error) => {
  logger.error('Fatal error in main:', error);
  process.exit(1);
});