Create Transcriber service orchestrator
This commit is contained in:
454
services/transcriber/src/orchestrator.js
Normal file
454
services/transcriber/src/orchestrator.js
Normal file
@ -0,0 +1,454 @@
|
||||
import { Client, GatewayIntentBits, EmbedBuilder } from 'discord.js';
|
||||
import { createClient } from 'redis';
|
||||
import { Client as PgClient } from 'pg';
|
||||
import winston from 'winston';
|
||||
import moment from 'moment';
|
||||
import _ from 'lodash';
|
||||
|
||||
// Environment variables
|
||||
const DISCORD_TOKEN = process.env.DISCORD_TOKEN;
|
||||
const POSTGRES_URL = process.env.POSTGRES_URL;
|
||||
const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379';
|
||||
const LOG_LEVEL = process.env.LOG_LEVEL || 'info';
|
||||
|
||||
// Language configuration
|
||||
const LANGUAGE_INFO = {
|
||||
'en': { name: 'English', flag: '🇺🇸', country_flag: '🇬🇧' },
|
||||
'de': { name: 'German', flag: '🇩🇪' },
|
||||
'ko': { name: 'Korean', flag: '🇰🇷' },
|
||||
'es': { name: 'Spanish', flag: '🇪🇸' },
|
||||
'fr': { name: 'French', flag: '🇫🇷' },
|
||||
'it': { name: 'Italian', flag: '🇮🇹' },
|
||||
'pt': { name: 'Portuguese', flag: '🇵🇹' },
|
||||
'ru': { name: 'Russian', flag: '🇷🇺' },
|
||||
'ja': { name: 'Japanese', flag: '🇯🇵' },
|
||||
'zh': { name: 'Chinese', flag: '🇨🇳' },
|
||||
'ar': { name: 'Arabic', flag: '🇸🇦' }
|
||||
};
|
||||
|
||||
// Always show these languages in this order
|
||||
const PRIMARY_LANGUAGES = ['en', 'de', 'ko'];
|
||||
|
||||
// Logging configuration
|
||||
const logger = winston.createLogger({
|
||||
level: LOG_LEVEL,
|
||||
format: winston.format.combine(
|
||||
winston.format.timestamp(),
|
||||
winston.format.errors({ stack: true }),
|
||||
winston.format.json()
|
||||
),
|
||||
transports: [
|
||||
new winston.transports.Console(),
|
||||
new winston.transports.File({ filename: '/app/logs/transcriber-error.log', level: 'error' }),
|
||||
new winston.transports.File({ filename: '/app/logs/transcriber.log' })
|
||||
]
|
||||
});
|
||||
|
||||
class TranscriberService {
|
||||
constructor() {
|
||||
this.discord_client = null;
|
||||
this.redis_client = null;
|
||||
this.pg_client = null;
|
||||
this.active_channels = new Map(); // Track which channels to send messages to
|
||||
}
|
||||
|
||||
async initialize() {
|
||||
try {
|
||||
// Initialize Discord client
|
||||
this.discord_client = new Client({
|
||||
intents: [
|
||||
GatewayIntentBits.Guilds,
|
||||
GatewayIntentBits.GuildMessages,
|
||||
GatewayIntentBits.MessageContent
|
||||
]
|
||||
});
|
||||
|
||||
// Initialize database connection
|
||||
this.pg_client = new PgClient({ connectionString: POSTGRES_URL });
|
||||
await this.pg_client.connect();
|
||||
|
||||
// Initialize Redis connection
|
||||
this.redis_client = createClient({ url: REDIS_URL });
|
||||
await this.redis_client.connect();
|
||||
|
||||
// Login to Discord
|
||||
await this.discord_client.login(DISCORD_TOKEN);
|
||||
|
||||
logger.info('Transcriber service initialized successfully');
|
||||
} catch (error) {
|
||||
logger.error('Failed to initialize transcriber service:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async cleanup() {
|
||||
if (this.discord_client) {
|
||||
this.discord_client.destroy();
|
||||
}
|
||||
if (this.redis_client) {
|
||||
await this.redis_client.quit();
|
||||
}
|
||||
if (this.pg_client) {
|
||||
await this.pg_client.end();
|
||||
}
|
||||
}
|
||||
|
||||
async getRecordingDetails(recordingId) {
|
||||
try {
|
||||
const query = `
|
||||
SELECT
|
||||
r.id, r.speaker_nickname, r.speaker_username, r.duration_seconds,
|
||||
r.started_at, r.ended_at,
|
||||
c.guild_id, c.channel_id, c.channel_name
|
||||
FROM recordings r
|
||||
JOIN connections c ON r.connection_id = c.id
|
||||
WHERE r.id = $1
|
||||
`;
|
||||
|
||||
const result = await this.pg_client.query(query, [recordingId]);
|
||||
return result.rows[0] || null;
|
||||
} catch (error) {
|
||||
logger.error('Failed to get recording details:', error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async getTranscriptionDetails(transcriptionId) {
|
||||
try {
|
||||
const query = `
|
||||
SELECT
|
||||
t.id, t.detected_language, t.language_confidence,
|
||||
t.transcription_text, t.processing_time_ms,
|
||||
r.id as recording_id, r.speaker_nickname, r.speaker_username,
|
||||
r.duration_seconds, r.started_at,
|
||||
c.guild_id, c.channel_id, c.channel_name
|
||||
FROM transcriptions t
|
||||
JOIN recordings r ON t.recording_id = r.id
|
||||
JOIN connections c ON r.connection_id = c.id
|
||||
WHERE t.id = $1
|
||||
`;
|
||||
|
||||
const result = await this.pg_client.query(query, [transcriptionId]);
|
||||
return result.rows[0] || null;
|
||||
} catch (error) {
|
||||
logger.error('Failed to get transcription details:', error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
formatDuration(durationSeconds) {
|
||||
if (!durationSeconds) return '0.0 seconds';
|
||||
|
||||
if (durationSeconds < 60) {
|
||||
return `${durationSeconds.toFixed(1)} seconds`;
|
||||
} else if (durationSeconds < 3600) {
|
||||
const minutes = Math.floor(durationSeconds / 60);
|
||||
const seconds = (durationSeconds % 60).toFixed(1);
|
||||
return `${minutes}m ${seconds}s`;
|
||||
} else {
|
||||
const hours = Math.floor(durationSeconds / 3600);
|
||||
const minutes = Math.floor((durationSeconds % 3600) / 60);
|
||||
const seconds = (durationSeconds % 60).toFixed(1);
|
||||
return `${hours}h ${minutes}m ${seconds}s`;
|
||||
}
|
||||
}
|
||||
|
||||
formatTimeAgo(timestamp) {
|
||||
return moment(timestamp).fromNow();
|
||||
}
|
||||
|
||||
getLanguageDisplay(languageCode) {
|
||||
const info = LANGUAGE_INFO[languageCode];
|
||||
if (!info) return `${languageCode.toUpperCase()} 🏳️`;
|
||||
|
||||
// Use country_flag if available (for English), otherwise use flag
|
||||
const flag = info.country_flag || info.flag;
|
||||
return `${info.name} ${flag}`;
|
||||
}
|
||||
|
||||
async createDiscordMessage(transcriptionData, translations = {}) {
|
||||
try {
|
||||
const {
|
||||
speaker_nickname,
|
||||
speaker_username,
|
||||
detected_language,
|
||||
language_confidence,
|
||||
transcription_text,
|
||||
duration_seconds,
|
||||
started_at
|
||||
} = transcriptionData;
|
||||
|
||||
// Create the main message content
|
||||
let messageContent = '';
|
||||
|
||||
// Header
|
||||
messageContent += `🎤 **New Transcription**\n`;
|
||||
messageContent += `**Speaker:** ${speaker_nickname || speaker_username}\n`;
|
||||
messageContent += `🌍 **Language Detected:** ${this.getLanguageDisplay(detected_language)}\n`;
|
||||
messageContent += `⏱️ **Duration:** ${this.formatDuration(duration_seconds)}\n`;
|
||||
messageContent += `🕐 **Time:** ${this.formatTimeAgo(started_at)}\n\n`;
|
||||
|
||||
// Transcript section
|
||||
messageContent += `📝 **Transcript**\n`;
|
||||
messageContent += `${transcription_text}\n\n`;
|
||||
|
||||
// Translation sections
|
||||
// Always show primary languages in order, excluding the detected language
|
||||
const languagesToShow = PRIMARY_LANGUAGES.filter(lang => lang !== detected_language);
|
||||
|
||||
for (const langCode of languagesToShow) {
|
||||
const translation = translations[langCode];
|
||||
if (translation) {
|
||||
messageContent += `${this.getLanguageDisplay(langCode).split(' ')[1]} **${this.getLanguageDisplay(langCode).split(' ')[0]}**\n`;
|
||||
messageContent += `${translation}\n\n`;
|
||||
}
|
||||
}
|
||||
|
||||
return messageContent.trim();
|
||||
} catch (error) {
|
||||
logger.error('Error creating Discord message:', error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async sendToDiscord(guildId, channelId, messageContent) {
|
||||
try {
|
||||
// Get the guild
|
||||
const guild = await this.discord_client.guilds.fetch(guildId);
|
||||
if (!guild) {
|
||||
logger.error('Guild not found:', guildId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Get the channel
|
||||
const channel = await guild.channels.fetch(channelId);
|
||||
if (!channel) {
|
||||
logger.error('Channel not found:', channelId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if it's a text channel
|
||||
if (!channel.isTextBased()) {
|
||||
logger.error('Channel is not text-based:', channelId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Send the message
|
||||
await channel.send(messageContent);
|
||||
|
||||
logger.info('Message sent to Discord', {
|
||||
guildId,
|
||||
channelId,
|
||||
channelName: channel.name
|
||||
});
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error('Failed to send Discord message:', error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async handleTranslationsCompleted(eventData) {
|
||||
try {
|
||||
const {
|
||||
transcriptionId,
|
||||
sourceLanguage,
|
||||
sourceText,
|
||||
translations,
|
||||
processingTimeMs
|
||||
} = eventData;
|
||||
|
||||
logger.info('Processing translations completed event', {
|
||||
transcriptionId,
|
||||
sourceLanguage,
|
||||
translationCount: Object.keys(translations).length
|
||||
});
|
||||
|
||||
// Get transcription details including channel info
|
||||
const transcriptionDetails = await this.getTranscriptionDetails(transcriptionId);
|
||||
if (!transcriptionDetails) {
|
||||
logger.error('Transcription not found:', transcriptionId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create Discord message
|
||||
const messageContent = await this.createDiscordMessage(transcriptionDetails, translations);
|
||||
if (!messageContent) {
|
||||
logger.error('Failed to create Discord message');
|
||||
return;
|
||||
}
|
||||
|
||||
// Send to Discord
|
||||
const success = await this.sendToDiscord(
|
||||
transcriptionDetails.guild_id,
|
||||
transcriptionDetails.channel_id,
|
||||
messageContent
|
||||
);
|
||||
|
||||
if (success) {
|
||||
logger.info('Successfully sent transcription to Discord', {
|
||||
transcriptionId,
|
||||
speaker: transcriptionDetails.speaker_nickname,
|
||||
language: sourceLanguage,
|
||||
channel: transcriptionDetails.channel_name
|
||||
});
|
||||
} else {
|
||||
logger.error('Failed to send transcription to Discord', { transcriptionId });
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Error handling translations completed event:', error);
|
||||
}
|
||||
}
|
||||
|
||||
async handleConnectionStarted(eventData) {
|
||||
try {
|
||||
const { connectionId, channelId, channelName, guildId } = eventData;
|
||||
|
||||
// Track this channel as active for transcriptions
|
||||
this.active_channels.set(connectionId, {
|
||||
guildId,
|
||||
channelId,
|
||||
channelName,
|
||||
startedAt: new Date()
|
||||
});
|
||||
|
||||
logger.info('Voice connection started', {
|
||||
connectionId,
|
||||
channelName,
|
||||
guildId
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Error handling connection started event:', error);
|
||||
}
|
||||
}
|
||||
|
||||
async handleConnectionEnded(eventData) {
|
||||
try {
|
||||
const { connectionId } = eventData;
|
||||
|
||||
// Remove from active channels
|
||||
this.active_channels.delete(connectionId);
|
||||
|
||||
logger.info('Voice connection ended', { connectionId });
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Error handling connection ended event:', error);
|
||||
}
|
||||
}
|
||||
|
||||
async listenForEvents() {
|
||||
logger.info('Starting to listen for translation events');
|
||||
|
||||
// Subscribe to Redis pub/sub
|
||||
const subscriber = this.redis_client.duplicate();
|
||||
await subscriber.connect();
|
||||
await subscriber.subscribe('voice-translator-events', (message) => {
|
||||
this.processRedisMessage(message);
|
||||
});
|
||||
|
||||
logger.info('Subscribed to voice-translator-events channel');
|
||||
}
|
||||
|
||||
async processRedisMessage(message) {
|
||||
try {
|
||||
const data = JSON.parse(message);
|
||||
const { event, data: eventData } = data;
|
||||
|
||||
switch (event) {
|
||||
case 'translations_completed':
|
||||
await this.handleTranslationsCompleted(eventData);
|
||||
break;
|
||||
|
||||
case 'connection_started':
|
||||
await this.handleConnectionStarted(eventData);
|
||||
break;
|
||||
|
||||
case 'connection_ended':
|
||||
await this.handleConnectionEnded(eventData);
|
||||
break;
|
||||
|
||||
default:
|
||||
logger.debug('Unhandled event type:', event);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
if (error instanceof SyntaxError) {
|
||||
logger.error('Invalid JSON in Redis message:', message);
|
||||
} else {
|
||||
logger.error('Error processing Redis message:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Discord client event handlers
|
||||
async function setupDiscordHandlers(service) {
|
||||
service.discord_client.once('ready', () => {
|
||||
logger.info(`Discord client logged in as ${service.discord_client.user.tag}`);
|
||||
});
|
||||
|
||||
service.discord_client.on('error', (error) => {
|
||||
logger.error('Discord client error:', error);
|
||||
});
|
||||
|
||||
// Handle reconnection
|
||||
service.discord_client.on('reconnecting', () => {
|
||||
logger.info('Discord client reconnecting...');
|
||||
});
|
||||
|
||||
service.discord_client.on('resume', () => {
|
||||
logger.info('Discord client resumed connection');
|
||||
});
|
||||
}
|
||||
|
||||
// Main application
|
||||
async function main() {
|
||||
const service = new TranscriberService();
|
||||
|
||||
try {
|
||||
await service.initialize();
|
||||
await setupDiscordHandlers(service);
|
||||
|
||||
// Start listening for events
|
||||
await service.listenForEvents();
|
||||
|
||||
logger.info('Transcriber service started successfully');
|
||||
|
||||
// Graceful shutdown handling
|
||||
process.on('SIGINT', async () => {
|
||||
logger.info('Received SIGINT, shutting down gracefully...');
|
||||
await service.cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
logger.info('Received SIGTERM, shutting down gracefully...');
|
||||
await service.cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Failed to start transcriber service:', error);
|
||||
await service.cleanup();
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle unhandled errors
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
logger.error('Unhandled Rejection at:', promise, 'reason:', reason);
|
||||
});
|
||||
|
||||
process.on('uncaughtException', (error) => {
|
||||
logger.error('Uncaught Exception:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Start the application
|
||||
main().catch((error) => {
|
||||
logger.error('Fatal error in main:', error);
|
||||
process.exit(1);
|
||||
});
|
Reference in New Issue
Block a user