mgbam committed on
Commit
c7eb758
·
verified ·
1 Parent(s): 0c8eb2b

Update media_processing.py

Browse files
Files changed (1) hide show
  1. media_processing.py +310 -1109
media_processing.py CHANGED
@@ -1,1167 +1,368 @@
 
 
 
 
1
  import os
 
2
  import base64
3
- import cv2
4
- import numpy as np
5
- from PIL import Image
6
- import pytesseract
7
  import requests
8
- from urllib.parse import urlparse, urljoin
9
- from bs4 import BeautifulSoup
10
- import html2text
11
- import json
12
- import time
13
- import webbrowser
14
- import urllib.parse
15
- import copy
16
- import html
17
  import tempfile
18
- import uuid
19
- import datetime
20
- import threading
21
- import atexit
22
- from huggingface_hub import HfApi
23
  import gradio as gr
24
- import subprocess
25
- import re
26
-
27
- # ---------------------------------------------------------------------------
28
- # Video temp-file management (per-session tracking and cleanup)
29
- # ---------------------------------------------------------------------------
30
- VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
31
- VIDEO_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours
32
- _SESSION_VIDEO_FILES: Dict[str, List[str]] = {}
33
- _VIDEO_FILES_LOCK = threading.Lock()
34
-
35
- def _ensure_video_dir_exists() -> None:
36
- try:
37
- os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)
38
- except Exception:
39
- pass
40
-
41
- def _register_video_for_session(session_id: Optional[str], file_path: str) -> None:
42
- if not session_id or not file_path:
43
- return
44
- with _VIDEO_FILES_LOCK:
45
- if session_id not in _SESSION_VIDEO_FILES:
46
- _SESSION_VIDEO_FILES[session_id] = []
47
- _SESSION_VIDEO_FILES[session_id].append(file_path)
48
 
49
- def cleanup_session_videos(session_id: Optional[str]) -> None:
50
- if not session_id:
51
- return
52
- with _VIDEO_FILES_LOCK:
53
- file_list = _SESSION_VIDEO_FILES.pop(session_id, [])
54
- for path in file_list:
55
- try:
56
- if path and os.path.exists(path):
57
- os.unlink(path)
58
- except Exception:
59
- # Best-effort cleanup
60
- pass
61
-
62
- def reap_old_videos(ttl_seconds: int = VIDEO_FILE_TTL_SECONDS) -> None:
63
- """Delete old video files in the temp directory based on modification time."""
64
- try:
65
- _ensure_video_dir_exists()
66
- now_ts = time.time()
67
- for name in os.listdir(VIDEO_TEMP_DIR):
68
- path = os.path.join(VIDEO_TEMP_DIR, name)
69
- try:
70
- if not os.path.isfile(path):
71
- continue
72
- mtime = os.path.getmtime(path)
73
- if now_ts - mtime > ttl_seconds:
74
- os.unlink(path)
75
- except Exception:
76
- pass
77
- except Exception:
78
- # Temp dir might not exist or be accessible; ignore
79
- pass
80
-
81
- # ---------------------------------------------------------------------------
82
- # Audio temp-file management (per-session tracking and cleanup)
83
- # ---------------------------------------------------------------------------
84
- AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")
85
- AUDIO_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours
86
- _SESSION_AUDIO_FILES: Dict[str, List[str]] = {}
87
- _AUDIO_FILES_LOCK = threading.Lock()
88
-
89
- def _ensure_audio_dir_exists() -> None:
90
- try:
91
- os.makedirs(AUDIO_TEMP_DIR, exist_ok=True)
92
- except Exception:
93
- pass
94
-
95
- def _register_audio_for_session(session_id: Optional[str], file_path: str) -> None:
96
- if not session_id or not file_path:
97
- return
98
- with _AUDIO_FILES_LOCK:
99
- if session_id not in _SESSION_AUDIO_FILES:
100
- _SESSION_AUDIO_FILES[session_id] = []
101
- _SESSION_AUDIO_FILES[session_id].append(file_path)
102
-
103
- def cleanup_session_audio(session_id: Optional[str]) -> None:
104
- if not session_id:
105
- return
106
- with _AUDIO_FILES_LOCK:
107
- file_list = _SESSION_AUDIO_FILES.pop(session_id, [])
108
- for path in file_list:
109
- try:
110
- if path and os.path.exists(path):
111
- os.unlink(path)
112
- except Exception:
113
- pass
114
 
115
- def reap_old_audio(ttl_seconds: int = AUDIO_FILE_TTL_SECONDS) -> None:
116
- try:
117
- _ensure_audio_dir_exists()
118
- now_ts = time.time()
119
- for name in os.listdir(AUDIO_TEMP_DIR):
120
- path = os.path.join(AUDIO_TEMP_DIR, name)
121
- try:
122
- if not os.path.isfile(path):
123
- continue
124
- mtime = os.path.getmtime(path)
125
- if now_ts - mtime > ttl_seconds:
126
- os.unlink(path)
127
- except Exception:
128
- pass
129
- except Exception:
130
- pass
131
-
132
- # ---------------------------------------------------------------------------
133
- # General temp media file management (per-session tracking and cleanup)
134
- # ---------------------------------------------------------------------------
135
- MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
136
- MEDIA_FILE_TTL_SECONDS = 6 * 60 * 60 # 6 hours
137
- _SESSION_MEDIA_FILES: Dict[str, List[str]] = {}
138
- _MEDIA_FILES_LOCK = threading.Lock()
139
-
140
- # Global dictionary to store temporary media files for the session
141
- temp_media_files = {}
142
-
143
- def _ensure_media_dir_exists() -> None:
144
- """Ensure the media temp directory exists."""
145
- try:
146
- os.makedirs(MEDIA_TEMP_DIR, exist_ok=True)
147
- except Exception:
148
- pass
149
-
150
- def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
151
- """Track a media file for session-based cleanup."""
152
- if not session_id or not file_path:
153
- return
154
- with _MEDIA_FILES_LOCK:
155
- if session_id not in _SESSION_MEDIA_FILES:
156
- _SESSION_MEDIA_FILES[session_id] = []
157
- _SESSION_MEDIA_FILES[session_id].append(file_path)
158
-
159
- def cleanup_session_media(session_id: Optional[str]) -> None:
160
- """Clean up media files for a specific session."""
161
- if not session_id:
162
- return
163
- with _MEDIA_FILES_LOCK:
164
- files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, [])
165
 
166
- for path in files_to_clean:
 
 
167
  try:
168
- if path and os.path.exists(path):
169
- os.unlink(path)
170
- except Exception:
171
- # Best-effort cleanup
172
- pass
173
-
174
- def reap_old_media(ttl_seconds: int = MEDIA_FILE_TTL_SECONDS) -> None:
175
- """Delete old media files in the temp directory based on modification time."""
176
- try:
177
- _ensure_media_dir_exists()
178
- now_ts = time.time()
179
- for name in os.listdir(MEDIA_TEMP_DIR):
180
- path = os.path.join(MEDIA_TEMP_DIR, name)
181
- if os.path.isfile(path):
182
- try:
183
- mtime = os.path.getmtime(path)
184
- if (now_ts - mtime) > ttl_seconds:
185
- os.unlink(path)
186
- except Exception:
187
- pass
188
- except Exception:
189
- # Temp dir might not exist or be accessible; ignore
190
- pass
191
-
192
- def cleanup_all_temp_media_on_startup() -> None:
193
- """Clean up all temporary media files on app startup."""
194
- try:
195
- # Clean up temp_media_files registry
196
- temp_media_files.clear()
197
-
198
- # Clean up actual files from disk (assume all are orphaned on startup)
199
- _ensure_media_dir_exists()
200
- for name in os.listdir(MEDIA_TEMP_DIR):
201
- path = os.path.join(MEDIA_TEMP_DIR, name)
202
- if os.path.isfile(path):
203
- try:
204
- os.unlink(path)
205
- except Exception:
206
- pass
207
-
208
- # Clear session tracking
209
- with _MEDIA_FILES_LOCK:
210
- _SESSION_MEDIA_FILES.clear()
211
 
