| import os |
| import uuid |
| import requests |
| import threading |
| import time |
| from flask import Flask, request, jsonify, render_template |
| from werkzeug.utils import secure_filename |
| from itertools import cycle |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| workers_env = os.getenv("WORKER_NODES", "") |
| if not workers_env: |
| raise ValueError("WORKER_NODES environment variable is not set") |
|
|
| WORKER_URLS = [url.strip() for url in workers_env.split(',') if url.strip()] |
| worker_pool = cycle(WORKER_URLS) |
|
|
| UPLOADER_API_URL = "https://hamed744-uploadfile.hf.space/upload" |
| HF_TOKEN = os.getenv("HF_TOKEN") |
|
|
| app = Flask(__name__, template_folder='templates') |
|
|
| jobs = {} |
| lock = threading.Lock() |
|
|
| def get_next_worker_url(): |
| return next(worker_pool) |
|
|
| def get_permanent_link(job_id, temp_render_url, worker_url): |
| try: |
| with lock: jobs[job_id]["status"] = "در حال دائمیسازی لینک..." |
| if not HF_TOKEN: raise Exception("HF_TOKEN not found") |
| |
| video_full_url = f"{worker_url}{temp_render_url}" |
| |
| payload = {'url': video_full_url} |
| headers = {'Authorization': f'Bearer {HF_TOKEN}'} |
| response = requests.post(UPLOADER_API_URL, json=payload, headers=headers, timeout=600) |
| response.raise_for_status() |
| data = response.json() |
| |
| final_url = data.get("hf_url") or data.get("url") |
| if not final_url: raise Exception("Invalid response from uploader service") |
| |
| with lock: |
| jobs[job_id]["status"] = "completed" |
| jobs[job_id]["result"] = final_url |
| |
| except Exception as e: |
| print(f"Error making link permanent for {job_id}: {e}") |
| with lock: |
| jobs[job_id]["status"] = "error" |
| jobs[job_id]["result"] = f"Error making link permanent. Temp link: {temp_render_url}" |
|
|
| def poll_worker_service(job_id, render_job_id, worker_url): |
| POLLING_INTERVAL = 15 |
| MAX_POLLING_ERRORS = 20 |
| error_count = 0 |
|
|
| while True: |
| try: |
| response = requests.post(f"{worker_url}/check_job_status", json={"job_id": render_job_id}, timeout=45) |
| response.raise_for_status() |
| error_count = 0 |
|
|
| data = response.json() |
| with lock: |
| current_job = jobs.get(job_id) |
| if current_job and current_job.get("status") != data.get("status"): |
| current_job["status"] = data.get("status") |
| |
| if data.get("status") == "completed": |
| get_permanent_link(job_id, data.get("result"), worker_url) |
| return |
| elif data.get("status") == "error": |
| with lock: |
| current_job = jobs.get(job_id) |
| if current_job: |
| current_job["result"] = data.get("result", "Unknown worker error") |
| return |
| except requests.exceptions.RequestException as e: |
| error_count += 1 |
| print(f"Connection error polling worker {worker_url} ({error_count}/{MAX_POLLING_ERRORS}): {e}") |
| if error_count >= MAX_POLLING_ERRORS: |
| with lock: |
| current_job = jobs.get(job_id) |
| if current_job: |
| current_job["status"] = "error" |
| current_job["result"] = f"Connection to worker ({worker_url}) lost." |
| return |
| |
| time.sleep(POLLING_INTERVAL) |
|
|
| @app.route('/') |
| def index(): |
| return render_template('index.html') |
|
|
| @app.route('/api/submit_job', methods=['POST']) |
| def submit_job(): |
| if 'image_file' not in request.files or 'video_file' not in request.files: |
| return jsonify({"error": "Image and video files are required"}), 400 |
|
|
| image_file = request.files['image_file'] |
| video_file = request.files['video_file'] |
| motion = request.form.get('motion') |
| style = request.form.get('style') |
|
|
| image_bytes = image_file.read() |
| video_bytes = video_file.read() |
| |
| MAX_RETRIES = 3 |
| for attempt in range(MAX_RETRIES): |
| selected_worker_url = get_next_worker_url() |
| print(f"Attempt {attempt + 1}/{MAX_RETRIES}: Sending to worker: {selected_worker_url}") |
| try: |
| response = requests.post( |
| f"{selected_worker_url}/submit_new_job", |
| files={'image_file': (secure_filename(image_file.filename), image_bytes, image_file.mimetype), |
| 'video_file': (secure_filename(video_file.filename), video_bytes, video_file.mimetype)}, |
| data={'motion': motion, 'style': style}, |
| timeout=600 |
| ) |
| response.raise_for_status() |
| render_data = response.json() |
| render_job_id = render_data.get("job_id") |
| if not render_job_id: raise Exception("Invalid response from worker") |
| |
| internal_job_id = str(uuid.uuid4()) |
| with lock: jobs[internal_job_id] = {"status": "Sending to worker...", "result": None} |
| |
| thread = threading.Thread(target=poll_worker_service, args=(internal_job_id, render_job_id, selected_worker_url)) |
| thread.start() |
| return jsonify({"job_id": internal_job_id}) |
| except Exception as e: |
| print(f"Error on attempt {attempt + 1} for worker {selected_worker_url}: {e}") |
| time.sleep(2) |
|
|
| final_error_message = ("<strong>Servers are busy!</strong><br>" |
| "Please try again in a few minutes.") |
| return jsonify({"error": final_error_message}), 500 |
|
|
| @app.route('/api/check_status', methods=['POST']) |
| def check_status(): |
| data = request.get_json() |
| job_id = data.get('job_id') |
| if not job_id: return jsonify({"error": "Job ID is required"}), 400 |
| with lock: job = jobs.get(job_id, {"status": "not_found", "result": None}) |
| return jsonify({"status": job["status"], "result": job["result"]}) |
|
|
| if __name__ == '__main__': |
| app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860))) |