Create Audio Processor service implementation
This commit is contained in:
352
services/audio-processor/src/processor.py
Normal file
352
services/audio-processor/src/processor.py
Normal file
@ -0,0 +1,352 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any
|
||||
|
||||
import redis.asyncio as redis
|
||||
import psycopg2
|
||||
from psycopg2.extras import DictCursor
|
||||
import soundfile as sf
|
||||
import numpy as np
|
||||
from scipy import signal
|
||||
import structlog
|
||||
|
||||
# Structured logging setup: every event passes through the processor chain
# below and is finally emitted as a single JSON document.
structlog.configure(
    cache_logger_on_first_use=True,
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    context_class=dict,
    processors=[
        # Drop events below the configured stdlib log level
        structlog.stdlib.filter_by_level,
        # Attach the level and the logger name to the event
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        # Apply %-style positional arguments to the event text
        structlog.stdlib.PositionalArgumentsFormatter(),
        # ISO-formatted timestamp
        structlog.processors.TimeStamper(fmt="iso"),
        # Render stack / exception information when requested
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        # Final step: serialise the event as JSON
        structlog.processors.JSONRenderer(),
    ],
)

# Module-wide structured logger used by everything below
logger = structlog.get_logger()
|
||||
|
||||
# Configuration
# Connection strings are read from the environment; POSTGRES_URL has no
# default and is None when the variable is unset.
POSTGRES_URL = os.getenv('POSTGRES_URL')
REDIS_URL = os.getenv('REDIS_URL', 'redis://redis:6379')

# Directories for raw input recordings and converted output files
RAW_AUDIO_PATH = '/app/audio/raw'
PROCESSED_AUDIO_PATH = '/app/audio/processed'

# Normalise the requested level so values such as "info" or " Debug " work,
# and fall back to INFO for names the logging module does not define instead
# of crashing with AttributeError at import time.
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').strip().upper()
if not isinstance(getattr(logging, LOG_LEVEL, None), int):
    LOG_LEVEL = 'INFO'