212
- print("[StartupCleanup] Cleaned up orphaned temporary media files")
213
- except Exception as e:
214
- print(f"[StartupCleanup] Error during media cleanup: {str(e)}")
215
-
216
- def cleanup_all_temp_media_on_shutdown() -> None:
217
- """Clean up all temporary media files on app shutdown."""
218
- try:
219
- print("[ShutdownCleanup] Cleaning up temporary media files...")
220
-
221
- # Clean up temp_media_files registry and remove files
222
- for file_id, file_info in temp_media_files.items():
223
- try:
224
- if os.path.exists(file_info['path']):
225
- os.unlink(file_info['path'])
226
- except Exception:
227
- pass
228
- temp_media_files.clear()
229
-
230
- # Clean up all session files
231
- with _MEDIA_FILES_LOCK:
232
- for session_id, file_paths in _SESSION_MEDIA_FILES.items():
233
- for path in file_paths:
234
- try:
235
- if path and os.path.exists(path):
236
- os.unlink(path)
237
- except Exception:
238
- pass
239
- _SESSION_MEDIA_FILES.clear()
240
-
241
- print("[ShutdownCleanup] Temporary media cleanup completed")
242
- except Exception as e:
243
- print(f"[ShutdownCleanup] Error during cleanup: {str(e)}")
244
-
245
- # Register shutdown cleanup handler
246
- atexit.register(cleanup_all_temp_media_on_shutdown)
247
-
248
- def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: Optional[str] = None) -> str:
249
- """Create a temporary file and return a local URL for preview."""
250
- try:
251
- # Create unique filename with timestamp and UUID
252
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
253
- unique_id = str(uuid.uuid4())[:8]
254
- base_name, ext = os.path.splitext(filename)
255
- unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"
256
-
257
- # Create temporary file in the dedicated directory
258
- _ensure_media_dir_exists()
259
- temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)
260
-
261
- # Write media bytes to temporary file
262
- with open(temp_path, 'wb') as f:
263
- f.write(media_bytes)
264
-
265
- # Track file for session-based cleanup
266
- if session_id:
267
- track_session_media_file(session_id, temp_path)
268
-
269
- # Store the file info for later upload
270
- file_id = f"{media_type}_{unique_id}"
271
- temp_media_files[file_id] = {
272
- 'path': temp_path,
273
- 'filename': filename,
274
- 'media_type': media_type,
275
- 'media_bytes': media_bytes
276
- }
277
-
278
- # Return file:// URL for preview
279
- file_url = f"file://{temp_path}"
280
- print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
281
- return file_url
282
-
283
- except Exception as e:
284
- print(f"[TempMedia] Failed to create temporary file: {str(e)}")
285
- return f"Error creating temporary {media_type} file: {str(e)}"
286
-
287
- def upload_media_to_hf(media_bytes: bytes, filename: str, media_type: str = "image", token: gr.OAuthToken | None = None, use_temp: bool = True) -> str:
288
- """Upload media file to user's Hugging Face account or create temporary file."""
289
- try:
290
- # If use_temp is True, create temporary file for preview
291
- if use_temp:
292
- return create_temp_media_url(media_bytes, filename, media_type)
293
-
294
- # Otherwise, upload to Hugging Face for permanent URL
295
- # Try to get token from OAuth first, then fall back to environment variable
296
- hf_token = None
297
- if token and token.token:
298
- hf_token = token.token
299
- else:
300
- hf_token = os.getenv('HF_TOKEN')
301
-
302
- if not hf_token:
303
- return "Error: Please log in with your Hugging Face account to upload media, or set HF_TOKEN environment variable."
304
-
305
- # Initialize HF API
306
- api = HfApi(token=hf_token)
307
-
308
- # Get current user info to determine username
309
  try:
310
- user_info = api.whoami()
311
- username = user_info.get('name', 'unknown-user')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
312
  except Exception as e:
313
- print(f"[HFUpload] Could not get user info: {e}")
314
- username = 'anycoder-user'
315
-
316
- # Create repository name for media storage
317
- repo_name = f"{username}/anycoder-media"
318
-
319
- # Try to create the repository if it doesn't exist
320
  try:
321
- api.create_repo(
322
- repo_id=repo_name,
323
- repo_type="dataset",
324
- private=False,
325
- exist_ok=True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
326
  )
327
- print(f"[HFUpload] Repository {repo_name} ready")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
328
  except Exception as e:
329
- print(f"[HFUpload] Repository creation/access issue: {e}")
330
- # Continue anyway, repo might already exist
331
-
332
- # Create unique filename with timestamp and UUID
333
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
334
- unique_id = str(uuid.uuid4())[:8]
335
- base_name, ext = os.path.splitext(filename)
336
- unique_filename = f"{media_type}/{timestamp}_{unique_id}_{base_name}{ext}"
337
-
338
- # Create temporary file for upload
339
- with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
340
- temp_file.write(media_bytes)
341
- temp_path = temp_file.name
342
-
343
  try:
344
- # Upload file to HF repository
345
- api.upload_file(
346
- path_or_fileobj=temp_path,
347
- path_in_repo=unique_filename,
348
- repo_id=repo_name,
349
- repo_type="dataset",
350
- commit_message=f"Upload {media_type} generated by AnyCoder"
 
 
 
 
 
 
 
 
 
 
351
  )
352
 
353
- # Generate permanent URL
354
- permanent_url = f"https://huggingface.co/datasets/{repo_name}/resolve/main/{unique_filename}"
355
- print(f"[HFUpload] Successfully uploaded {media_type} to {permanent_url}")
356
- return permanent_url
357
 
358
- finally:
359
- # Clean up temporary file
360
- try:
361
- os.unlink(temp_path)
362
- except Exception:
363
- pass
364
-
365
- except Exception as e:
366
- print(f"[HFUpload] Upload failed: {str(e)}")
367
- return f"Error uploading {media_type} to Hugging Face: {str(e)}"
368
-
369
- def upload_temp_files_to_hf_and_replace_urls(html_content: str, token: gr.OAuthToken | None = None) -> str:
370
- """Upload all temporary media files to HF and replace their URLs in HTML content."""
371
- try:
372
- if not temp_media_files:
373
- print("[DeployUpload] No temporary media files to upload")
374
- return html_content
375
-
376
- print(f"[DeployUpload] Uploading {len(temp_media_files)} temporary media files to HF")
377
- updated_content = html_content
378
-
379
- for file_id, file_info in temp_media_files.items():
380
- try:
381
- # Upload to HF with permanent URL
382
- permanent_url = upload_media_to_hf(
383
- file_info['media_bytes'],
384
- file_info['filename'],
385
- file_info['media_type'],
386
- token,
387
- use_temp=False # Force permanent upload
388
- )
389
-
390
- if not permanent_url.startswith("Error"):
391
- # Replace the temporary file URL with permanent URL
392
- temp_url = f"file://{file_info['path']}"
393
- updated_content = updated_content.replace(temp_url, permanent_url)
394
- print(f"[DeployUpload] Replaced {temp_url} with {permanent_url}")
395
- else:
396
- print(f"[DeployUpload] Failed to upload {file_id}: {permanent_url}")
 
 
 
 
 
 
 
397
 
398
- except Exception as e:
399
- print(f"[DeployUpload] Error uploading {file_id}: {str(e)}")
400
- continue
401
-
402
- # Clean up temporary files after upload
403
- cleanup_temp_media_files()
404
-
405
- return updated_content
406
-
407
- except Exception as e:
408
- print(f"[DeployUpload] Failed to upload temporary files: {str(e)}")
409
- return html_content
410
-
411
- def cleanup_temp_media_files():
412
- """Clean up temporary media files from disk and memory."""
413
- try:
414
- for file_id, file_info in temp_media_files.items():
415
  try:
416
- if os.path.exists(file_info['path']):
417
- os.remove(file_info['path'])
418
- print(f"[TempCleanup] Removed {file_info['path']}")
419
  except Exception as e:
