From 4e1307079be9dcc95e4641a3fe6f3b6540d50e9c Mon Sep 17 00:00:00 2001 From: MAHaines Date: Mon, 14 Jul 2025 00:26:34 -0500 Subject: [PATCH] Create Audio Processor service implementation --- services/audio-processor/src/processor.py | 352 ++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 services/audio-processor/src/processor.py diff --git a/services/audio-processor/src/processor.py b/services/audio-processor/src/processor.py new file mode 100644 index 0000000..6f1626c --- /dev/null +++ b/services/audio-processor/src/processor.py @@ -0,0 +1,352 @@ +import asyncio +import json +import logging +import os +import subprocess +from pathlib import Path +from typing import Dict, Any + +import redis.asyncio as redis +import psycopg2 +from psycopg2.extras import DictCursor +import soundfile as sf +import numpy as np +from scipy import signal +import structlog + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_log_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + +# Configuration +POSTGRES_URL = os.getenv('POSTGRES_URL') +REDIS_URL = os.getenv('REDIS_URL', 'redis://redis:6379') +RAW_AUDIO_PATH = '/app/audio/raw' +PROCESSED_AUDIO_PATH = '/app/audio/processed' +LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO') + +# Set up logging level +logging.basicConfig(level=getattr(logging, LOG_LEVEL)) + +class AudioProcessor: + def __init__(self): + self.redis_client = None + self.pg_connection = None + + async def initialize(self): + """Initialize database and Redis connections""" + try: + # Connect to Redis + self.redis_client = redis.from_url(REDIS_URL) + await self.redis_client.ping() + + # Connect to PostgreSQL + self.pg_connection = psycopg2.connect(POSTGRES_URL) + + logger.info("Audio processor initialized successfully") + except Exception as e: + logger.error("Failed to initialize audio processor", error=str(e)) + raise + + async def cleanup(self): + """Clean up connections""" + if self.redis_client: + await self.redis_client.close() + if self.pg_connection: + self.pg_connection.close() + + def convert_pcm_to_wav_ffmpeg(self, input_path: str, output_path: str) -> bool: + """ + Convert PCM to optimized WAV using FFmpeg + Discord PCM: 48kHz, 16-bit, stereo -> 16kHz, 16-bit, mono + """ + try: + cmd = [ + 'ffmpeg', + '-f', 's16le', # Input format: signed 16-bit little endian + '-ar', '48000', # Input sample rate: 48kHz + '-ac', '2', # Input channels: stereo + '-i', input_path, # Input file + '-ar', '16000', # Output sample rate: 16kHz (optimal for Whisper) + '-ac', '1', # Output channels: mono + '-f', 'wav', # Output format: WAV + '-y', # Overwrite output file + output_path # Output file + ] + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=30 # 30 second timeout + ) + + if result.returncode == 0: + logger.info("Successfully converted PCM to WAV", + input_path=input_path, output_path=output_path) + return True + else: + logger.error("FFmpeg conversion failed", + error=result.stderr, input_path=input_path) + return False + + except subprocess.TimeoutExpired: + logger.error("FFmpeg conversion timed out", input_path=input_path) + return False + except Exception as e: + logger.error("Error during PCM to WAV conversion", + error=str(e), input_path=input_path) + return False + + def convert_pcm_to_wav_python(self, input_path: str, output_path: str) -> bool: + """ + Alternative Python-based conversion using scipy/soundfile + Fallback if FFmpeg fails + """ + try: + # Read raw PCM data + with open(input_path, 'rb') as f: + pcm_data = f.read() + + # Convert bytes to numpy array (16-bit signed integers) + audio = np.frombuffer(pcm_data, dtype=np.int16) + + # Discord sends stereo (2 channels), reshape accordingly + if len(audio) % 2 == 0: + audio = audio.reshape(-1, 2) + # Convert to mono by averaging channels + audio_mono = np.mean(audio, axis=1, dtype=np.int16) + else: + # If odd number of samples, treat as mono + audio_mono = audio + + # Downsample from 48kHz to 16kHz + original_rate = 48000 + target_rate = 16000 + downsample_factor = original_rate // target_rate + + # Use scipy's decimate for anti-aliasing downsampling + audio_16k = signal.decimate(audio_mono, downsample_factor, ftype='fir') + + # Ensure we're within int16 range + audio_16k = np.clip(audio_16k, -32768, 32767).astype(np.int16) + + # Save as WAV file + sf.write(output_path, audio_16k, target_rate, subtype='PCM_16') + + logger.info("Successfully converted PCM to WAV (Python)", + input_path=input_path, output_path=output_path) + return True + + except Exception as e: + logger.error("Error during Python PCM to WAV conversion", + error=str(e), input_path=input_path) + return False + + def get_audio_info(self, file_path: str) -> Dict[str, Any]: + """Get audio file information""" + try: + # Get file size + file_size = os.path.getsize(file_path) + + # For WAV files, get additional info + if file_path.endswith('.wav'): + info = sf.info(file_path) + return { + 'file_size_bytes': file_size, + 'duration_seconds': info.duration, + 'sample_rate': info.samplerate, + 'channels': info.channels, + 'format': info.format + } + else: + return { + 'file_size_bytes': file_size + } + except Exception as e: + logger.error("Error getting audio info", error=str(e), file_path=file_path) + return {'file_size_bytes': 0} + + async def update_recording_status(self, recording_id: int, status: str, + processed_path: str = None, error_message: str = None): + """Update recording status in database""" + try: + with self.pg_connection.cursor() as cursor: + if processed_path: + # Get audio info for the processed file + audio_info = self.get_audio_info(processed_path) + + cursor.execute(""" + UPDATE recordings + SET status = %s, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """, (status, recording_id)) + else: + cursor.execute(""" + UPDATE recordings + SET status = %s, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """, (status, recording_id)) + + self.pg_connection.commit() + + except Exception as e: + logger.error("Failed to update recording status", + error=str(e), recording_id=recording_id) + self.pg_connection.rollback() + + async def publish_event(self, event: str, data: Dict[str, Any]): + """Publish event to Redis""" + try: + message = { + 'event': event, + 'data': data, + 'timestamp': asyncio.get_event_loop().time(), + 'service': 'audio-processor' + } + + await self.redis_client.publish('voice-translator-events', json.dumps(message)) + logger.debug("Published event", event=event, data=data) + + except Exception as e: + logger.error("Failed to publish event", error=str(e), event=event) + + async def process_recording(self, message_data: Dict[str, Any]): + """Process a single recording""" + recording_id = message_data.get('recordingId') + file_path = message_data.get('filePath') + + if not recording_id or not file_path: + logger.error("Invalid message data", data=message_data) + return + + logger.info("Processing recording", recording_id=recording_id, file_path=file_path) + + try: + # Update status to processing + await self.update_recording_status(recording_id, 'processing') + + # Generate output path + input_file = Path(file_path) + output_file = Path(PROCESSED_AUDIO_PATH) / f"{input_file.stem}.wav" + + # Ensure output directory exists + output_file.parent.mkdir(parents=True, exist_ok=True) + + # Try FFmpeg first, fallback to Python implementation + success = self.convert_pcm_to_wav_ffmpeg(str(input_file), str(output_file)) + + if not success: + logger.warning("FFmpeg conversion failed, trying Python method", + recording_id=recording_id) + success = self.convert_pcm_to_wav_python(str(input_file), str(output_file)) + + if success: + # Update database with processed file info + await self.update_recording_status(recording_id, 'processed', str(output_file)) + + # Get audio info for the processed file + audio_info = self.get_audio_info(str(output_file)) + + # Publish processing completed event + await self.publish_event('audio_processed', { + 'recordingId': recording_id, + 'originalPath': file_path, + 'processedPath': str(output_file), + 'audioInfo': audio_info + }) + + logger.info("Successfully processed recording", + recording_id=recording_id, + output_path=str(output_file)) + + # Clean up original PCM file to save space + try: + os.remove(input_file) + logger.debug("Cleaned up original PCM file", path=str(input_file)) + except Exception as e: + logger.warning("Failed to clean up PCM file", + error=str(e), path=str(input_file)) + + else: + await self.update_recording_status(recording_id, 'failed', + error_message="Audio conversion failed") + logger.error("Failed to process recording", recording_id=recording_id) + + except Exception as e: + logger.error("Error processing recording", + error=str(e), recording_id=recording_id) + await self.update_recording_status(recording_id, 'failed', + error_message=str(e)) + + async def listen_for_recordings(self): + """Listen for new recordings to process""" + logger.info("Starting to listen for recording events") + + # Subscribe to Redis pub/sub + pubsub = self.redis_client.pubsub() + await pubsub.subscribe('voice-translator-events') + + try: + async for message in pubsub.listen(): + if message['type'] == 'message': + try: + data = json.loads(message['data']) + + # Process recording_completed events + if data.get('event') == 'recording_completed': + await self.process_recording(data['data']) + + except json.JSONDecodeError: + logger.error("Invalid JSON in message", data=message['data']) + except Exception as e: + logger.error("Error processing message", error=str(e)) + + except Exception as e: + logger.error("Error in message listener", error=str(e)) + finally: + await pubsub.unsubscribe('voice-translator-events') + +async def main(): + """Main application entry point""" + processor = AudioProcessor() + + try: + await processor.initialize() + + # Create directories if they don't exist + os.makedirs(RAW_AUDIO_PATH, exist_ok=True) + os.makedirs(PROCESSED_AUDIO_PATH, exist_ok=True) + + logger.info("Audio processor started successfully") + + # Start listening for events + await processor.listen_for_recordings() + + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down...") + except Exception as e: + logger.error("Fatal error in audio processor", error=str(e)) + raise + finally: + await processor.cleanup() + logger.info("Audio processor shutdown complete") + +if __name__ == "__main__": + asyncio.run(main())