diff --git a/services/dashboard/src/app.js b/services/dashboard/src/app.js new file mode 100644 index 0000000..50ac865 --- /dev/null +++ b/services/dashboard/src/app.js @@ -0,0 +1,672 @@ +import express from 'express'; +import { Server as SocketIOServer } from 'socket.io'; +import { createServer } from 'http'; +import { createClient } from 'redis'; +import { Client as PgClient } from 'pg'; +import winston from 'winston'; +import moment from 'moment'; +import _ from 'lodash'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import cors from 'cors'; +import helmet from 'helmet'; +import compression from 'compression'; +import rateLimit from 'express-rate-limit'; + +// ES modules compatibility +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); + +// Environment variables +const PORT = process.env.PORT || 3000; +const POSTGRES_URL = process.env.POSTGRES_URL; +const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379'; +const LOG_LEVEL = process.env.LOG_LEVEL || 'info'; +const NODE_ENV = process.env.NODE_ENV || 'development'; + +// Language configuration +const LANGUAGE_INFO = { + 'en': { name: 'English', flag: '🇺🇸', color: '#3498db' }, + 'de': { name: 'German', flag: '🇩🇪', color: '#e74c3c' }, + 'ko': { name: 'Korean', flag: '🇰🇷', color: '#f39c12' }, + 'es': { name: 'Spanish', flag: '🇪🇸', color: '#9b59b6' }, + 'fr': { name: 'French', flag: '🇫🇷', color: '#2ecc71' }, + 'it': { name: 'Italian', flag: '🇮🇹', color: '#e67e22' }, + 'pt': { name: 'Portuguese', flag: '🇵🇹', color: '#1abc9c' }, + 'ru': { name: 'Russian', flag: '🇷🇺', color: '#34495e' }, + 'ja': { name: 'Japanese', flag: '🇯🇵', color: '#e91e63' }, + 'zh': { name: 'Chinese', flag: '🇨🇳', color: '#ff5722' } +}; + +// Logging configuration +const logger = winston.createLogger({ + level: LOG_LEVEL, + format: winston.format.combine( + winston.format.timestamp(), + winston.format.errors({ stack: true }), + winston.format.json() + ), + transports: [ + new winston.transports.Console(), + new winston.transports.File({ filename: '/app/logs/dashboard-error.log', level: 'error' }), + new winston.transports.File({ filename: '/app/logs/dashboard.log' }) + ] +}); + +class DashboardService { + constructor() { + this.app = express(); + this.server = createServer(this.app); + this.io = new SocketIOServer(this.server, { + cors: { + origin: "*", + methods: ["GET", "POST"] + } + }); + this.redis_client = null; + this.pg_client = null; + this.activeConnections = new Map(); + this.realtimeStats = { + totalTranscriptions: 0, + totalSpeakingTime: 0, + activeUsers: 0, + languageBreakdown: {}, + recentActivity: [] + }; + } + + async initialize() { + try { + // Initialize database connection + this.pg_client = new PgClient({ connectionString: POSTGRES_URL }); + await this.pg_client.connect(); + + // Initialize Redis connection + this.redis_client = createClient({ url: REDIS_URL }); + await this.redis_client.connect(); + + // Set up Express middleware + this.setupMiddleware(); + + // Set up routes + this.setupRoutes(); + + // Set up Socket.IO + this.setupSocketIO(); + + // Start listening for Redis events + this.listenForEvents(); + + logger.info('Dashboard service initialized successfully'); + } catch (error) { + logger.error('Failed to initialize dashboard service:', error); + throw error; + } + } + + setupMiddleware() { + // Security and performance middleware + this.app.use(helmet({ + contentSecurityPolicy: { + directives: { + defaultSrc: ["'self'"], + styleSrc: ["'self'", "'unsafe-inline'", "https://cdnjs.cloudflare.com"], + scriptSrc: ["'self'", "'unsafe-inline'", "https://cdnjs.cloudflare.com"], + imgSrc: ["'self'", "data:", "https:"], + }, + }, + })); + + this.app.use(compression()); + this.app.use(cors()); + + // Rate limiting + const limiter = rateLimit({ + windowMs: 15 * 60 * 1000, // 15 minutes + max: 100 // limit each IP to 100 requests per windowMs + }); + this.app.use(limiter); + + // Static files and view engine + this.app.use(express.static(path.join(__dirname, '../public'))); + this.app.set('view engine', 'ejs'); + this.app.set('views', path.join(__dirname, 'views')); + + // Body parsing + this.app.use(express.json({ limit: '10mb' })); + this.app.use(express.urlencoded({ extended: true, limit: '10mb' })); + + // Logging middleware + this.app.use((req, res, next) => { + logger.info(`${req.method} ${req.url}`, { + ip: req.ip, + userAgent: req.get('User-Agent') + }); + next(); + }); + } + + setupRoutes() { + // Main dashboard page + this.app.get('/', async (req, res) => { + try { + const dashboardData = await this.getDashboardData(); + res.render('dashboard', { + data: dashboardData, + moment, + languages: LANGUAGE_INFO + }); + } catch (error) { + logger.error('Error rendering dashboard:', error); + res.status(500).render('error', { error: 'Failed to load dashboard' }); + } + }); + + // API endpoints + this.app.get('/api/stats', async (req, res) => { + try { + const stats = await this.getSystemStats(); + res.json(stats); + } catch (error) { + logger.error('Error getting stats:', error); + res.status(500).json({ error: 'Failed to get stats' }); + } + }); + + this.app.get('/api/recent-activity', async (req, res) => { + try { + const limit = parseInt(req.query.limit) || 20; + const activity = await this.getRecentActivity(limit); + res.json(activity); + } catch (error) { + logger.error('Error getting recent activity:', error); + res.status(500).json({ error: 'Failed to get recent activity' }); + } + }); + + this.app.get('/api/language-stats', async (req, res) => { + try { + const days = parseInt(req.query.days) || 7; + const stats = await this.getLanguageStats(days); + res.json(stats); + } catch (error) { + logger.error('Error getting language stats:', error); + res.status(500).json({ error: 'Failed to get language stats' }); + } + }); + + this.app.get('/api/performance-metrics', async (req, res) => { + try { + const hours = parseInt(req.query.hours) || 24; + const metrics = await this.getPerformanceMetrics(hours); + res.json(metrics); + } catch (error) { + logger.error('Error getting performance metrics:', error); + res.status(500).json({ error: 'Failed to get performance metrics' }); + } + }); + + this.app.get('/api/user-activity', async (req, res) => { + try { + const days = parseInt(req.query.days) || 7; + const activity = await this.getUserActivity(days); + res.json(activity); + } catch (error) { + logger.error('Error getting user activity:', error); + res.status(500).json({ error: 'Failed to get user activity' }); + } + }); + + // Health check + this.app.get('/health', (req, res) => { + res.json({ + status: 'healthy', + timestamp: new Date().toISOString(), + uptime: process.uptime(), + version: '2.0.0' + }); + }); + + // 404 handler + this.app.use((req, res) => { + res.status(404).render('error', { error: 'Page not found' }); + }); + + // Error handler + this.app.use((error, req, res, next) => { + logger.error('Express error:', error); + res.status(500).render('error', { error: 'Internal server error' }); + }); + } + + setupSocketIO() { + this.io.on('connection', (socket) => { + logger.info('Client connected to dashboard', { socketId: socket.id }); + + // Send initial data + socket.emit('stats-update', this.realtimeStats); + + // Handle disconnection + socket.on('disconnect', () => { + logger.info('Client disconnected from dashboard', { socketId: socket.id }); + }); + + // Handle client requests for specific data + socket.on('request-stats', async () => { + try { + const stats = await this.getSystemStats(); + socket.emit('stats-update', stats); + } catch (error) { + logger.error('Error sending stats to client:', error); + } + }); + }); + } + + async getDashboardData() { + try { + const [ + systemStats, + recentActivity, + languageStats, + performanceMetrics, + userActivity + ] = await Promise.all([ + this.getSystemStats(), + this.getRecentActivity(10), + this.getLanguageStats(7), + this.getPerformanceMetrics(24), + this.getUserActivity(7) + ]); + + return { + systemStats, + recentActivity, + languageStats, + performanceMetrics, + userActivity, + activeConnections: Array.from(this.activeConnections.values()) + }; + } catch (error) { + logger.error('Error getting dashboard data:', error); + throw error; + } + } + + async getSystemStats() { + try { + const queries = [ + // Total transcriptions + `SELECT COUNT(*) as total_transcriptions FROM transcriptions WHERE status = 'completed'`, + + // Total speaking time + `SELECT COALESCE(SUM(duration_seconds), 0) as total_speaking_time FROM recordings WHERE status IN ('completed', 'processed')`, + + // Active connections count + `SELECT COUNT(*) as active_connections FROM connections WHERE ended_at IS NULL`, + + // Today's activity + `SELECT COUNT(*) as today_transcriptions FROM transcriptions + WHERE DATE(created_at) = CURRENT_DATE AND status = 'completed'`, + + // Language breakdown (last 7 days) + `SELECT detected_language, COUNT(*) as count + FROM transcriptions + WHERE created_at >= NOW() - INTERVAL '7 days' AND status = 'completed' + GROUP BY detected_language + ORDER BY count DESC`, + + // Average processing times + `SELECT + AVG(processing_time_ms) as avg_transcription_time, + AVG(t.processing_time_ms) as avg_translation_time + FROM transcriptions tr + LEFT JOIN translations t ON tr.id = t.transcription_id + WHERE tr.created_at >= NOW() - INTERVAL '24 hours'` + ]; + + const results = await Promise.all( + queries.map(query => this.pg_client.query(query)) + ); + + const languageBreakdown = {}; + if (results[4].rows) { + results[4].rows.forEach(row => { + const langInfo = LANGUAGE_INFO[row.detected_language] || { name: row.detected_language, flag: '🏳️' }; + languageBreakdown[row.detected_language] = { + count: parseInt(row.count), + name: langInfo.name, + flag: langInfo.flag, + color: langInfo.color || '#95a5a6' + }; + }); + } + + return { + totalTranscriptions: parseInt(results[0].rows[0].total_transcriptions), + totalSpeakingTime: parseFloat(results[1].rows[0].total_speaking_time || 0), + activeConnections: parseInt(results[2].rows[0].active_connections), + todayTranscriptions: parseInt(results[3].rows[0].today_transcriptions), + languageBreakdown, + avgTranscriptionTime: parseFloat(results[5].rows[0].avg_transcription_time || 0), + avgTranslationTime: parseFloat(results[5].rows[0].avg_translation_time || 0), + lastUpdated: new Date().toISOString() + }; + } catch (error) { + logger.error('Error getting system stats:', error); + throw error; + } + } + + async getRecentActivity(limit = 20) { + try { + const query = ` + SELECT + t.id as transcription_id, + t.detected_language, + t.transcription_text, + t.created_at, + t.processing_time_ms, + r.speaker_nickname, + r.speaker_username, + r.duration_seconds, + c.channel_name, + c.guild_id + FROM transcriptions t + JOIN recordings r ON t.recording_id = r.id + JOIN connections c ON r.connection_id = c.id + WHERE t.status = 'completed' + ORDER BY t.created_at DESC + LIMIT $1 + `; + + const result = await this.pg_client.query(query, [limit]); + + return result.rows.map(row => ({ + ...row, + languageInfo: LANGUAGE_INFO[row.detected_language] || { name: row.detected_language, flag: '🏳️' }, + timeAgo: moment(row.created_at).fromNow(), + transcriptPreview: row.transcription_text.length > 100 + ? row.transcription_text.substring(0, 100) + '...' + : row.transcription_text + })); + } catch (error) { + logger.error('Error getting recent activity:', error); + throw error; + } + } + + async getLanguageStats(days = 7) { + try { + const query = ` + SELECT + detected_language, + COUNT(*) as transcription_count, + AVG(processing_time_ms) as avg_processing_time, + SUM(r.duration_seconds) as total_speaking_time + FROM transcriptions t + JOIN recordings r ON t.recording_id = r.id + WHERE t.created_at >= NOW() - INTERVAL '${days} days' + AND t.status = 'completed' + GROUP BY detected_language + ORDER BY transcription_count DESC + `; + + const result = await this.pg_client.query(query); + + return result.rows.map(row => ({ + ...row, + languageInfo: LANGUAGE_INFO[row.detected_language] || { name: row.detected_language, flag: '🏳️' }, + transcription_count: parseInt(row.transcription_count), + avg_processing_time: parseFloat(row.avg_processing_time || 0), + total_speaking_time: parseFloat(row.total_speaking_time || 0) + })); + } catch (error) { + logger.error('Error getting language stats:', error); + throw error; + } + } + + async getPerformanceMetrics(hours = 24) { + try { + const query = ` + SELECT + service_name, + operation, + AVG(duration_ms) as avg_duration, + COUNT(*) as operation_count, + COUNT(CASE WHEN success = false THEN 1 END) as error_count + FROM processing_metrics + WHERE created_at >= NOW() - INTERVAL '${hours} hours' + GROUP BY service_name, operation + ORDER BY service_name, operation + `; + + const result = await this.pg_client.query(query); + + return result.rows.map(row => ({ + ...row, + avg_duration: parseFloat(row.avg_duration || 0), + operation_count: parseInt(row.operation_count), + error_count: parseInt(row.error_count), + success_rate: row.operation_count > 0 + ? ((row.operation_count - row.error_count) / row.operation_count * 100).toFixed(2) + : 0 + })); + } catch (error) { + logger.error('Error getting performance metrics:', error); + throw error; + } + } + + async getUserActivity(days = 7) { + try { + const query = ` + SELECT + speaker_id, + speaker_nickname, + speaker_username, + COUNT(*) as transcription_count, + SUM(duration_seconds) as total_speaking_time, + MAX(started_at) as last_activity + FROM recordings r + JOIN transcriptions t ON r.id = t.recording_id + WHERE r.started_at >= NOW() - INTERVAL '${days} days' + AND t.status = 'completed' + GROUP BY speaker_id, speaker_nickname, speaker_username + ORDER BY transcription_count DESC + LIMIT 20 + `; + + const result = await this.pg_client.query(query); + + return result.rows.map(row => ({ + ...row, + transcription_count: parseInt(row.transcription_count), + total_speaking_time: parseFloat(row.total_speaking_time || 0), + last_activity_ago: moment(row.last_activity).fromNow() + })); + } catch (error) { + logger.error('Error getting user activity:', error); + throw error; + } + } + + async listenForEvents() { + logger.info('Starting to listen for dashboard events'); + + const subscriber = this.redis_client.duplicate(); + await subscriber.connect(); + await subscriber.subscribe('voice-translator-events', (message) => { + this.processRedisMessage(message); + }); + + logger.info('Subscribed to voice-translator-events channel for dashboard'); + } + + async processRedisMessage(message) { + try { + const data = JSON.parse(message); + const { event, data: eventData } = data; + + switch (event) { + case 'connection_started': + await this.handleConnectionStarted(eventData); + break; + + case 'connection_ended': + await this.handleConnectionEnded(eventData); + break; + + case 'transcription_completed': + await this.handleTranscriptionCompleted(eventData); + break; + + case 'translations_completed': + await this.handleTranslationsCompleted(eventData); + break; + + default: + logger.debug('Unhandled event type for dashboard:', event); + } + + } catch (error) { + if (error instanceof SyntaxError) { + logger.error('Invalid JSON in Redis message:', message); + } else { + logger.error('Error processing Redis message in dashboard:', error); + } + } + } + + async handleConnectionStarted(eventData) { + const { connectionId, channelName, guildId } = eventData; + + this.activeConnections.set(connectionId, { + ...eventData, + startedAt: new Date() + }); + + // Broadcast to all connected clients + this.io.emit('connection-started', { + connectionId, + channelName, + guildId, + timestamp: new Date().toISOString() + }); + + logger.info('Dashboard: Connection started', { connectionId, channelName }); + } + + async handleConnectionEnded(eventData) { + const { connectionId } = eventData; + + this.activeConnections.delete(connectionId); + + // Broadcast to all connected clients + this.io.emit('connection-ended', { + connectionId, + timestamp: new Date().toISOString() + }); + + logger.info('Dashboard: Connection ended', { connectionId }); + } + + async handleTranscriptionCompleted(eventData) { + const { transcriptionId, recordingId, text, language } = eventData; + + // Update realtime stats + this.realtimeStats.totalTranscriptions++; + this.realtimeStats.languageBreakdown[language] = + (this.realtimeStats.languageBreakdown[language] || 0) + 1; + + // Broadcast to all connected clients + this.io.emit('transcription-completed', { + transcriptionId, + recordingId, + text: text.length > 100 ? text.substring(0, 100) + '...' : text, + language, + languageInfo: LANGUAGE_INFO[language] || { name: language, flag: '🏳️' }, + timestamp: new Date().toISOString() + }); + + logger.info('Dashboard: Transcription completed', { transcriptionId, language }); + } + + async handleTranslationsCompleted(eventData) { + const { transcriptionId, translations } = eventData; + + // Broadcast to all connected clients + this.io.emit('translations-completed', { + transcriptionId, + translationCount: Object.keys(translations).length, + timestamp: new Date().toISOString() + }); + + logger.info('Dashboard: Translations completed', { transcriptionId }); + } + + async cleanup() { + if (this.redis_client) { + await this.redis_client.quit(); + } + if (this.pg_client) { + await this.pg_client.end(); + } + if (this.server) { + this.server.close(); + } + } + + start() { + this.server.listen(PORT, () => { + logger.info(`Dashboard server running on port ${PORT}`); + }); + } +} + +// Main application +async function main() { + const dashboard = new DashboardService(); + + try { + await dashboard.initialize(); + dashboard.start(); + + logger.info('Dashboard service started successfully'); + + // Graceful shutdown handling + process.on('SIGINT', async () => { + logger.info('Received SIGINT, shutting down dashboard gracefully...'); + await dashboard.cleanup(); + process.exit(0); + }); + + process.on('SIGTERM', async () => { + logger.info('Received SIGTERM, shutting down dashboard gracefully...'); + await dashboard.cleanup(); + process.exit(0); + }); + + } catch (error) { + logger.error('Failed to start dashboard service:', error); + await dashboard.cleanup(); + process.exit(1); + } +} + +// Handle unhandled errors +process.on('unhandledRejection', (reason, promise) => { + logger.error('Unhandled Rejection at:', promise, 'reason:', reason); +}); + +process.on('uncaughtException', (error) => { + logger.error('Uncaught Exception:', error); + process.exit(1); +}); + +// Start the application +main().catch((error) => { + logger.error('Fatal error in dashboard main:', error); + process.exit(1); +});