420
- print(f"[TempCleanup] Failed to remove {file_info['path']}: {str(e)}")
421
-
422
- # Clear the global dictionary
423
- temp_media_files.clear()
424
- print("[TempCleanup] Cleared temporary media files registry")
425
-
426
- except Exception as e:
427
- print(f"[TempCleanup] Error during cleanup: {str(e)}")
428
-
429
- def generate_image_with_qwen(prompt: str, image_index: int = 0, token: gr.OAuthToken | None = None) -> str:
430
- """Generate image using Qwen image model via Hugging Face InferenceClient and upload to HF for permanent URL"""
431
- try:
432
- # Check if HF_TOKEN is available
433
- if not os.getenv('HF_TOKEN'):
434
- return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
435
-
436
- # Create InferenceClient for Qwen image generation
437
- client = InferenceClient(
438
- provider="auto",
439
- api_key=os.getenv('HF_TOKEN'),
440
- bill_to="huggingface",
441
- )
442
-
443
- # Generate image using Qwen/Qwen-Image model
444
- image = client.text_to_image(
445
- prompt,
446
- model="Qwen/Qwen-Image",
447
- )
448
-
449
- # Resize image to reduce size while maintaining quality
450
- max_size = 1024 # Increased size since we're not using data URIs
451
- if image.width > max_size or image.height > max_size:
452
- image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
453
-
454
- # Convert PIL Image to bytes for upload
455
- import io
456
- buffer = io.BytesIO()
457
- # Save as JPEG with good quality since we're not embedding
458
- image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True)
459
- image_bytes = buffer.getvalue()
460
-
461
- # Create temporary URL for preview (will be uploaded to HF during deploy)
462
- filename = f"generated_image_{image_index}.jpg"
463
- temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
464
-
465
- # Check if creation was successful
466
- if temp_url.startswith("Error"):
467
- return temp_url
468
-
469
- # Return HTML img tag with temporary URL
470
- return f'<img src="{temp_url}" alt="{prompt}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'
471
-
472
- except Exception as e:
473
- print(f"Image generation error: {str(e)}")
474
- return f"Error generating image: {str(e)}"
475
-
476
- def generate_image_to_image(input_image_data, prompt: str, token: gr.OAuthToken | None = None) -> str:
477
- """Generate an image using image-to-image with Qwen-Image-Edit via Hugging Face InferenceClient."""
478
- try:
479
- # Check token
480
- if not os.getenv('HF_TOKEN'):
481
- return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
482
-
483
- # Prepare client
484
- client = InferenceClient(
485
- provider="auto",
486
- api_key=os.getenv('HF_TOKEN'),
487
- bill_to="huggingface",
488
- )
489
-
490
- # Normalize input image to bytes
491
- import io
492
- from PIL import Image
493
- try:
494
- import numpy as np
495
- except Exception:
496
- np = None
497
-
498
  if hasattr(input_image_data, 'read'):
499
- # File-like object
500
  raw = input_image_data.read()
501
  pil_image = Image.open(io.BytesIO(raw))
502
  elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'):
503
- # PIL Image
504
  pil_image = input_image_data
505
- elif np is not None and isinstance(input_image_data, np.ndarray):
506
  pil_image = Image.fromarray(input_image_data)
507
  elif isinstance(input_image_data, (bytes, bytearray)):
508
  pil_image = Image.open(io.BytesIO(input_image_data))
509
  else:
510
- # Fallback: try to convert via bytes
511
  pil_image = Image.open(io.BytesIO(bytes(input_image_data)))
512
-
513
  # Ensure RGB
514
  if pil_image.mode != 'RGB':
515
  pil_image = pil_image.convert('RGB')
516
-
517
- # Resize input image to avoid request body size limits
518
- max_input_size = 1024
519
- if pil_image.width > max_input_size or pil_image.height > max_input_size:
520
- pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS)
521
-
522
- buf = io.BytesIO()
523
- pil_image.save(buf, format='JPEG', quality=85, optimize=True)
524
- input_bytes = buf.getvalue()
525
-
526
- # Call image-to-image
527
- image = client.image_to_image(
528
- input_bytes,
529
- prompt=prompt,
530
- model="Qwen/Qwen-Image-Edit",
531
- )
532
-
533
- # Resize/optimize (larger since not using data URIs)
534
- max_size = 1024
535
- if image.width > max_size or image.height > max_size:
536
- image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
537
-
538
- out_buf = io.BytesIO()
539
- image.convert('RGB').save(out_buf, format='JPEG', quality=90, optimize=True)
540
- image_bytes = out_buf.getvalue()
541
-
542
- # Create temporary URL for preview (will be uploaded to HF during deploy)
543
- filename = "image_to_image_result.jpg"
544
- temp_url = upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
545
 
546
- # Check if creation was successful
547
- if temp_url.startswith("Error"):
548
- return temp_url
549
-
550
- return f"<img src=\"{temp_url}\" alt=\"{prompt}\" style=\"max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;\" loading=\"lazy\" />"
551
- except Exception as e:
552
- print(f"Image-to-image generation error: {str(e)}")
553
- return f"Error generating image (image-to-image): {str(e)}"
554
-
555
- def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
556
- """Generate a video from an input image and prompt using Hugging Face InferenceClient."""
557
- try:
558
- print("[Image2Video] Starting video generation")
559
- if not os.getenv('HF_TOKEN'):
560
- print("[Image2Video] Missing HF_TOKEN")
561
- return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
562
-
563
- # Prepare client
564
- client = InferenceClient(
565
- provider="auto",
566
- api_key=os.getenv('HF_TOKEN'),
567
- bill_to="huggingface",
568
- )
569
- print(f"[Image2Video] InferenceClient initialized (provider=auto)")
570
-
571
- # Normalize input image to bytes, with downscale/compress to cap request size
572
- import io
573
- from PIL import Image
574
- try:
575
- import numpy as np
576
- except Exception:
577
- np = None
578
-
579
- def _load_pil(img_like) -> Image.Image:
580
- if hasattr(img_like, 'read'):
581
- return Image.open(io.BytesIO(img_like.read()))
582
- if hasattr(img_like, 'mode') and hasattr(img_like, 'size'):
583
- return img_like
584
- if np is not None and isinstance(img_like, np.ndarray):
585
- return Image.fromarray(img_like)
586
- if isinstance(img_like, (bytes, bytearray)):
587
- return Image.open(io.BytesIO(img_like))
588
- return Image.open(io.BytesIO(bytes(img_like)))
589
-
590
- pil_image = _load_pil(input_image_data)
591
- if pil_image.mode != 'RGB':
592
- pil_image = pil_image.convert('RGB')
593
- try:
594
- print(f"[Image2Video] Input PIL image size={pil_image.size} mode={pil_image.mode}")
595
- except Exception:
596
- pass
597
-
598
- # Progressive encode to keep payload under ~3.9MB (below 4MB limit)
599
- MAX_BYTES = 3_900_000
600
- max_dim = 1024 # initial cap on longest edge
601
  quality = 90
602
-
603
  def encode_current(pil: Image.Image, q: int) -> bytes:
604
  tmp = io.BytesIO()
605
  pil.save(tmp, format='JPEG', quality=q, optimize=True)
606
  return tmp.getvalue()
607
-
608
- # Downscale while the longest edge exceeds max_dim
609
  while max(pil_image.size) > max_dim:
610
  ratio = max_dim / float(max(pil_image.size))
611
  new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio)))
612
  pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)
613
-
614
  encoded = encode_current(pil_image, quality)
615
- # If still too big, iteratively reduce quality, then dimensions
 
616
  while len(encoded) > MAX_BYTES and (quality > 40 or max(pil_image.size) > 640):
617
  if quality > 40:
618
  quality -= 10
619
  else:
620
- # reduce dims by 15% if already at low quality
621
  new_w = max(1, int(pil_image.size[0] * 0.85))
622
  new_h = max(1, int(pil_image.size[1] * 0.85))
623
  pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
624
  encoded = encode_current(pil_image, quality)