# Set up logging level
logging.basicConfig(level=getattr(logging, LOG_LEVEL))
|
||||
|
||||
class AudioProcessor:
    """Convert raw PCM recordings to 16 kHz mono WAV files.

    The processor subscribes to the ``voice-translator-events`` Redis channel,
    reacts to ``recording_completed`` events, converts the referenced PCM file
    (FFmpeg first, NumPy/SciPy as fallback), updates the ``recordings`` table
    in PostgreSQL and publishes an ``audio_processed`` event.
    """

    # Input is expected to be Discord PCM: 48 kHz, 16-bit, stereo
    INPUT_SAMPLE_RATE = 48000
    INPUT_CHANNELS = 2
    # Output format: 16 kHz mono (optimal for Whisper)
    TARGET_SAMPLE_RATE = 16000
    # Upper bound for a single FFmpeg invocation
    FFMPEG_TIMEOUT_SECONDS = 30
    # Redis pub/sub channel used for both incoming and outgoing events
    EVENTS_CHANNEL = 'voice-translator-events'

    def __init__(self):
        # Both connections are created lazily by initialize()
        self.redis_client = None
        self.pg_connection = None

    async def initialize(self):
        """Initialize database and Redis connections.

        Raises:
            Exception: whatever the Redis or PostgreSQL client raised; the
                error is logged and re-raised unchanged.
        """
        try:
            # Connect to Redis and verify the connection with a ping
            self.redis_client = redis.from_url(REDIS_URL)
            await self.redis_client.ping()

            # Connect to PostgreSQL
            self.pg_connection = psycopg2.connect(POSTGRES_URL)

            logger.info("Audio processor initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize audio processor", error=str(e))
            raise

    async def cleanup(self):
        """Close whichever connections were opened; safe to call repeatedly."""
        if self.redis_client:
            await self.redis_client.close()
            self.redis_client = None
        if self.pg_connection:
            self.pg_connection.close()
            self.pg_connection = None

    def convert_pcm_to_wav_ffmpeg(self, input_path: str, output_path: str) -> bool:
        """Convert PCM to optimized WAV using FFmpeg.

        Discord PCM: 48kHz, 16-bit, stereo -> 16kHz, 16-bit, mono

        Args:
            input_path: path of the raw PCM file.
            output_path: path of the WAV file to write (overwritten if present).

        Returns:
            True when FFmpeg exits with status 0; False on a non-zero exit,
            a timeout, or any other error (all of which are logged).
        """
        cmd = [
            'ffmpeg',
            '-f', 's16le',                          # Input format: signed 16-bit little endian
            '-ar', str(self.INPUT_SAMPLE_RATE),     # Input sample rate
            '-ac', str(self.INPUT_CHANNELS),        # Input channels
            '-i', input_path,                       # Input file
            '-ar', str(self.TARGET_SAMPLE_RATE),    # Output sample rate
            '-ac', '1',                             # Output channels: mono
            '-f', 'wav',                            # Output format: WAV
            '-y',                                   # Overwrite output file
            output_path,                            # Output file
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.FFMPEG_TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired:
            logger.error("FFmpeg conversion timed out", input_path=input_path)
            return False
        except Exception as e:
            # Includes the case where the ffmpeg executable cannot be started
            logger.error("Error during PCM to WAV conversion",
                         error=str(e), input_path=input_path)
            return False

        if result.returncode != 0:
            logger.error("FFmpeg conversion failed",
                         error=result.stderr, input_path=input_path)
            return False

        logger.info("Successfully converted PCM to WAV",
                    input_path=input_path, output_path=output_path)
        return True

    @staticmethod
    def _to_mono(samples: np.ndarray) -> np.ndarray:
        """Average interleaved stereo samples into a single channel.

        The average is computed in floating point: accumulating in int16
        would wrap around for loud signals (e.g. 20000 + 20000).
        """
        if len(samples) % 2 != 0:
            # If odd number of samples, treat as mono.
            # NOTE(review): an odd count may also indicate a truncated stereo
            # stream - confirm this fallback is what callers want.
            return samples
        return samples.reshape(-1, 2).mean(axis=1)

    @classmethod
    def _pcm_to_mono_16k(cls, pcm_data: bytes) -> np.ndarray:
        """Turn raw 16-bit PCM bytes into int16 mono samples at the target rate."""
        samples = np.frombuffer(pcm_data, dtype=np.int16)
        if samples.size == 0:
            raise ValueError("PCM input contains no samples")

        mono = cls._to_mono(samples)

        # Use scipy's decimate for anti-aliasing downsampling (48 kHz -> 16 kHz)
        downsample_factor = cls.INPUT_SAMPLE_RATE // cls.TARGET_SAMPLE_RATE
        resampled = signal.decimate(mono, downsample_factor, ftype='fir')

        # Ensure we're within int16 range before narrowing the dtype
        return np.clip(resampled, -32768, 32767).astype(np.int16)

    def convert_pcm_to_wav_python(self, input_path: str, output_path: str) -> bool:
        """Alternative Python-based conversion using scipy/soundfile.

        Fallback if FFmpeg fails.

        Args:
            input_path: path of the raw PCM file.
            output_path: path of the WAV file to write.

        Returns:
            True when the WAV file was written, False on any error (logged).
        """
        try:
            # Read raw PCM data
            with open(input_path, 'rb') as f:
                pcm_data = f.read()

            audio_16k = self._pcm_to_mono_16k(pcm_data)

            # Save as WAV file
            sf.write(output_path, audio_16k, self.TARGET_SAMPLE_RATE, subtype='PCM_16')

            logger.info("Successfully converted PCM to WAV (Python)",
                        input_path=input_path, output_path=output_path)
            return True
        except Exception as e:
            logger.error("Error during Python PCM to WAV conversion",
                         error=str(e), input_path=input_path)
            return False

    def get_audio_info(self, file_path: str) -> Dict[str, Any]:
        """Get audio file information.

        Returns:
            A dict that always contains ``file_size_bytes``. For paths ending
            in ``.wav`` it also contains ``duration_seconds``, ``sample_rate``,
            ``channels`` and ``format``. On any error the failure is logged
            and ``{'file_size_bytes': 0}`` is returned.
        """
        try:
            file_size = os.path.getsize(file_path)

            if not file_path.endswith('.wav'):
                return {'file_size_bytes': file_size}

            # For WAV files, get additional info
            info = sf.info(file_path)
            return {
                'file_size_bytes': file_size,
                'duration_seconds': info.duration,
                'sample_rate': info.samplerate,
                'channels': info.channels,
                'format': info.format,
            }
        except Exception as e:
            logger.error("Error getting audio info", error=str(e), file_path=file_path)
            return {'file_size_bytes': 0}

    async def update_recording_status(self, recording_id: int, status: str,
                                      processed_path: str = None, error_message: str = None):
        """Update recording status in database.

        Sets ``status`` and ``updated_at`` on the ``recordings`` row with the
        given id. Failures are logged and rolled back, never raised.

        Args:
            recording_id: primary key of the row to update.
            status: new status value.
            processed_path: accepted for caller compatibility; not written to
                the database by this statement.
            error_message: accepted for caller compatibility; not written to
                the database by this statement.
        """
        try:
            with self.pg_connection.cursor() as cursor:
                cursor.execute("""
                    UPDATE recordings
                    SET status = %s, updated_at = CURRENT_TIMESTAMP
                    WHERE id = %s
                """, (status, recording_id))

            self.pg_connection.commit()
        except Exception as e:
            logger.error("Failed to update recording status",
                         error=str(e), recording_id=recording_id)
            # The rollback itself can fail (connection missing or broken);
            # keep that from escaping this best-effort handler.
            if self.pg_connection is not None:
                try:
                    self.pg_connection.rollback()
                except Exception as rollback_error:
                    logger.error("Failed to roll back recording status update",
                                 error=str(rollback_error), recording_id=recording_id)

    async def publish_event(self, event: str, data: Dict[str, Any]):
        """Publish event to Redis.

        The message is a JSON object with ``event``, ``data``, ``timestamp``
        (wall-clock seconds since the epoch) and ``service``. Failures are
        logged, never raised.
        """
        # Wall-clock time: the event loop's clock is monotonic with an
        # arbitrary origin and is meaningless to other services.
        import time

        try:
            message = {
                'event': event,
                'data': data,
                'timestamp': time.time(),
                'service': 'audio-processor',
            }

            await self.redis_client.publish(self.EVENTS_CHANNEL, json.dumps(message))
            logger.debug("Published event", event=event, data=data)
        except Exception as e:
            logger.error("Failed to publish event", error=str(e), event=event)

    async def process_recording(self, message_data: Dict[str, Any]):
        """Process a single recording.

        Args:
            message_data: event payload; must contain ``recordingId`` and
                ``filePath``. Messages missing either are logged and ignored.
        """
        recording_id = message_data.get('recordingId')
        file_path = message_data.get('filePath')

        if not recording_id or not file_path:
            logger.error("Invalid message data", data=message_data)
            return

        logger.info("Processing recording", recording_id=recording_id, file_path=file_path)

        try:
            # Update status to processing
            await self.update_recording_status(recording_id, 'processing')

            # Output keeps the input's base name, with a .wav extension
            input_file = Path(file_path)
            output_file = Path(PROCESSED_AUDIO_PATH) / f"{input_file.stem}.wav"

            # Ensure output directory exists
            output_file.parent.mkdir(parents=True, exist_ok=True)

            # Try FFmpeg first, fallback to Python implementation
            success = self.convert_pcm_to_wav_ffmpeg(str(input_file), str(output_file))
            if not success:
                logger.warning("FFmpeg conversion failed, trying Python method",
                               recording_id=recording_id)
                success = self.convert_pcm_to_wav_python(str(input_file), str(output_file))

            if not success:
                await self.update_recording_status(recording_id, 'failed',
                                                   error_message="Audio conversion failed")
                logger.error("Failed to process recording", recording_id=recording_id)
                return

            # Update database with processed file info
            await self.update_recording_status(recording_id, 'processed', str(output_file))

            # Get audio info for the processed file
            audio_info = self.get_audio_info(str(output_file))

            # Publish processing completed event
            await self.publish_event('audio_processed', {
                'recordingId': recording_id,
                'originalPath': file_path,
                'processedPath': str(output_file),
                'audioInfo': audio_info,
            })

            logger.info("Successfully processed recording",
                        recording_id=recording_id,
                        output_path=str(output_file))

            # Clean up original PCM file to save space (best effort)
            try:
                os.remove(input_file)
                logger.debug("Cleaned up original PCM file", path=str(input_file))
            except Exception as e:
                logger.warning("Failed to clean up PCM file",
                               error=str(e), path=str(input_file))
        except Exception as e:
            logger.error("Error processing recording",
                         error=str(e), recording_id=recording_id)
            await self.update_recording_status(recording_id, 'failed',
                                               error_message=str(e))

    async def listen_for_recordings(self):
        """Listen for new recordings to process.

        Subscribes to the events channel and handles each
        ``recording_completed`` message in turn. Errors in a single message
        are logged and do not stop the listener; the subscription is removed
        when the loop ends.
        """
        logger.info("Starting to listen for recording events")

        # Subscribe to Redis pub/sub
        pubsub = self.redis_client.pubsub()
        await pubsub.subscribe(self.EVENTS_CHANNEL)

        try:
            async for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations
                if message['type'] != 'message':
                    continue

                try:
                    data = json.loads(message['data'])

                    # Process recording_completed events
                    if data.get('event') == 'recording_completed':
                        await self.process_recording(data['data'])
                except json.JSONDecodeError:
                    logger.error("Invalid JSON in message", data=message['data'])
                except Exception as e:
                    logger.error("Error processing message", error=str(e))
        except Exception as e:
            logger.error("Error in message listener", error=str(e))
        finally:
            await pubsub.unsubscribe(self.EVENTS_CHANNEL)
|
||||
|
||||
async def main():
    """Run the audio processor service until it stops or fails.

    Opens the connections, makes sure the audio directories exist, then hands
    control to the event listener. Connections are always closed on the way
    out; unexpected errors are logged and re-raised.
    """
    service = AudioProcessor()

    try:
        await service.initialize()

        # Create directories if they don't exist
        for directory in (RAW_AUDIO_PATH, PROCESSED_AUDIO_PATH):
            os.makedirs(directory, exist_ok=True)

        logger.info("Audio processor started successfully")

        # Start listening for events
        await service.listen_for_recordings()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal, shutting down...")
    except Exception as exc:
        logger.error("Fatal error in audio processor", error=str(exc))
        raise
    finally:
        await service.cleanup()
        logger.info("Audio processor shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())
|
Reference in New Issue
Block a user