Create Dashboard service main application

This commit is contained in:
2025-07-14 10:48:30 -05:00
parent b7c23b168d
commit bea580be83

View File

@ -0,0 +1,672 @@
import express from 'express';
import { Server as SocketIOServer } from 'socket.io';
import { createServer } from 'http';
import { createClient } from 'redis';
import { Client as PgClient } from 'pg';
import winston from 'winston';
import moment from 'moment';
import _ from 'lodash';
import path from 'path';
import { fileURLToPath } from 'url';
import cors from 'cors';
import helmet from 'helmet';
import compression from 'compression';
import rateLimit from 'express-rate-limit';
// ES modules compatibility
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Environment variables
const PORT = process.env.PORT || 3000;
const POSTGRES_URL = process.env.POSTGRES_URL;
const REDIS_URL = process.env.REDIS_URL || 'redis://redis:6379';
const LOG_LEVEL = process.env.LOG_LEVEL || 'info';
const NODE_ENV = process.env.NODE_ENV || 'development';
// Language configuration
const LANGUAGE_INFO = {
'en': { name: 'English', flag: '🇺🇸', color: '#3498db' },
'de': { name: 'German', flag: '🇩🇪', color: '#e74c3c' },
'ko': { name: 'Korean', flag: '🇰🇷', color: '#f39c12' },
'es': { name: 'Spanish', flag: '🇪🇸', color: '#9b59b6' },
'fr': { name: 'French', flag: '🇫🇷', color: '#2ecc71' },
'it': { name: 'Italian', flag: '🇮🇹', color: '#e67e22' },
'pt': { name: 'Portuguese', flag: '🇵🇹', color: '#1abc9c' },
'ru': { name: 'Russian', flag: '🇷🇺', color: '#34495e' },
'ja': { name: 'Japanese', flag: '🇯🇵', color: '#e91e63' },
'zh': { name: 'Chinese', flag: '🇨🇳', color: '#ff5722' }
};
// Logging configuration
const logger = winston.createLogger({
level: LOG_LEVEL,
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: '/app/logs/dashboard-error.log', level: 'error' }),
new winston.transports.File({ filename: '/app/logs/dashboard.log' })
]
});
class DashboardService {
constructor() {
this.app = express();
this.server = createServer(this.app);
this.io = new SocketIOServer(this.server, {
cors: {
origin: "*",
methods: ["GET", "POST"]
}
});
this.redis_client = null;
this.pg_client = null;
this.activeConnections = new Map();
this.realtimeStats = {
totalTranscriptions: 0,
totalSpeakingTime: 0,
activeUsers: 0,
languageBreakdown: {},
recentActivity: []
};
}
async initialize() {
try {
// Initialize database connection
this.pg_client = new PgClient({ connectionString: POSTGRES_URL });
await this.pg_client.connect();
// Initialize Redis connection
this.redis_client = createClient({ url: REDIS_URL });
await this.redis_client.connect();
// Set up Express middleware
this.setupMiddleware();
// Set up routes
this.setupRoutes();
// Set up Socket.IO
this.setupSocketIO();
// Start listening for Redis events
this.listenForEvents();
logger.info('Dashboard service initialized successfully');
} catch (error) {
logger.error('Failed to initialize dashboard service:', error);
throw error;
}
}
setupMiddleware() {
// Security and performance middleware
this.app.use(helmet({
contentSecurityPolicy: {
directives: {
defaultSrc: ["'self'"],
styleSrc: ["'self'", "'unsafe-inline'", "https://cdnjs.cloudflare.com"],
scriptSrc: ["'self'", "'unsafe-inline'", "https://cdnjs.cloudflare.com"],
imgSrc: ["'self'", "data:", "https:"],
},
},
}));
this.app.use(compression());
this.app.use(cors());
// Rate limiting
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100 // limit each IP to 100 requests per windowMs
});
this.app.use(limiter);
// Static files and view engine
this.app.use(express.static(path.join(__dirname, '../public')));
this.app.set('view engine', 'ejs');
this.app.set('views', path.join(__dirname, 'views'));
// Body parsing
this.app.use(express.json({ limit: '10mb' }));
this.app.use(express.urlencoded({ extended: true, limit: '10mb' }));
// Logging middleware
this.app.use((req, res, next) => {
logger.info(`${req.method} ${req.url}`, {
ip: req.ip,
userAgent: req.get('User-Agent')
});
next();
});
}
setupRoutes() {
// Main dashboard page
this.app.get('/', async (req, res) => {
try {
const dashboardData = await this.getDashboardData();
res.render('dashboard', {
data: dashboardData,
moment,
languages: LANGUAGE_INFO
});
} catch (error) {
logger.error('Error rendering dashboard:', error);
res.status(500).render('error', { error: 'Failed to load dashboard' });
}
});
// API endpoints
this.app.get('/api/stats', async (req, res) => {
try {
const stats = await this.getSystemStats();
res.json(stats);
} catch (error) {
logger.error('Error getting stats:', error);
res.status(500).json({ error: 'Failed to get stats' });
}
});
this.app.get('/api/recent-activity', async (req, res) => {
try {
const limit = parseInt(req.query.limit) || 20;
const activity = await this.getRecentActivity(limit);
res.json(activity);
} catch (error) {
logger.error('Error getting recent activity:', error);
res.status(500).json({ error: 'Failed to get recent activity' });
}
});
this.app.get('/api/language-stats', async (req, res) => {
try {
const days = parseInt(req.query.days) || 7;
const stats = await this.getLanguageStats(days);
res.json(stats);
} catch (error) {
logger.error('Error getting language stats:', error);
res.status(500).json({ error: 'Failed to get language stats' });
}
});
this.app.get('/api/performance-metrics', async (req, res) => {
try {
const hours = parseInt(req.query.hours) || 24;
const metrics = await this.getPerformanceMetrics(hours);
res.json(metrics);
} catch (error) {
logger.error('Error getting performance metrics:', error);
res.status(500).json({ error: 'Failed to get performance metrics' });
}
});
this.app.get('/api/user-activity', async (req, res) => {
try {
const days = parseInt(req.query.days) || 7;
const activity = await this.getUserActivity(days);
res.json(activity);
} catch (error) {
logger.error('Error getting user activity:', error);
res.status(500).json({ error: 'Failed to get user activity' });
}
});
// Health check
this.app.get('/health', (req, res) => {
res.json({
status: 'healthy',
timestamp: new Date().toISOString(),
uptime: process.uptime(),
version: '2.0.0'
});
});
// 404 handler
this.app.use((req, res) => {
res.status(404).render('error', { error: 'Page not found' });
});
// Error handler
this.app.use((error, req, res, next) => {
logger.error('Express error:', error);
res.status(500).render('error', { error: 'Internal server error' });
});
}
setupSocketIO() {
this.io.on('connection', (socket) => {
logger.info('Client connected to dashboard', { socketId: socket.id });
// Send initial data
socket.emit('stats-update', this.realtimeStats);
// Handle disconnection
socket.on('disconnect', () => {
logger.info('Client disconnected from dashboard', { socketId: socket.id });
});
// Handle client requests for specific data
socket.on('request-stats', async () => {
try {
const stats = await this.getSystemStats();
socket.emit('stats-update', stats);
} catch (error) {
logger.error('Error sending stats to client:', error);
}
});
});
}
async getDashboardData() {
try {
const [
systemStats,
recentActivity,
languageStats,
performanceMetrics,
userActivity
] = await Promise.all([
this.getSystemStats(),
this.getRecentActivity(10),
this.getLanguageStats(7),
this.getPerformanceMetrics(24),
this.getUserActivity(7)
]);
return {
systemStats,
recentActivity,
languageStats,
performanceMetrics,
userActivity,
activeConnections: Array.from(this.activeConnections.values())
};
} catch (error) {
logger.error('Error getting dashboard data:', error);
throw error;
}
}
async getSystemStats() {
try {
const queries = [
// Total transcriptions
`SELECT COUNT(*) as total_transcriptions FROM transcriptions WHERE status = 'completed'`,
// Total speaking time
`SELECT COALESCE(SUM(duration_seconds), 0) as total_speaking_time FROM recordings WHERE status IN ('completed', 'processed')`,
// Active connections count
`SELECT COUNT(*) as active_connections FROM connections WHERE ended_at IS NULL`,
// Today's activity
`SELECT COUNT(*) as today_transcriptions FROM transcriptions
WHERE DATE(created_at) = CURRENT_DATE AND status = 'completed'`,
// Language breakdown (last 7 days)
`SELECT detected_language, COUNT(*) as count
FROM transcriptions
WHERE created_at >= NOW() - INTERVAL '7 days' AND status = 'completed'
GROUP BY detected_language
ORDER BY count DESC`,
// Average processing times
`SELECT
AVG(processing_time_ms) as avg_transcription_time,
AVG(t.processing_time_ms) as avg_translation_time
FROM transcriptions tr
LEFT JOIN translations t ON tr.id = t.transcription_id
WHERE tr.created_at >= NOW() - INTERVAL '24 hours'`
];
const results = await Promise.all(
queries.map(query => this.pg_client.query(query))
);
const languageBreakdown = {};
if (results[4].rows) {
results[4].rows.forEach(row => {
const langInfo = LANGUAGE_INFO[row.detected_language] || { name: row.detected_language, flag: '🏳️' };
languageBreakdown[row.detected_language] = {
count: parseInt(row.count),
name: langInfo.name,
flag: langInfo.flag,
color: langInfo.color || '#95a5a6'
};
});
}
return {
totalTranscriptions: parseInt(results[0].rows[0].total_transcriptions),
totalSpeakingTime: parseFloat(results[1].rows[0].total_speaking_time || 0),
activeConnections: parseInt(results[2].rows[0].active_connections),
todayTranscriptions: parseInt(results[3].rows[0].today_transcriptions),
languageBreakdown,
avgTranscriptionTime: parseFloat(results[5].rows[0].avg_transcription_time || 0),
avgTranslationTime: parseFloat(results[5].rows[0].avg_translation_time || 0),
lastUpdated: new Date().toISOString()
};
} catch (error) {
logger.error('Error getting system stats:', error);
throw error;
}
}
async getRecentActivity(limit = 20) {
try {
const query = `
SELECT
t.id as transcription_id,
t.detected_language,
t.transcription_text,
t.created_at,
t.processing_time_ms,
r.speaker_nickname,
r.speaker_username,
r.duration_seconds,
c.channel_name,
c.guild_id
FROM transcriptions t
JOIN recordings r ON t.recording_id = r.id
JOIN connections c ON r.connection_id = c.id
WHERE t.status = 'completed'
ORDER BY t.created_at DESC
LIMIT $1
`;
const result = await this.pg_client.query(query, [limit]);
return result.rows.map(row => ({
...row,
languageInfo: LANGUAGE_INFO[row.detected_language] || { name: row.detected_language, flag: '🏳️' },
timeAgo: moment(row.created_at).fromNow(),
transcriptPreview: row.transcription_text.length > 100
? row.transcription_text.substring(0, 100) + '...'
: row.transcription_text
}));
} catch (error) {
logger.error('Error getting recent activity:', error);
throw error;
}
}
async getLanguageStats(days = 7) {
try {
const query = `
SELECT
detected_language,
COUNT(*) as transcription_count,
AVG(processing_time_ms) as avg_processing_time,
SUM(r.duration_seconds) as total_speaking_time
FROM transcriptions t
JOIN recordings r ON t.recording_id = r.id
WHERE t.created_at >= NOW() - INTERVAL '${days} days'
AND t.status = 'completed'
GROUP BY detected_language
ORDER BY transcription_count DESC
`;
const result = await this.pg_client.query(query);
return result.rows.map(row => ({
...row,
languageInfo: LANGUAGE_INFO[row.detected_language] || { name: row.detected_language, flag: '🏳️' },
transcription_count: parseInt(row.transcription_count),
avg_processing_time: parseFloat(row.avg_processing_time || 0),
total_speaking_time: parseFloat(row.total_speaking_time || 0)
}));
} catch (error) {
logger.error('Error getting language stats:', error);
throw error;
}
}
async getPerformanceMetrics(hours = 24) {
try {
const query = `
SELECT
service_name,
operation,
AVG(duration_ms) as avg_duration,
COUNT(*) as operation_count,
COUNT(CASE WHEN success = false THEN 1 END) as error_count
FROM processing_metrics
WHERE created_at >= NOW() - INTERVAL '${hours} hours'
GROUP BY service_name, operation
ORDER BY service_name, operation
`;
const result = await this.pg_client.query(query);
return result.rows.map(row => ({
...row,
avg_duration: parseFloat(row.avg_duration || 0),
operation_count: parseInt(row.operation_count),
error_count: parseInt(row.error_count),
success_rate: row.operation_count > 0
? ((row.operation_count - row.error_count) / row.operation_count * 100).toFixed(2)
: 0
}));
} catch (error) {
logger.error('Error getting performance metrics:', error);
throw error;
}
}
async getUserActivity(days = 7) {
try {
const query = `
SELECT
speaker_id,
speaker_nickname,
speaker_username,
COUNT(*) as transcription_count,
SUM(duration_seconds) as total_speaking_time,
MAX(started_at) as last_activity
FROM recordings r
JOIN transcriptions t ON r.id = t.recording_id
WHERE r.started_at >= NOW() - INTERVAL '${days} days'
AND t.status = 'completed'
GROUP BY speaker_id, speaker_nickname, speaker_username
ORDER BY transcription_count DESC
LIMIT 20
`;
const result = await this.pg_client.query(query);
return result.rows.map(row => ({
...row,
transcription_count: parseInt(row.transcription_count),
total_speaking_time: parseFloat(row.total_speaking_time || 0),
last_activity_ago: moment(row.last_activity).fromNow()
}));
} catch (error) {
logger.error('Error getting user activity:', error);
throw error;
}
}
async listenForEvents() {
logger.info('Starting to listen for dashboard events');
const subscriber = this.redis_client.duplicate();
await subscriber.connect();
await subscriber.subscribe('voice-translator-events', (message) => {
this.processRedisMessage(message);
});
logger.info('Subscribed to voice-translator-events channel for dashboard');
}
async processRedisMessage(message) {
try {
const data = JSON.parse(message);
const { event, data: eventData } = data;
switch (event) {
case 'connection_started':
await this.handleConnectionStarted(eventData);
break;
case 'connection_ended':
await this.handleConnectionEnded(eventData);
break;
case 'transcription_completed':
await this.handleTranscriptionCompleted(eventData);
break;
case 'translations_completed':
await this.handleTranslationsCompleted(eventData);
break;
default:
logger.debug('Unhandled event type for dashboard:', event);
}
} catch (error) {
if (error instanceof SyntaxError) {
logger.error('Invalid JSON in Redis message:', message);
} else {
logger.error('Error processing Redis message in dashboard:', error);
}
}
}
async handleConnectionStarted(eventData) {
const { connectionId, channelName, guildId } = eventData;
this.activeConnections.set(connectionId, {
...eventData,
startedAt: new Date()
});
// Broadcast to all connected clients
this.io.emit('connection-started', {
connectionId,
channelName,
guildId,
timestamp: new Date().toISOString()
});
logger.info('Dashboard: Connection started', { connectionId, channelName });
}
async handleConnectionEnded(eventData) {
const { connectionId } = eventData;
this.activeConnections.delete(connectionId);
// Broadcast to all connected clients
this.io.emit('connection-ended', {
connectionId,
timestamp: new Date().toISOString()
});
logger.info('Dashboard: Connection ended', { connectionId });
}
async handleTranscriptionCompleted(eventData) {
const { transcriptionId, recordingId, text, language } = eventData;
// Update realtime stats
this.realtimeStats.totalTranscriptions++;
this.realtimeStats.languageBreakdown[language] =
(this.realtimeStats.languageBreakdown[language] || 0) + 1;
// Broadcast to all connected clients
this.io.emit('transcription-completed', {
transcriptionId,
recordingId,
text: text.length > 100 ? text.substring(0, 100) + '...' : text,
language,
languageInfo: LANGUAGE_INFO[language] || { name: language, flag: '🏳️' },
timestamp: new Date().toISOString()
});
logger.info('Dashboard: Transcription completed', { transcriptionId, language });
}
async handleTranslationsCompleted(eventData) {
const { transcriptionId, translations } = eventData;
// Broadcast to all connected clients
this.io.emit('translations-completed', {
transcriptionId,
translationCount: Object.keys(translations).length,
timestamp: new Date().toISOString()
});
logger.info('Dashboard: Translations completed', { transcriptionId });
}
async cleanup() {
if (this.redis_client) {
await this.redis_client.quit();
}
if (this.pg_client) {
await this.pg_client.end();
}
if (this.server) {
this.server.close();
}
}
start() {
this.server.listen(PORT, () => {
logger.info(`Dashboard server running on port ${PORT}`);
});
}
}
// Main application
async function main() {
const dashboard = new DashboardService();
try {
await dashboard.initialize();
dashboard.start();
logger.info('Dashboard service started successfully');
// Graceful shutdown handling
process.on('SIGINT', async () => {
logger.info('Received SIGINT, shutting down dashboard gracefully...');
await dashboard.cleanup();
process.exit(0);
});
process.on('SIGTERM', async () => {
logger.info('Received SIGTERM, shutting down dashboard gracefully...');
await dashboard.cleanup();
process.exit(0);
});
} catch (error) {
logger.error('Failed to start dashboard service:', error);
await dashboard.cleanup();
process.exit(1);
}
}
// Handle unhandled errors
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Rejection at:', promise, 'reason:', reason);
});
process.on('uncaughtException', (error) => {
logger.error('Uncaught Exception:', error);
process.exit(1);
});
// Start the application
main().catch((error) => {
logger.error('Fatal error in dashboard main:', error);
process.exit(1);
});