625
-
626
- input_bytes = encoded
627
-
628
- # Call image-to-video; require method support
629
- model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
630
- image_to_video_method = getattr(client, "image_to_video", None)
631
- if not callable(image_to_video_method):
632
- print("[Image2Video] InferenceClient.image_to_video not available in this huggingface_hub version")
633
- return (
634
- "Error generating video (image-to-video): Your installed huggingface_hub version "
635
- "does not expose InferenceClient.image_to_video. Please upgrade with "
636
- "`pip install -U huggingface_hub` and try again."
637
- )
638
- print(f"[Image2Video] Calling image_to_video with model={model_id}, prompt length={len(prompt or '')}")
639
- video_bytes = image_to_video_method(
640
- input_bytes,
641
- prompt=prompt,
642
- model=model_id,
643
- )
644
- print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")
645
-
646
- # Create temporary URL for preview (will be uploaded to HF during deploy)
647
- filename = "image_to_video_result.mp4"
648
- temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
649
-
650
- # Check if creation was successful
651
- if temp_url.startswith("Error"):
652
- return temp_url
653
-
654
- video_html = (
655
- f'<video controls autoplay muted loop playsinline '
656
- f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" '
657
- f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" '
658
- f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">'
659
- f'<source src="{temp_url}" type="video/mp4" />'
660
- f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
661
- f'</video>'
662
- )
663
-
664
- print(f"[Image2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")
665
-
666
- # Validate the generated video HTML
667
- if not validate_video_html(video_html):
668
- print("[Image2Video] Generated video HTML failed validation")
669
- return "Error: Generated video HTML is malformed"
670
-
671
- return video_html
672
- except Exception as e:
673
- import traceback
674
- print("[Image2Video] Exception during generation:")
675
- traceback.print_exc()
676
- print(f"Image-to-video generation error: {str(e)}")
677
- return f"Error generating video (image-to-video): {str(e)}"
678
-
679
- def generate_video_from_text(prompt: str, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
680
- """Generate a video from a text prompt using Hugging Face InferenceClient."""
681
- try:
682
- print("[Text2Video] Starting video generation from text")
683
- if not os.getenv('HF_TOKEN'):
684
- print("[Text2Video] Missing HF_TOKEN")
685
- return "Error: HF_TOKEN environment variable is not set. Please set it to your Hugging Face API token."
686
-
687
- client = InferenceClient(
688
- provider="auto",
689
- api_key=os.getenv('HF_TOKEN'),
690
- bill_to="huggingface",
691
- )
692
- print("[Text2Video] InferenceClient initialized (provider=auto)")
693
-
694
- # Ensure the client has text_to_video (newer huggingface_hub)
695
- text_to_video_method = getattr(client, "text_to_video", None)
696
- if not callable(text_to_video_method):
697
- print("[Text2Video] InferenceClient.text_to_video not available in this huggingface_hub version")
698
- return (
699
- "Error generating video (text-to-video): Your installed huggingface_hub version "
700
- "does not expose InferenceClient.text_to_video. Please upgrade with "
701
- "`pip install -U huggingface_hub` and try again."
702
- )
703
-
704
- model_id = "Wan-AI/Wan2.2-T2V-A14B"
705
- prompt_str = (prompt or "").strip()
706
- print(f"[Text2Video] Calling text_to_video with model={model_id}, prompt length={len(prompt_str)}")
707
- video_bytes = text_to_video_method(
708
- prompt_str,
709
- model=model_id,
710
- )
711
- print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown length'}")
712
-
713
- # Create temporary URL for preview (will be uploaded to HF during deploy)
714
- filename = "text_to_video_result.mp4"
715
- temp_url = upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
716
-
717
- # Check if creation was successful
718
- if temp_url.startswith("Error"):
719
- return temp_url
720
-
721
- video_html = (
722
- f'<video controls autoplay muted loop playsinline '
723
- f'style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;" '
724
- f'onloadstart="this.style.backgroundColor=\'#f0f0f0\'" '
725
- f'onerror="this.style.display=\'none\'; console.error(\'Video failed to load\')">'
726
- f'<source src="{temp_url}" type="video/mp4" />'
727
- f'<p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>'
728
- f'</video>'
729
- )
730
-
731
- print(f"[Text2Video] Successfully generated video HTML tag with temporary URL: {temp_url}")
732
-
733
- # Validate the generated video HTML
734
- if not validate_video_html(video_html):
735
- print("[Text2Video] Generated video HTML failed validation")
736
- return "Error: Generated video HTML is malformed"
737
-
738
- return video_html
739
- except Exception as e:
740
- import traceback
741
- print("[Text2Video] Exception during generation:")
742
- traceback.print_exc()
743
- print(f"Text-to-video generation error: {str(e)}")
744
- return f"Error generating video (text-to-video): {str(e)}"
745
-
746
- def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None, token: gr.OAuthToken | None = None) -> str:
747
- """Generate music from a text prompt using ElevenLabs Music API and return an HTML <audio> tag."""
748
- try:
749
- api_key = os.getenv('ELEVENLABS_API_KEY')
750
- if not api_key:
751
- return "Error: ELEVENLABS_API_KEY environment variable is not set."
752
-
753
- headers = {
754
- 'Content-Type': 'application/json',
755
- 'xi-api-key': api_key,
756
- }
757
- payload = {
758
- 'prompt': (prompt or 'Epic orchestral theme with soaring strings and powerful brass'),
759
- 'music_length_ms': int(music_length_ms) if music_length_ms else 30000,
760
- }
761
-
762
- resp = requests.post('https://api.elevenlabs.io/v1/music/compose', headers=headers, json=payload)
763
- try:
764
- resp.raise_for_status()
765
- except Exception as e:
766
- return f"Error generating music: {getattr(e, 'response', resp).text if hasattr(e, 'response') else resp.text}"
767
-
768
- # Create temporary URL for preview (will be uploaded to HF during deploy)
769
- filename = "generated_music.mp3"
770
- temp_url = upload_media_to_hf(resp.content, filename, "audio", token, use_temp=True)
771
-
772
- # Check if creation was successful
773
- if temp_url.startswith("Error"):
774
- return temp_url
775
 
776
- audio_html = (
777
- "<div class=\"anycoder-music\" style=\"max-width:420px;margin:16px auto;padding:12px 16px;border:1px solid #e5e7eb;border-radius:12px;background:linear-gradient(180deg,#fafafa,#f3f4f6);box-shadow:0 2px 8px rgba(0,0,0,0.06)\">"
778
- " <div style=\"font-size:13px;color:#374151;margin-bottom:8px;display:flex:align-items:center;gap:6px\">"
779
- " <span>🎵 Generated music</span>"
780
- " </div>"
781
- f" <audio controls autoplay loop style=\"width:100%;outline:none;\">"
782
- f" <source src=\"{temp_url}\" type=\"audio/mpeg\" />"
783
- " Your browser does not support the audio element."
784
- " </audio>"
785
- "</div>"
786
- )
787
-
788
- print(f"[Music] Successfully generated music HTML tag with temporary URL: {temp_url}")
789
- return audio_html
790
- except Exception as e:
791
- return f"Error generating music: {str(e)}"
792
-
793
def extract_image_prompts_from_text(text: str, num_images_needed: int = 1) -> list:
    """Build image-generation prompts derived from a single piece of text.

    The whole stripped text is the base prompt. The first three prompts are
    the text itself, a "Visual representation of ..." variant and an
    "Illustration of ..." variant; further prompts cycle through a fixed list
    of eight stylistic lead-ins.

    Args:
        text: Free-form description to turn into prompts. ``None`` is treated
            like an empty string.
        num_images_needed: Number of prompts to produce.

    Returns:
        A list of ``num_images_needed`` prompt strings, or an empty list when
        the text is empty/whitespace-only or the count is not positive.
    """
    cleaned_text = (text or "").strip()
    if not cleaned_text:
        return []

    # Prompts used for the first, second and third image, in that order.
    leading_prompts = [
        cleaned_text,
        f"Visual representation of {cleaned_text}",
        f"Illustration of {cleaned_text}",
    ]

    # Variations cycled through from the fourth image onwards. Built once
    # here instead of being rebuilt on every loop iteration.
    extra_prompts = [
        f"Digital art of {cleaned_text}",
        f"Modern design of {cleaned_text}",
        f"Professional illustration of {cleaned_text}",
        f"Clean design of {cleaned_text}",
        f"Beautiful visualization of {cleaned_text}",
        f"Stylish representation of {cleaned_text}",
        f"Contemporary design of {cleaned_text}",
        f"Elegant illustration of {cleaned_text}",
    ]

    prompts = []
    for i in range(num_images_needed):
        if i < len(leading_prompts):
            prompts.append(leading_prompts[i])
        else:
            variation_index = (i - len(leading_prompts)) % len(extra_prompts)
            prompts.append(extra_prompts[variation_index])
    return prompts
