Files
encoder/app.py
T
2025-07-08 19:25:16 +00:00

485 lines
17 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for
import os
import json
import threading
import time
import subprocess
import mysql.connector
from datetime import datetime
from pathlib import Path
import mimetypes
import logging
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
# Pull settings from a local .env file (if present) before os.environ is read.
load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Never fall back to a hard-coded, publicly known key: anything signed with it
# (e.g. session cookies) could be forged. Without SECRET_KEY in the
# environment a random key is generated, which only lives for this process.
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key:
    _secret_key = os.urandom(32).hex()
    logger.warning(
        "SECRET_KEY is not set; using a random key that changes on every restart"
    )
app.config['SECRET_KEY'] = _secret_key

# Keyword arguments handed to mysql.connector.connect() by DatabaseManager.
DB_CONFIG = {
    'host': os.environ.get('DB_HOST', 'localhost'),
    'user': os.environ.get('DB_USER', 'root'),
    'password': os.environ.get('DB_PASSWORD', ''),
    'database': os.environ.get('DB_NAME', 'encoder'),
    'autocommit': True,
}

# Shared state between the worker threads, the folder scanner and the routes.
# current_jobs[kind] is None or {'file_path': ..., 'start_time': ...};
# job_queue[kind] is the list of file paths waiting to be encoded.
# Every read or write of either dict must hold job_lock.
current_jobs = {'video': None, 'audio': None}
job_queue = {'video': [], 'audio': []}
job_lock = threading.Lock()
class DatabaseManager:
    """Data-access layer for the ``encoder_config`` and ``job_reports`` tables.

    Each method opens its own connection through :meth:`get_connection` and
    always closes the cursor and the connection again, including when a
    query raises.
    """

    def __init__(self):
        """Remember the connection settings and create missing tables."""
        self.config = DB_CONFIG
        self.init_database()

    def get_connection(self):
        """Open and return a new connection built from ``self.config``."""
        return mysql.connector.connect(**self.config)

    def init_database(self):
        """Create the ``encoder_config`` and ``job_reports`` tables if absent."""
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS encoder_config (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        encoder_type ENUM('video', 'audio') NOT NULL,
                        watch_folder VARCHAR(500) NOT NULL,
                        enabled BOOLEAN DEFAULT TRUE,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                    )
                """)
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS job_reports (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        encoder_type ENUM('video', 'audio') NOT NULL,
                        file_path VARCHAR(500) NOT NULL,
                        original_size BIGINT NOT NULL,
                        encoded_size BIGINT NOT NULL,
                        size_saved BIGINT NOT NULL,
                        original_format VARCHAR(50) NOT NULL,
                        encoded_format VARCHAR(50) NOT NULL,
                        status ENUM('success', 'failed', 'processing') NOT NULL,
                        error_message TEXT,
                        processing_time DECIMAL(10,2),
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

    def get_config(self, encoder_type):
        """Return the enabled watch-folder rows for ``encoder_type``.

        Rows come back as a list of dicts keyed by column name.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    "SELECT * FROM encoder_config WHERE encoder_type = %s AND enabled = TRUE",
                    (encoder_type,)
                )
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    def add_config(self, encoder_type, watch_folder):
        """Insert a new watch folder for ``encoder_type``."""
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "INSERT INTO encoder_config (encoder_type, watch_folder) VALUES (%s, %s)",
                    (encoder_type, watch_folder)
                )
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

    def add_job_report(self, encoder_type, file_path, original_size, encoded_size,
                       original_format, encoded_format, status, error_message=None,
                       processing_time=None):
        """Insert one row into ``job_reports``.

        ``size_saved`` is derived as ``original_size - encoded_size``; it is
        negative when the encoded file came out larger than the original.
        """
        size_saved = original_size - encoded_size
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    INSERT INTO job_reports
                    (encoder_type, file_path, original_size, encoded_size, size_saved,
                    original_format, encoded_format, status, error_message, processing_time)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, (encoder_type, file_path, original_size, encoded_size, size_saved,
                      original_format, encoded_format, status, error_message, processing_time))
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

    def get_job_reports(self, encoder_type, page=1, per_page=20):
        """Return ``(reports, total)`` for one page of job reports, newest first.

        ``page`` is 1-based. Values below 1 for ``page`` or ``per_page`` are
        clamped to 1 so that LIMIT/OFFSET never receive a negative number.
        """
        page = max(page, 1)
        per_page = max(per_page, 1)
        offset = (page - 1) * per_page
        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT * FROM job_reports
                    WHERE encoder_type = %s
                    ORDER BY created_at DESC
                    LIMIT %s OFFSET %s
                """, (encoder_type, per_page, offset))
                reports = cursor.fetchall()
                cursor.execute(
                    "SELECT COUNT(*) as total FROM job_reports WHERE encoder_type = %s",
                    (encoder_type,)
                )
                total = cursor.fetchone()['total']
                return reports, total
            finally:
                cursor.close()
        finally:
            conn.close()

    def get_total_stats(self, encoder_type):
        """Return aggregate totals over the successful jobs of ``encoder_type``.

        The result is a dict with ``total_jobs``, ``total_saved``,
        ``total_original_size`` and ``total_encoded_size``.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT
                    COUNT(*) as total_jobs,
                    SUM(size_saved) as total_saved,
                    SUM(original_size) as total_original_size,
                    SUM(encoded_size) as total_encoded_size
                    FROM job_reports
                    WHERE encoder_type = %s AND status = 'success'
                """, (encoder_type,))
                return cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()
# Module-wide data-access object shared by the worker threads and the routes.
# Constructing it calls init_database(), so a database connection is opened
# as soon as this module is imported.
db_manager = DatabaseManager()
class VideoEncoder:
    """Detects H.264 video files and re-encodes them to H.265 in Matroska."""

    def __init__(self):
        """Set the file extensions the folder scanner considers for video."""
        self.supported_formats = ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm']

    def is_h264(self, file_path):
        """Return True when ffprobe reports ``h264`` for the first video stream.

        Returns False when ffprobe cannot be run or reports anything else.
        """
        try:
            result = subprocess.run([
                'ffprobe', '-v', 'quiet', '-select_streams', 'v:0',
                '-show_entries', 'stream=codec_name', '-of', 'csv=p=0',
                file_path
            ], capture_output=True, text=True)
        except Exception as e:
            logger.error(f"Error checking codec for {file_path}: {e}")
            return False
        return result.stdout.strip() == 'h264'

    def encode_to_h265(self, input_path, output_path):
        """Encode ``input_path`` to H.265 video, copying audio, as Matroska.

        Returns ``(True, None)`` on success, otherwise ``(False, message)``
        where ``message`` is ffmpeg's stderr or the text of the exception
        raised while launching it.
        """
        # Global flags go first: options placed after the last output file
        # are "trailing options" that ffmpeg warns may be ignored.
        # -nostdin keeps ffmpeg from reading interactive commands from the
        # server's stdin while it runs in a background thread.
        cmd = [
            'ffmpeg', '-nostdin', '-y', '-i', input_path,
            '-c:v', 'libx265', '-c:a', 'copy',
            '-preset', 'medium', '-crf', '28',
            '-f', 'matroska', output_path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except Exception as e:
            return False, str(e)
        if result.returncode == 0:
            return True, None
        return False, result.stderr
class AudioEncoder:
    """Detects FLAC audio files and re-encodes them to 320 kbps MP3."""

    def __init__(self):
        """Set the file extensions the folder scanner considers for audio."""
        self.supported_formats = ['.flac', '.wav', '.m4a', '.ogg']

    def is_flac(self, file_path):
        """Return True when ``file_path`` ends in ``.flac`` (case-insensitive).

        Only the file name is inspected; the file content is not probed.
        """
        return file_path.lower().endswith('.flac')

    def encode_to_mp3(self, input_path, output_path):
        """Encode ``input_path`` to a 320 kbps MP3 at ``output_path``.

        Returns ``(True, None)`` on success, otherwise ``(False, message)``
        where ``message`` is ffmpeg's stderr or the text of the exception
        raised while launching it.
        """
        # Global flags go first: options placed after the last output file
        # are "trailing options" that ffmpeg warns may be ignored.
        # -nostdin keeps ffmpeg from reading interactive commands from the
        # server's stdin while it runs in a background thread.
        cmd = [
            'ffmpeg', '-nostdin', '-y', '-i', input_path,
            '-c:a', 'libmp3lame', '-b:a', '320k',
            output_path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except Exception as e:
            return False, str(e)
        if result.returncode == 0:
            return True, None
        return False, result.stderr
def _encode_and_replace(encoder_type, file_path, encode, original_size,
                        original_format, encoded_format, label, start_time):
    """Encode ``file_path`` with ``encode``, swap the result in, record a report."""
    file_dir = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    temp_output = os.path.join(file_dir, f"temp_{file_name}")
    temp_output = os.path.splitext(temp_output)[0] + encoded_format

    success, error = encode(file_path, temp_output)
    if not success:
        # Do not leave a partial output behind.
        if os.path.exists(temp_output):
            os.remove(temp_output)
        db_manager.add_job_report(
            encoder_type, file_path, original_size, 0,
            original_format, encoded_format, 'failed',
            error_message=error
        )
        logger.error(f"Failed to encode {file_path}: {error}")
        return

    encoded_size = os.path.getsize(temp_output)
    final_output = os.path.splitext(file_path)[0] + encoded_format
    # Put the new file in place BEFORE deleting the original, so a failure
    # in between never leaves the folder without a playable copy.
    os.replace(temp_output, final_output)
    # When source and target are the same file (e.g. an .mkv re-encoded to
    # .mkv, or names differing only by case on a case-insensitive
    # filesystem) the replace already overwrote it; removing it now would
    # delete the fresh output.
    if os.path.exists(file_path) and not os.path.samefile(file_path, final_output):
        os.remove(file_path)

    processing_time = time.time() - start_time
    db_manager.add_job_report(
        encoder_type, file_path, original_size, encoded_size,
        original_format, encoded_format, 'success',
        processing_time=processing_time
    )
    logger.info(f"Successfully encoded {file_path} to {label}")


def process_file(encoder_type, file_path, encoder):
    """Process one queued file and always clear ``current_jobs[encoder_type]``.

    Video files are re-encoded to H.265 ``.mkv`` only when the encoder
    reports them as H.264; audio files are re-encoded to ``.mp3`` only when
    the encoder reports them as FLAC. Anything else is skipped without a
    report. A file that has disappeared since it was queued is skipped too.
    Every other error is logged and recorded as a failed job; no exception
    escapes to the calling worker thread.

    Args:
        encoder_type: ``'video'`` or ``'audio'``.
        file_path: Path of the file to process.
        encoder: The VideoEncoder or AudioEncoder matching ``encoder_type``.
    """
    start_time = time.time()
    original_size = 0  # keeps the failure report valid if sizing itself fails
    try:
        try:
            original_size = os.path.getsize(file_path)
        except FileNotFoundError:
            logger.info(f"Skipping {file_path}: file no longer exists")
            return

        if encoder_type == 'video':
            if encoder.is_h264(file_path):
                _encode_and_replace(
                    encoder_type, file_path, encoder.encode_to_h265, original_size,
                    os.path.splitext(file_path)[1], '.mkv', 'H265', start_time
                )
        elif encoder_type == 'audio':
            if encoder.is_flac(file_path):
                _encode_and_replace(
                    encoder_type, file_path, encoder.encode_to_mp3, original_size,
                    '.flac', '.mp3', 'MP3', start_time
                )
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        try:
            db_manager.add_job_report(
                encoder_type, file_path, original_size, 0,
                os.path.splitext(file_path)[1], '', 'failed',
                error_message=str(e)
            )
        except Exception as report_error:
            # A database outage must not take the worker thread down.
            logger.error(f"Could not record failure for {file_path}: {report_error}")
    finally:
        with job_lock:
            current_jobs[encoder_type] = None
def worker_thread(encoder_type):
    """Run forever, encoding queued files of ``encoder_type`` one at a time.

    The next path is taken from ``job_queue`` and published in
    ``current_jobs`` under ``job_lock``. The loop sleeps for a second only
    when there is nothing to do, and an unexpected error in one job is
    logged instead of ending the thread.
    """
    encoder = VideoEncoder() if encoder_type == 'video' else AudioEncoder()
    while True:
        file_path = None
        with job_lock:
            if job_queue[encoder_type] and not current_jobs[encoder_type]:
                file_path = job_queue[encoder_type].pop(0)
                current_jobs[encoder_type] = {
                    'file_path': file_path,
                    'start_time': time.time()
                }

        if file_path is None:
            time.sleep(1)
            continue

        try:
            process_file(encoder_type, file_path, encoder)
        except Exception as e:
            logger.error(f"Unhandled error while processing {file_path}: {e}")
        finally:
            # Guarantee the slot is freed even if process_file failed before
            # reaching its own cleanup; otherwise no further job would start.
            with job_lock:
                current_jobs[encoder_type] = None
def _enqueue_candidates(encoder_type, encoder, needs_encoding):
    """Queue files under the enabled watch folders of ``encoder_type`` that need encoding."""
    extensions = tuple(encoder.supported_formats)
    for config in db_manager.get_config(encoder_type):
        folder = config['watch_folder']
        if not os.path.exists(folder):
            continue
        for root, _dirs, files in os.walk(folder):
            for file in files:
                if not file.lower().endswith(extensions):
                    continue
                file_path = os.path.join(root, file)

                # Skip files that are already queued or are being encoded
                # right now. The active file has left the queue but is still
                # on disk; queuing it again would hand the worker a path
                # that is gone by the time it gets there.
                with job_lock:
                    active = current_jobs[encoder_type]
                    known = (
                        file_path in job_queue[encoder_type]
                        or (active is not None and active['file_path'] == file_path)
                    )
                if known:
                    continue

                # The check may spawn ffprobe, so it runs outside the lock
                # and only for files that are not already tracked.
                if not needs_encoding(file_path):
                    continue

                with job_lock:
                    if file_path not in job_queue[encoder_type]:
                        job_queue[encoder_type].append(file_path)


def scan_folders():
    """Run forever, rescanning all watch folders once a minute.

    Video files are queued when the video encoder reports them as H.264,
    audio files when the audio encoder reports them as FLAC. An error while
    scanning one encoder type is logged and does not prevent the other type
    from being scanned in the same pass.
    """
    video_encoder = VideoEncoder()
    audio_encoder = AudioEncoder()
    scanners = (
        ('video', video_encoder, video_encoder.is_h264),
        ('audio', audio_encoder, audio_encoder.is_flac),
    )
    while True:
        for encoder_type, encoder, needs_encoding in scanners:
            try:
                _enqueue_candidates(encoder_type, encoder, needs_encoding)
            except Exception as e:
                logger.error(f"Error scanning folders: {e}")
        time.sleep(60)
# Background daemon threads: one encoder worker per media type plus the
# watch-folder scanner. They are started as soon as this module is imported.
video_worker, audio_worker = (
    threading.Thread(target=worker_thread, args=(kind,), daemon=True)
    for kind in ('video', 'audio')
)
folder_scanner = threading.Thread(target=scan_folders, daemon=True)
for background_thread in (video_worker, audio_worker, folder_scanner):
    background_thread.start()
del background_thread
@app.route('/')
def index():
    """Render the dashboard: running jobs, queue previews and success totals."""
    kinds = ('video', 'audio')
    # Take a consistent snapshot of the shared state; only the first ten
    # queued paths per type are shown.
    with job_lock:
        running = {kind: current_jobs[kind] for kind in kinds}
        upcoming = {kind: job_queue[kind][:10] for kind in kinds}
    totals = {kind: db_manager.get_total_stats(kind) for kind in kinds}
    return render_template(
        'index.html',
        current_video_job=running['video'],
        current_audio_job=running['audio'],
        video_queue=upcoming['video'],
        audio_queue=upcoming['audio'],
        video_stats=totals['video'],
        audio_stats=totals['audio'],
    )
@app.route('/reports/<encoder_type>')
def reports(encoder_type):
    """Render one page of job reports for ``encoder_type``.

    Unknown encoder types redirect to the dashboard. The ``page`` query
    parameter is 1-based; missing or non-integer values fall back to 1, and
    values below 1 are clamped to 1 so the query never receives a negative
    OFFSET.
    """
    if encoder_type not in ['video', 'audio']:
        return redirect(url_for('index'))
    per_page = 20
    page = max(1, request.args.get('page', 1, type=int))
    rows, total = db_manager.get_job_reports(encoder_type, page, per_page)
    total_pages = (total + per_page - 1) // per_page  # ceiling division
    return render_template('reports.html',
                           encoder_type=encoder_type,
                           reports=rows,
                           page=page,
                           total_pages=total_pages,
                           total=total)
@app.route('/config')
def config():
    """Render the configuration page listing the enabled watch folders."""
    folders = {
        kind: db_manager.get_config(kind)
        for kind in ('video', 'audio')
    }
    return render_template(
        'config.html',
        video_configs=folders['video'],
        audio_configs=folders['audio'],
    )
@app.route('/add_config', methods=['POST'])
def add_config():
    """Store a new watch folder from the submitted form, then show the config page.

    The folder is stripped of surrounding whitespace. Submissions with an
    unknown encoder type, an empty folder, or a folder longer than the 500
    characters the ``watch_folder`` column can hold are ignored.
    """
    encoder_type = request.form.get('encoder_type')
    watch_folder = (request.form.get('watch_folder') or '').strip()
    if encoder_type in ['video', 'audio'] and 0 < len(watch_folder) <= 500:
        db_manager.add_config(encoder_type, watch_folder)
    return redirect(url_for('config'))
@app.route('/api/status')
def api_status():
    """Return the running jobs and the queue lengths as JSON."""
    # Build and serialise the payload while holding the lock so the worker
    # threads cannot change the shared state halfway through.
    with job_lock:
        queue_lengths = {
            kind: len(job_queue[kind])
            for kind in ('video', 'audio')
        }
        payload = {
            'current_jobs': current_jobs,
            'queue_lengths': queue_lengths,
        }
        return jsonify(payload)
def format_bytes(bytes_val):
    """Format a byte count as a human-readable string such as ``"1.5 KB"``.

    ``None`` is rendered as ``"0 B"``. The value is converted to ``float``
    first, so ``Decimal`` totals coming back from SQL ``SUM()`` are accepted.
    Negative values (an encode that grew the file) are scaled by magnitude
    and keep their sign.
    """
    if bytes_val is None:
        return "0 B"
    size = float(bytes_val)
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        # Compare the magnitude so negative sizes pick a unit as well.
        if abs(size) < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"
def format_duration(seconds):
    """Format a duration in seconds as ``"12.3s"``, ``"4m 5s"`` or ``"1h 2m"``.

    ``None`` is rendered as ``"N/A"``. From one minute upwards the value is
    rounded to whole seconds before it is split into units, so the seconds
    field can never be printed as ``60``.
    """
    if seconds is None:
        return "N/A"
    seconds = float(seconds)
    # 59.95 and above would print as "60.0s"; switch to the minute format.
    if seconds < 59.95:
        return f"{seconds:.1f}s"
    hours, remainder = divmod(int(round(seconds)), 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}h {minutes}m"
    return f"{minutes}m {secs}s"
def path_exists(path):
    """Return True when ``path`` exists on disk.

    Values that are not path-like (for example ``None`` coming from a
    template) yield False instead of raising ``TypeError``.
    """
    try:
        return os.path.exists(path)
    except TypeError:
        return False
# Make the formatting helpers available as Jinja template filters.
app.jinja_env.filters.update({
    'format_bytes': format_bytes,
    'format_duration': format_duration,
    'path_exists': path_exists,
})

if __name__ == '__main__':
    # Serve on port 5000 on all network interfaces when run as a script.
    app.run(port=5000, host='0.0.0.0')