833
-
834
def create_image_replacement_blocks(html_content: str, user_prompt: str) -> str:
    """Create search/replace blocks that swap placeholder images for generated ones.

    Placeholder ``<img>`` tags (and image-like ``<div>`` elements) are located
    in ``html_content``, one prompt per placeholder is derived from
    ``user_prompt`` via ``extract_image_prompts_from_text``, and an image is
    generated for each prompt with ``generate_image_with_qwen``. Each
    successfully generated image is paired, in order, with a placeholder.

    Args:
        html_content: The HTML document to scan for placeholders.
        user_prompt: Text the image prompts are derived from.

    Returns:
        The search/replace blocks joined by blank lines, or ``""`` when the
        prompt is empty, no placeholder is found, or no image could be
        generated. ``SEARCH_START``, ``DIVIDER`` and ``REPLACE_END`` are
        module-level markers defined outside this function.
    """
    if not user_prompt:
        return ""

    import re

    # Common patterns for placeholder images
    placeholder_patterns = [
        r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',  # Base64 images
        r'<img[^>]*src=["\']#["\'][^>]*>',  # Empty src
        r'<img[^>]*src=["\']about:blank["\'][^>]*>',  # About blank
    ]

    # A tag that satisfies several patterns (e.g. a placeholder URL plus a
    # "placeholder" class) is recorded once, keyed by its position in the
    # document, so it does not trigger several image generations. Two
    # textually identical tags at different positions are still both kept.
    placeholder_images = []
    seen_spans = set()
    for pattern in placeholder_patterns:
        for match in re.finditer(pattern, html_content, re.IGNORECASE):
            if match.span() not in seen_spans:
                seen_spans.add(match.span())
                placeholder_images.append(match.group(0))

    # Filter out HF URLs from placeholders (they are real generated content)
    placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]

    # If no placeholder images found, fall back to any <img> tag
    # (case-insensitive to catch <IMG> or mixed-case tags).
    if not placeholder_images:
        placeholder_images = re.findall(r'<img[^>]*>', html_content, re.IGNORECASE)

    # Also look for div elements that might be image placeholders
    div_placeholder_patterns = [
        r'<div[^>]*class=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>',
        r'<div[^>]*id=["\'][^"\']*(?:image|img|photo|picture)[^"\']*["\'][^>]*>.*?</div>',
    ]
    for pattern in div_placeholder_patterns:
        placeholder_images.extend(re.findall(pattern, html_content, re.IGNORECASE | re.DOTALL))

    num_images_needed = len(placeholder_images)
    if num_images_needed == 0:
        return ""

    # One prompt per placeholder found
    image_prompts = extract_image_prompts_from_text(user_prompt, num_images_needed)

    # Keep only the images that were generated successfully
    generated_images = []
    for i, prompt in enumerate(image_prompts):
        image_html = generate_image_with_qwen(prompt, i, token=None)  # TODO: Pass token from parent context
        if not image_html.startswith("Error"):
            generated_images.append(image_html)

    if not generated_images:
        return ""

    replacement_blocks = []
    # There is never more than one generated image per placeholder (one prompt
    # is created per placeholder), so pairing them in order covers every image.
    for placeholder, generated_image in zip(placeholder_images, generated_images):
        placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
        # Search variations: the verbatim text first, then whitespace- and
        # quote-normalised forms. dict.fromkeys drops duplicates while keeping
        # order, so identical variations do not yield identical blocks.
        placeholder_variations = dict.fromkeys([
            placeholder,
            placeholder_clean,
            placeholder_clean.replace('"', "'"),
            placeholder_clean.replace("'", '"'),
        ])
        for variation in placeholder_variations:
            replacement_blocks.append(
                f"{SEARCH_START}\n{variation}\n{DIVIDER}\n{generated_image}\n{REPLACE_END}"
            )

    return '\n\n'.join(replacement_blocks)
940
-
941
def create_image_replacement_blocks_text_to_image_single(html_content: str, prompt: str) -> str:
    """Create search/replace blocks that generate and insert ONLY ONE text-to-image result.

    The image is generated with ``generate_image_with_qwen``. It replaces the
    first placeholder ``<img>`` found in ``html_content``; without any
    ``<img>`` it is inserted right after the opening ``<body>`` tag; without a
    ``<body>`` a block with an empty search section is returned.

    Args:
        html_content: The HTML document to modify.
        prompt: Text prompt for the image.

    Returns:
        The search/replace block(s) as a string, or ``""`` when the prompt is
        empty/whitespace or image generation reports an error.
        ``SEARCH_START``, ``DIVIDER`` and ``REPLACE_END`` are module-level
        markers defined outside this function.
    """
    if not prompt or not prompt.strip():
        return ""

    import re

    # Detect placeholders similarly to the multi-image version
    placeholder_patterns = [
        r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']#["\'][^>]*>',
        r'<img[^>]*src=["\']about:blank["\'][^>]*>',
    ]

    placeholder_images = []
    for pattern in placeholder_patterns:
        placeholder_images.extend(re.findall(pattern, html_content, re.IGNORECASE))

    # Filter out HF URLs from placeholders (they are real generated content)
    placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]

    # Fallback to any <img> if no placeholders. Case-insensitive, like the
    # placeholder patterns above, so <IMG> tags are found as well.
    if not placeholder_images:
        placeholder_images = re.findall(r'<img[^>]*>', html_content, re.IGNORECASE)

    # Generate a single image
    image_html = generate_image_with_qwen(prompt, 0, token=None)  # TODO: Pass token from parent context
    if image_html.startswith("Error"):
        return ""

    # Replace first placeholder if present
    if placeholder_images:
        placeholder = placeholder_images[0]
        placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
        # Verbatim text first, then whitespace-/quote-normalised forms;
        # dict.fromkeys removes duplicate variations while keeping order.
        placeholder_variations = dict.fromkeys([
            placeholder,
            placeholder_clean,
            placeholder_clean.replace('"', "'"),
            placeholder_clean.replace("'", '"'),
        ])
        blocks = [
            f"{SEARCH_START}\n{variation}\n{DIVIDER}\n{image_html}\n{REPLACE_END}"
            for variation in placeholder_variations
        ]
        return '\n\n'.join(blocks)

    # Otherwise insert after <body>
    if '<body' in html_content:
        body_end = html_content.find('>', html_content.find('<body')) + 1
        insertion_point = html_content[:body_end] + '\n '
        return (
            f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
            f"{insertion_point}\n{image_html}\n{REPLACE_END}"
        )

    # If no <body>, just append
    return f"{SEARCH_START}\n\n{DIVIDER}\n{image_html}\n{REPLACE_END}"
1017
-
1018
def create_video_replacement_blocks_text_to_video(html_content: str, prompt: str, session_id: Optional[str] = None) -> str:
    """Create search/replace blocks that generate and insert ONLY ONE text-to-video result.

    The video is generated with ``generate_video_from_text``. It replaces the
    first placeholder ``<img>`` found in ``html_content``. Without any
    ``<img>`` it is inserted inside ``<body>``: right after the first
    ``<main>``, hero ``<section>``, container ``<div>`` or ``<header>``
    opening tag if one exists, otherwise right after ``<body>`` wrapped in a
    ``video-container`` div. Without a ``<body>`` a block with an empty search
    section is returned.

    Args:
        html_content: The HTML document to modify.
        prompt: Text prompt for the video.
        session_id: Forwarded unchanged to ``generate_video_from_text``.

    Returns:
        The search/replace block(s) as a string, or ``""`` when the prompt is
        empty/whitespace or video generation reports an error.
        ``SEARCH_START``, ``DIVIDER`` and ``REPLACE_END`` are module-level
        markers defined outside this function.
    """
    if not prompt or not prompt.strip():
        return ""

    import re

    # Detect the same placeholders as image counterparts, to replace the first image slot with a video
    placeholder_patterns = [
        r'<img[^>]*src=["\'](?:placeholder|dummy|sample|example)[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://via\.placeholder\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://picsum\.photos[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']https?://dummyimage\.com[^"\']*["\'][^>]*>',
        r'<img[^>]*alt=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*class=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*id=["\'][^"\']*placeholder[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']data:image[^"\']*["\'][^>]*>',
        r'<img[^>]*src=["\']#["\'][^>]*>',
        r'<img[^>]*src=["\']about:blank["\'][^>]*>',
    ]

    placeholder_images = []
    for pattern in placeholder_patterns:
        placeholder_images.extend(re.findall(pattern, html_content, re.IGNORECASE))

    # Filter out HF URLs from placeholders (they are real generated content)
    placeholder_images = [img for img in placeholder_images if 'huggingface.co/datasets/' not in img]

    # Fallback to any <img>; case-insensitive like the patterns above.
    if not placeholder_images:
        placeholder_images = re.findall(r'<img[^>]*>', html_content, re.IGNORECASE)

    video_html = generate_video_from_text(prompt, session_id=session_id, token=None)  # TODO: Pass token from parent context
    if video_html.startswith("Error"):
        return ""

    # Replace first placeholder if present
    if placeholder_images:
        placeholder = placeholder_images[0]
        placeholder_clean = re.sub(r'\s+', ' ', placeholder.strip())
        # Verbatim text first, then whitespace-/quote-normalised forms;
        # dict.fromkeys removes duplicate variations while keeping order.
        placeholder_variations = dict.fromkeys([
            placeholder,
            placeholder_clean,
            placeholder_clean.replace('"', "'"),
            placeholder_clean.replace("'", '"'),
        ])
        blocks = [
            f"{SEARCH_START}\n{variation}\n{DIVIDER}\n{video_html}\n{REPLACE_END}"
            for variation in placeholder_variations
        ]
        return '\n\n'.join(blocks)

    # Otherwise insert inside <body>
    if '<body' in html_content:
        body_start = html_content.find('<body')
        body_end = html_content.find('>', body_start) + 1

        # Opening tags of existing structural elements, in order of
        # preference; the video goes right after the first one that occurs.
        patterns_to_try = [
            r'<main[^>]*>',
            r'<section[^>]*class="[^"]*hero[^"]*"[^>]*>',
            r'<div[^>]*class="[^"]*container[^"]*"[^>]*>',
            r'<header[^>]*>',
        ]

        insertion_point = None
        for pattern in patterns_to_try:
            match = re.search(pattern, html_content[body_end:], re.IGNORECASE)
            if match:
                # match offsets are relative to the slice that starts at body_end
                insertion_point = html_content[:body_end + match.end()] + '\n '
                break

        if not insertion_point:
            # Fallback to right after body tag with container div
            insertion_point = html_content[:body_end] + '\n '
            video_with_container = f'<div class="video-container" style="margin: 20px 0; text-align: center;">\n {video_html}\n </div>'
            return (
                f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
                f"{insertion_point}\n{video_with_container}\n{REPLACE_END}"
            )

        return (
            f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
            f"{insertion_point}\n{video_html}\n{REPLACE_END}"
        )

    # If no <body>, just append
    return f"{SEARCH_START}\n\n{DIVIDER}\n{video_html}\n{REPLACE_END}"
1125
-
1126
def create_music_replacement_blocks_text_to_music(html_content: str, prompt: str, session_id: Optional[str] = None) -> str:
    """Create search/replace blocks that insert ONE generated <audio> near the top of <body>.

    The audio player is generated with ``generate_music_from_text``. It is
    placed after the first ``<section>...</section>`` element when one
    exists, otherwise right after the opening ``<body>`` tag; without a
    ``<body>`` a block with an empty search section is returned.

    Args:
        html_content: The HTML document to modify.
        prompt: Text prompt for the music.
        session_id: Forwarded unchanged to ``generate_music_from_text``.

    Returns:
        The search/replace block(s) as a string, or ``""`` when the prompt is
        empty/whitespace or music generation reports an error.
        ``SEARCH_START``, ``DIVIDER`` and ``REPLACE_END`` are module-level
        markers defined outside this function.
    """
    if not prompt or not prompt.strip():
        return ""

    audio_html = generate_music_from_text(prompt, session_id=session_id, token=None)  # TODO: Pass token from parent context
    if audio_html.startswith("Error"):
        return ""

    # Prefer inserting after the first <section>...</section> if present; else after <body>
    import re
    section_match = re.search(r"<section\b[\s\S]*?</section>", html_content, flags=re.IGNORECASE)
    if section_match:
        section_html = section_match.group(0)
        section_clean = re.sub(r"\s+", " ", section_html.strip())
        # Verbatim section first, then whitespace-/quote-normalised forms;
        # dict.fromkeys removes duplicate variations while keeping order.
        variations = dict.fromkeys([
            section_html,
            section_clean,
            section_clean.replace('"', "'"),
            section_clean.replace("'", '"'),
        ])
        blocks = [
            f"{SEARCH_START}\n{v}\n{DIVIDER}\n{v}\n {audio_html}\n{REPLACE_END}"
            for v in variations
        ]
        return "\n\n".join(blocks)

    if '<body' in html_content:
        body_end = html_content.find('>', html_content.find('<body')) + 1
        insertion_point = html_content[:body_end] + '\n '
        return (
            f"{SEARCH_START}\n{insertion_point}\n{DIVIDER}\n"
            f"{insertion_point}\n{audio_html}\n{REPLACE_END}"
        )

    # If no <body>, just append
    return f"{SEARCH_START}\n\n{DIVIDER}\n{audio_html}\n{REPLACE_END}"
 
1
+ """
2
+ Media generation functions for images, videos, and music using various AI models.
3
+ """
4
+
5
  import os
6
+ import io
7
  import base64
 
 
 
 
8
  import requests
 
 
 
 
 
 
 
 
 
9
  import tempfile
10
+ from typing import Optional, Dict, Any
11
+ from PIL import Image
12
+ import numpy as np
 
 
13
  import gradio as gr
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
+ from huggingface_hub import InferenceClient
16
+ from utils import create_temp_media_url, compress_media_for_data_uri, validate_video_html
17
+ from config import HF_TOKEN
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
class MediaGenerator:
    """Generate images, videos and music and return them as HTML snippets.

    Image and video generation go through a Hugging Face ``InferenceClient``
    that is only created when ``HF_TOKEN`` is set; music generation calls the
    ElevenLabs HTTP API directly. Every public ``generate_*`` method returns
    either an HTML fragment or a string starting with ``"Error"``: exceptions
    are caught, printed and converted into such a string, so callers detect
    failure with ``result.startswith("Error")``.
    """

    def __init__(self):
        """Create the inference client when ``HF_TOKEN`` is configured."""
        # Stays None without a token; the generate_* methods report that case
        # as an error string instead of raising.
        self.hf_client = None
        if HF_TOKEN:
            self.hf_client = InferenceClient(
                provider="auto",
                api_key=HF_TOKEN,
                bill_to="huggingface",
            )

    def generate_image_with_qwen(self, prompt: str, image_index: int = 0,
                                 token: Optional[gr.OAuthToken] = None) -> str:
        """Generate an image from text with the Qwen/Qwen-Image model.

        Args:
            prompt: Text prompt; also used (HTML-escaped) as the ``alt`` text.
            image_index: Number embedded in the generated file name.
            token: Passed through to ``_upload_media_to_hf``.

        Returns:
            An ``<img>`` tag pointing at the stored image, or an error string.
        """
        try:
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."

            print(f"[ImageGen] Generating image with prompt: {prompt}")

            # Generate image using Qwen/Qwen-Image model
            image = self.hf_client.text_to_image(
                prompt,
                model="Qwen/Qwen-Image",
            )
            image_bytes = self._encode_output_image(image)

            # Create temporary URL
            filename = f"generated_image_{image_index}.jpg"
            temp_url = self._upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            return self._create_image_html(temp_url, prompt)

        except Exception as e:
            print(f"Image generation error: {str(e)}")
            return f"Error generating image: {str(e)}"

    def generate_image_to_image(self, input_image_data, prompt: str,
                                token: Optional[gr.OAuthToken] = None) -> str:
        """Edit an input image with the Qwen/Qwen-Image-Edit model.

        Args:
            input_image_data: Anything ``_process_input_image`` accepts.
            prompt: Edit instruction; also used (HTML-escaped) as ``alt`` text.
            token: Passed through to ``_upload_media_to_hf``.

        Returns:
            An ``<img>`` tag pointing at the stored result, or an error string.
        """
        try:
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."

            print(f"[Image2Image] Processing with prompt: {prompt}")

            # Normalize input image to an RGB PIL image
            pil_image = self._process_input_image(input_image_data)

            # Resize input image to avoid request body size limits
            max_input_size = 1024
            if pil_image.width > max_input_size or pil_image.height > max_input_size:
                # thumbnail() resizes in place and _process_input_image may
                # hand back the caller's own PIL image, so shrink a copy.
                pil_image = pil_image.copy()
                pil_image.thumbnail((max_input_size, max_input_size), Image.Resampling.LANCZOS)

            # Convert to bytes
            buf = io.BytesIO()
            pil_image.save(buf, format='JPEG', quality=85, optimize=True)
            input_bytes = buf.getvalue()

            # Call image-to-image
            image = self.hf_client.image_to_image(
                input_bytes,
                prompt=prompt,
                model="Qwen/Qwen-Image-Edit",
            )
            image_bytes = self._encode_output_image(image)

            # Create temporary URL
            filename = "image_to_image_result.jpg"
            temp_url = self._upload_media_to_hf(image_bytes, filename, "image", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            return self._create_image_html(temp_url, prompt)

        except Exception as e:
            print(f"Image-to-image generation error: {str(e)}")
            return f"Error generating image (image-to-image): {str(e)}"

    def generate_video_from_image(self, input_image_data, prompt: str,
                                  session_id: Optional[str] = None,
                                  token: Optional[gr.OAuthToken] = None) -> str:
        """Generate a video from an input image with Lightricks LTX-Video.

        Args:
            input_image_data: Anything ``_process_input_image`` accepts.
            prompt: Text prompt sent along with the image.
            session_id: Accepted for interface compatibility; not used here.
            token: Passed through to ``_upload_media_to_hf``.

        Returns:
            A ``<video>`` element pointing at the stored video, or an error
            string.
        """
        try:
            print("[Image2Video] Starting video generation")
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."

            # Process input image
            pil_image = self._process_input_image(input_image_data)
            print(f"[Image2Video] Input image size: {pil_image.size}")

            # Compress image for API limits
            input_bytes = self._compress_image_for_video(pil_image, max_size_mb=3.9)

            # The client only has image_to_video in some huggingface_hub versions
            image_to_video_method = getattr(self.hf_client, "image_to_video", None)
            if not callable(image_to_video_method):
                return ("Error: Your huggingface_hub version does not support image_to_video. "
                        "Please upgrade with `pip install -U huggingface_hub`")

            model_id = "Lightricks/LTX-Video-0.9.8-13B-distilled"
            print(f"[Image2Video] Calling API with model: {model_id}")

            video_bytes = image_to_video_method(
                input_bytes,
                prompt=prompt,
                model=model_id,
            )

            print(f"[Image2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown'}")

            return self._publish_video(video_bytes, "image_to_video_result.mp4", token, "Image2Video")

        except Exception as e:
            print(f"[Image2Video] Error: {str(e)}")
            return f"Error generating video (image-to-video): {str(e)}"

    def generate_video_from_text(self, prompt: str, session_id: Optional[str] = None,
                                 token: Optional[gr.OAuthToken] = None) -> str:
        """Generate a video from a text prompt with the Wan-AI text-to-video model.

        Args:
            prompt: Text prompt; ``None`` is treated as empty and surrounding
                whitespace is stripped before the request.
            session_id: Accepted for interface compatibility; not used here.
            token: Passed through to ``_upload_media_to_hf``.

        Returns:
            A ``<video>`` element pointing at the stored video, or an error
            string.
        """
        try:
            print("[Text2Video] Starting video generation")
            if not self.hf_client:
                return "Error: HF_TOKEN environment variable is not set."

            # The client only has text_to_video in some huggingface_hub versions
            text_to_video_method = getattr(self.hf_client, "text_to_video", None)
            if not callable(text_to_video_method):
                return ("Error: Your huggingface_hub version does not support text_to_video. "
                        "Please upgrade with `pip install -U huggingface_hub`")

            model_id = "Wan-AI/Wan2.2-T2V-A14B"
            prompt_str = (prompt or "").strip()
            print(f"[Text2Video] Using model: {model_id}, prompt length: {len(prompt_str)}")

            video_bytes = text_to_video_method(
                prompt_str,
                model=model_id,
            )

            print(f"[Text2Video] Received video bytes: {len(video_bytes) if hasattr(video_bytes, '__len__') else 'unknown'}")

            return self._publish_video(video_bytes, "text_to_video_result.mp4", token, "Text2Video")

        except Exception as e:
            print(f"[Text2Video] Error: {str(e)}")
            return f"Error generating video (text-to-video): {str(e)}"

    def generate_music_from_text(self, prompt: str, music_length_ms: int = 30000,
                                 session_id: Optional[str] = None,
                                 token: Optional[gr.OAuthToken] = None) -> str:
        """Generate music with the ElevenLabs Music API.

        Args:
            prompt: Text prompt; a default orchestral prompt is used when empty.
            music_length_ms: Requested length in milliseconds; 30000 is used
                when the value is falsy.
            session_id: Accepted for interface compatibility; not used here.
            token: Passed through to ``_upload_media_to_hf``.

        Returns:
            An HTML audio player pointing at the stored audio, or an error
            string (also when ``ELEVENLABS_API_KEY`` is not set).
        """
        try:
            api_key = os.getenv('ELEVENLABS_API_KEY')
            if not api_key:
                return "Error: ELEVENLABS_API_KEY environment variable is not set."

            print(f"[MusicGen] Generating music: {prompt}")

            headers = {
                'Content-Type': 'application/json',
                'xi-api-key': api_key,
            }

            payload = {
                'prompt': prompt or 'Epic orchestral theme with soaring strings and powerful brass',
                'music_length_ms': int(music_length_ms) if music_length_ms else 30000,
            }

            resp = requests.post(
                'https://api.elevenlabs.io/v1/music/compose',
                headers=headers,
                json=payload,
                timeout=60,
            )

            try:
                resp.raise_for_status()
            except requests.HTTPError as e:
                # Explicit None check: the exception's response may be missing,
                # and a Response with an error status is falsy, so `or` would
                # not express the intended fallback.
                failed = e.response if e.response is not None else resp
                return f"Error generating music: {failed.text}"

            # Create temporary URL
            filename = "generated_music.mp3"
            temp_url = self._upload_media_to_hf(resp.content, filename, "audio", token, use_temp=True)
            if temp_url.startswith("Error"):
                return temp_url

            audio_html = self._create_audio_html(temp_url)
            print(f"[MusicGen] Successfully generated music: {temp_url}")
            return audio_html

        except Exception as e:
            print(f"[MusicGen] Error: {str(e)}")
            return f"Error generating music: {str(e)}"

    def _process_input_image(self, input_image_data) -> Image.Image:
        """Convert the supported input forms to an RGB PIL image.

        Accepted, checked in this order: an object with ``read()``, an object
        with ``mode`` and ``size`` attributes (used as a PIL image as-is), a
        NumPy array, ``bytes``/``bytearray``, and finally anything ``bytes()``
        can convert. A PIL input that is already RGB is returned unchanged
        (same object).
        """
        if hasattr(input_image_data, 'read'):
            raw = input_image_data.read()
            pil_image = Image.open(io.BytesIO(raw))
        elif hasattr(input_image_data, 'mode') and hasattr(input_image_data, 'size'):
            pil_image = input_image_data
        elif isinstance(input_image_data, np.ndarray):
            pil_image = Image.fromarray(input_image_data)
        elif isinstance(input_image_data, (bytes, bytearray)):
            pil_image = Image.open(io.BytesIO(input_image_data))
        else:
            pil_image = Image.open(io.BytesIO(bytes(input_image_data)))

        # Ensure RGB
        if pil_image.mode != 'RGB':
            pil_image = pil_image.convert('RGB')

        return pil_image

    def _compress_image_for_video(self, pil_image: Image.Image, max_size_mb: float = 3.9) -> bytes:
        """Encode an image as JPEG no larger than ``max_size_mb`` where possible.

        The image is first scaled so its longest side is at most 1024 px.
        While the encoded size still exceeds the limit, JPEG quality is
        lowered in steps of 10 down to 40, then the image is shrunk by 15 %
        per step until its longest side is at most 640 px. The result may
        still exceed the limit once both floors are reached.
        """
        max_bytes = int(max_size_mb * 1024 * 1024)
        max_dim = 1024
        quality = 90

        def encode_current(pil: Image.Image, q: int) -> bytes:
            tmp = io.BytesIO()
            pil.save(tmp, format='JPEG', quality=q, optimize=True)
            return tmp.getvalue()

        # Downscale while too large (resize returns a new image, so the
        # caller's image is left untouched)
        while max(pil_image.size) > max_dim:
            ratio = max_dim / float(max(pil_image.size))
            new_size = (max(1, int(pil_image.size[0] * ratio)), max(1, int(pil_image.size[1] * ratio)))
            pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS)

        encoded = encode_current(pil_image, quality)

        # Reduce quality first, then dimensions, if still too large
        while len(encoded) > max_bytes and (quality > 40 or max(pil_image.size) > 640):
            if quality > 40:
                quality -= 10
            else:
                new_w = max(1, int(pil_image.size[0] * 0.85))
                new_h = max(1, int(pil_image.size[1] * 0.85))
                pil_image = pil_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
            encoded = encode_current(pil_image, quality)

        return encoded

    def _encode_output_image(self, image: Image.Image, max_size: int = 1024) -> bytes:
        """Shrink a generated image to fit ``max_size`` and encode it as JPEG bytes."""
        # Resize image to reduce size while maintaining quality
        if image.width > max_size or image.height > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

        buffer = io.BytesIO()
        image.convert('RGB').save(buffer, format='JPEG', quality=90, optimize=True)
        return buffer.getvalue()

    def _publish_video(self, video_bytes, filename: str,
                       token: Optional[gr.OAuthToken], log_tag: str) -> str:
        """Store video bytes, build the ``<video>`` element and validate it.

        Returns the video HTML, or an error string when storing fails or
        ``validate_video_html`` rejects the markup.
        """
        # Create temporary URL
        temp_url = self._upload_media_to_hf(video_bytes, filename, "video", token, use_temp=True)
        if temp_url.startswith("Error"):
            return temp_url

        video_html = self._create_video_html(temp_url)
        if not validate_video_html(video_html):
            return "Error: Generated video HTML is malformed"

        print(f"[{log_tag}] Successfully generated video: {temp_url}")
        return video_html

    def _upload_media_to_hf(self, media_bytes: bytes, filename: str, media_type: str,
                            token: Optional[gr.OAuthToken] = None, use_temp: bool = True) -> str:
        """Store media and return its URL (or an error string).

        Permanent Hugging Face uploads are not implemented yet, so the media
        is always handed to ``create_temp_media_url`` regardless of
        ``use_temp``; ``token`` is currently unused.
        """
        # HF upload logic would go here for permanent URLs.
        # For now, always use temp files.
        return create_temp_media_url(media_bytes, filename, media_type)

    def _create_image_html(self, image_url: str, alt_text: str) -> str:
        """Create an ``<img>`` tag with ``alt_text`` escaped for attribute use."""
        import html

        # The alt text is a user prompt: a raw '"', '<' or '&' in it would
        # otherwise break out of the attribute and corrupt the markup.
        safe_alt = html.escape(alt_text or "", quote=True)
        return f'<img src="{image_url}" alt="{safe_alt}" style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0;" loading="lazy" />'

    def _create_video_html(self, video_url: str) -> str:
        """Create HTML video element"""
        return f'''<video controls autoplay muted loop playsinline
    style="max-width: 100%; height: auto; border-radius: 8px; margin: 10px 0; display: block;"
    onloadstart="this.style.backgroundColor='#f0f0f0'"
    onerror="this.style.display='none'; console.error('Video failed to load')">
    <source src="{video_url}" type="video/mp4" />
    <p style="text-align: center; color: #666;">Your browser does not support the video tag.</p>
</video>'''

    def _create_audio_html(self, audio_url: str) -> str:
        """Create HTML audio player"""
        return f'''<div class="anycoder-music" style="max-width:420px;margin:16px auto;padding:12px 16px;border:1px solid #e5e7eb;border-radius:12px;background:linear-gradient(180deg,#fafafa,#f3f4f6);box-shadow:0 2px 8px rgba(0,0,0,0.06)">
    <div style="font-size:13px;color:#374151;margin-bottom:8px;display:flex;align-items:center;gap:6px">
        <span>🎵 Generated music</span>
    </div>
    <audio controls autoplay loop style="width:100%;outline:none;">
        <source src="{audio_url}" type="audio/mpeg" />
        Your browser does not support the audio element.
    </audio>
</div>'''
347
+
348
# Global media generator instance, created when this module is imported and
# used by the module-level generate_* wrapper functions below.
media_generator = MediaGenerator()
350
+
351
+ # Export main functions
352
def generate_image_with_qwen(prompt: str, image_index: int = 0, token: Optional[gr.OAuthToken] = None) -> str:
    """Module-level entry point: delegate to the shared ``media_generator`` instance."""
    return media_generator.generate_image_with_qwen(
        prompt,
        image_index=image_index,
        token=token,
    )
354
+
355
def generate_image_to_image(input_image_data, prompt: str, token: Optional[gr.OAuthToken] = None) -> str:
    """Module-level entry point: delegate to the shared ``media_generator`` instance."""
    return media_generator.generate_image_to_image(
        input_image_data,
        prompt=prompt,
        token=token,
    )
357
+
358
def generate_video_from_image(input_image_data, prompt: str, session_id: Optional[str] = None,
                              token: Optional[gr.OAuthToken] = None) -> str:
    """Module-level entry point: delegate to the shared ``media_generator`` instance."""
    return media_generator.generate_video_from_image(
        input_image_data,
        prompt=prompt,
        session_id=session_id,
        token=token,
    )
361
+
362
def generate_video_from_text(prompt: str, session_id: Optional[str] = None,
                             token: Optional[gr.OAuthToken] = None) -> str:
    """Module-level entry point: delegate to the shared ``media_generator`` instance."""
    return media_generator.generate_video_from_text(
        prompt,
        session_id=session_id,
        token=token,
    )
365
+
366
def generate_music_from_text(prompt: str, music_length_ms: int = 30000, session_id: Optional[str] = None,
                             token: Optional[gr.OAuthToken] = None) -> str:
    """Module-level entry point: delegate to the shared ``media_generator`` instance."""
    return media_generator.generate_music_from_text(
        prompt,
        music_length_ms=music_length_ms,
        session_id=session_id,
        token=token,
    )