GameConfigIdea / state_prompt_fileman_UI_functions.py
kwabs22
Some changes and file splitting
a69d738
from datetime import datetime
import json
import os
import gradio as gr
from gradio_client import Client
import shutil
import time
import random
import zipfile
import base64
# Global variables
# Shared, mutable run state read and written by the SPFMan* functions below.
SPFManstate = {
    "last_file": 0,  # index of the next prompt to process when resuming
    "output_dir": "output",  # directory that receives generated files
    "errors": [],  # error messages collected during generation
    "skipped_items": [],  # prompt indices the user chose to skip
    "is_paid_api": False,  # when False, generation stops after one item per run
    "cost_per_item": 0.1,  # Default cost per item for paid API
    "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),  # refreshed on every save
    "api_provider": "default",  # Default API provider
    "retry_delay": 5 * 60,  # 5 minutes delay for retry (in seconds)
    "max_retries": 3,  # Maximum number of retries
}

# Ordered (prompt_type, prompt) pairs queued for generation.
SPFManprompt_list = []
#SPFMan - State, Prompt and file manager
def SPFManload_state(config_file=None):
    """Merge saved state into ``SPFManstate`` and replace the prompt list.

    The data is read from ``config_file`` when one is given; otherwise from
    ``state.json`` in the current directory if that file exists. When neither
    source is available nothing is changed.

    Errors are printed rather than raised, so the function always returns
    ``None``.
    """
    global SPFManstate, SPFManprompt_list
    try:
        # Pick the single file to read; an explicit path wins over the default.
        if config_file:
            source_path = config_file
        elif os.path.exists("state.json"):
            source_path = "state.json"
        else:
            return

        with open(source_path, "r", encoding="utf-8", errors="replace") as handle:
            payload = json.load(handle)

        # Saved keys override current ones; keys absent from the file survive.
        SPFManstate.update(payload.get("state", {}))
        SPFManprompt_list = payload.get("prompt_list", [])
    except json.JSONDecodeError as exc:
        print(f"Error decoding JSON: {exc}")
    except Exception as exc:
        print(f"Unexpected error: {exc}")
def SPFMansave_state():
    """Write the state and prompt list to ``state_<timestamp>.json``.

    The timestamp stored in ``SPFManstate`` is refreshed first, so every save
    produces a new file name. Any other ``state_*.json`` file in the current
    working directory is then removed, leaving only the newest snapshot.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    SPFManstate["timestamp"] = stamp
    current_name = f"state_{stamp}.json"

    snapshot = {"state": SPFManstate, "prompt_list": SPFManprompt_list}
    with open(current_name, "w") as out:
        json.dump(snapshot, out)

    # Delete old state files
    for entry in os.listdir():
        is_state_file = entry.startswith("state_") and entry.endswith(".json")
        if is_state_file and entry != current_name:
            os.remove(entry)
def SPFManensure_output_directory():
    """Create the directory named by ``SPFManstate['output_dir']`` if missing."""
    target_dir = SPFManstate['output_dir']
    os.makedirs(target_dir, exist_ok=True)
def SPFMangenerate_image(prompt, retries=0):
    """Generate an image for ``prompt`` and move it into the output directory.

    Calls the ``/infer`` endpoint of the ``black-forest-labs/FLUX.1-dev``
    client and moves the returned file to
    ``<output_dir>/<prompt stem>_image_<timestamp>.webp``.

    Args:
        prompt: Text prompt sent to the model. Its first 50 characters,
            sanitised, also form the start of the output file name.
        retries: Number of retries already made; used by the quota retry.

    Returns:
        ``"Image saved as <path>"`` on success, otherwise
        ``"Error generating image: <reason>"``. Failures are also appended to
        ``SPFManstate["errors"]``.
    """
    SPFManensure_output_directory()
    try:
        client = Client("black-forest-labs/FLUX.1-dev")
        result = client.predict(
            prompt=prompt,
            seed=0,
            randomize_seed=True,
            width=1024,
            height=1024,
            guidance_scale=3.5,
            num_inference_steps=28,
            api_name="/infer"
        )
        # The first element of the result is the path of the generated file.
        image_path = result[0]

        # Prompts are free text: besides spaces, replace path separators,
        # characters that are illegal in file names on common filesystems and
        # control characters, so the move cannot fail or leave the output dir.
        unsafe_chars = '<>:"/\\|?*'
        stem = "".join(
            "_" if ch == " " or ch in unsafe_chars or ord(ch) < 32 else ch
            for ch in prompt[:50]
        )
        filename = f"{SPFManstate['output_dir']}/{stem}_image_{SPFManstate['timestamp']}.webp"
        shutil.move(image_path, filename)
        return f"Image saved as {filename}"
    except Exception as e:
        error_msg = f"Error generating image: {str(e)}"
        # A GPU quota error is treated as temporary: wait, then try again
        # until the configured number of retries is used up.
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            return SPFMangenerate_image(prompt, retries + 1)
        SPFManstate["errors"].append(error_msg)
        return error_msg
def SPFMangenerate_audio(prompt, retries=0):
    """Generate audio for ``prompt`` and store it in the output directory.

    Calls the ``/predict`` endpoint of the
    ``artificialguybr/Stable-Audio-Open-Zero`` client. The result is either
    moved (when it is the path of an existing file) or base64-decoded and
    written to ``<output_dir>/<prompt stem>_audio_<timestamp>.wav``.

    Args:
        prompt: Text prompt sent to the model. Its first 50 characters,
            sanitised, also form the start of the output file name.
        retries: Number of retries already made; used by the quota retry.

    Returns:
        ``"Audio saved as <path>"`` on success, otherwise
        ``"Error generating audio: <reason>"``. Failures are also appended to
        ``SPFManstate["errors"]``.
    """
    SPFManensure_output_directory()
    try:
        client = Client("artificialguybr/Stable-Audio-Open-Zero")
        result = client.predict(
            prompt=prompt,
            seconds_total=30,
            steps=100,
            cfg_scale=7,
            api_name="/predict"
        )

        # Prompts are free text: besides spaces, replace path separators,
        # characters that are illegal in file names on common filesystems and
        # control characters, so the write cannot fail or leave the output dir.
        unsafe_chars = '<>:"/\\|?*'
        stem = "".join(
            "_" if ch == " " or ch in unsafe_chars or ord(ch) < 32 else ch
            for ch in prompt[:50]
        )
        filename = f"{SPFManstate['output_dir']}/{stem}_audio_{SPFManstate['timestamp']}.wav"

        if isinstance(result, str) and os.path.exists(result):
            # The client handed back a local file: just relocate it.
            shutil.move(result, filename)
        else:
            # Otherwise the payload is treated as base64-encoded audio data.
            audio_data = base64.b64decode(result)
            with open(filename, "wb") as audio_file:
                audio_file.write(audio_data)
        return f"Audio saved as {filename}"
    except Exception as e:
        error_msg = f"Error generating audio: {str(e)}"
        # A GPU quota error is treated as temporary: wait, then try again
        # until the configured number of retries is used up.
        if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']:
            time.sleep(SPFManstate['retry_delay'])
            return SPFMangenerate_audio(prompt, retries + 1)
        SPFManstate["errors"].append(error_msg)
        return error_msg
def SPFManprocess_prompts(prompt_list):
    """Run each ``(prompt_type, prompt)`` pair through its generator.

    Processing stops at the first failure: either a generator returned an
    error message or the prompt type has no registered generator (in which
    case the message is also appended to ``SPFManstate["errors"]``).

    Args:
        prompt_list: Iterable of ``(prompt_type, prompt)`` pairs; the known
            types are ``'image'`` and ``'audio'``.

    Returns:
        List of result messages, one per processed pair, ending with the
        failure message if processing stopped early.
    """
    router = {
        'image': SPFMangenerate_image,
        'audio': SPFMangenerate_audio,
    }
    results = []
    for prompt_type, prompt in prompt_list:
        handler = router.get(prompt_type)
        if handler is None:
            error_msg = f"Unknown prompt type: {prompt_type}"
            SPFManstate["errors"].append(error_msg)
            results.append(error_msg)
            break  # Stop processing if there's an error

        result = handler(prompt)
        results.append(result)
        # Generator failures always start with "Error". A substring test would
        # also match success messages whose file name (built from the prompt
        # text) merely contains the word.
        if result.startswith("Error"):
            break  # Stop processing if there's an error
    return results
def SPFMancreate_files_with_generation(resume=True):
    """Generate files for the queued prompts, yielding a running log.

    This is a generator. After each processed item the state is saved and the
    accumulated log (all messages so far joined by newlines) is yielded. With
    the non-paid API only one item is processed per call; with the paid API
    the loop continues until the list is exhausted or an item fails.

    Args:
        resume: When true and ``SPFManstate["last_file"]`` still points inside
            the prompt list, start from that index; otherwise start at 0.

    Yields:
        The accumulated log text. A failing item's error message is included
        in the final yielded log before the generator stops.
    """
    global SPFManstate, SPFManprompt_list
    results = []
    total = len(SPFManprompt_list)

    if resume and SPFManstate["last_file"] < total:
        start = SPFManstate["last_file"]
        results.append(f"Resuming from item {start + 1}")
    else:
        start = 0

    for i in range(start, total):
        if i in SPFManstate["skipped_items"]:
            results.append(f"Skipped item {i + 1}")
            continue

        prompt_type, prompt = SPFManprompt_list[i]
        failed = False
        try:
            if not SPFManstate["is_paid_api"]:
                results.append(f"Processing: {prompt_type} - {prompt}")
            generation_results = SPFManprocess_prompts([(prompt_type, prompt)])
            results.extend(generation_results)
            # Failure messages start with "Error"; a substring test would also
            # trip on success messages whose prompt text contains the word.
            failed = any(result.startswith("Error") for result in generation_results)
            if not failed:
                # Only advance the resume pointer past items that succeeded.
                SPFManstate["last_file"] = i + 1
        except Exception as e:
            error_msg = f"Error processing item {i + 1}: {str(e)}"
            SPFManstate["errors"].append(error_msg)
            results.append(error_msg)
            failed = True

        # Save and report on failure too, so the error reaches the caller and
        # the recorded errors are persisted before the generator stops.
        SPFMansave_state()
        yield "\n".join(results)

        if failed:
            break  # Stop processing if there's an error
        if not SPFManstate["is_paid_api"]:
            break  # Stop after processing one item for non-paid API
def SPFManadd_prompt(prompt_type, prompt):
    """Queue a ``(prompt_type, prompt)`` pair and save the state.

    Returns:
        A confirmation message and a ``gr.update`` carrying the new length of
        the prompt list.
    """
    global SPFManprompt_list
    entry = (prompt_type, prompt)
    SPFManprompt_list.append(entry)
    SPFMansave_state()

    message = f"Added {prompt_type}: {prompt}"
    count_update = gr.update(value=len(SPFManprompt_list))
    return message, count_update
def SPFManclear_prompts():
    """Replace the prompt list with an empty one and save the state.

    Returns:
        A confirmation message and a ``gr.update`` resetting the count to 0.
    """
    global SPFManprompt_list
    SPFManprompt_list = list()
    SPFMansave_state()

    message = "Prompt list cleared"
    count_update = gr.update(value=0)
    return message, count_update
def SPFManview_all_prompts():
    """Return the queued prompts as a 1-based numbered list, one per line."""
    numbered = []
    for position, (kind, text) in enumerate(SPFManprompt_list, start=1):
        numbered.append(f"{position}. {kind}: {text}")
    return "\n".join(numbered)
def SPFManzip_files():
    """Bundle the output directory, state and prompt list into one ZIP file.

    The archive ``output_<timestamp>.zip`` is created in the current working
    directory and contains:

    * every file found under ``SPFManstate['output_dir']``,
    * ``state_<timestamp>.json`` (the saved file if present, otherwise the
      current in-memory state),
    * ``prompt_list_<timestamp>.txt`` with one ``type: prompt`` line per
      queued prompt.

    Returns:
        ``"Files zipped as <zip name>"``.
    """
    SPFManensure_output_directory()
    timestamp = SPFManstate['timestamp']
    zip_filename = f"output_{timestamp}.zip"
    state_filename = f"state_{timestamp}.json"
    prompt_list_filename = f"prompt_list_{timestamp}.txt"

    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for root, _, files in os.walk(SPFManstate['output_dir']):
            for file in files:
                zipf.write(os.path.join(root, file))

        if os.path.exists(state_filename):
            zipf.write(state_filename)
        else:
            # No state has been saved under the current timestamp yet (e.g.
            # nothing was saved since start-up). Archive the in-memory state
            # instead of failing with FileNotFoundError.
            zipf.writestr(
                state_filename,
                json.dumps({"state": SPFManstate, "prompt_list": SPFManprompt_list}),
            )

        # Add prompt list to zip. Writing the text straight into the archive
        # avoids a temporary file that would be left behind on an error.
        prompt_lines = "".join(f"{t}: {p}\n" for t, p in SPFManprompt_list)
        zipf.writestr(prompt_list_filename, prompt_lines)

    return f"Files zipped as {zip_filename}"
def SPFMantoggle_paid_api(value):
    """Store the paid-API flag, save the state and report the new setting."""
    SPFManstate["is_paid_api"] = value
    SPFMansave_state()

    if value:
        label = 'Enabled'
    else:
        label = 'Disabled'
    return f"Paid API: {label}"
def SPFManestimate_cost():
    """Return the cost of the queued prompts as text with two decimals.

    The estimate is the number of queued prompts multiplied by
    ``SPFManstate['cost_per_item']``.
    """
    prompt_count = len(SPFManprompt_list)
    total_cost = prompt_count * SPFManstate['cost_per_item']
    return f"Estimated cost: ${total_cost:.2f}"
def SPFManauto_generate_prompt(prompt_type):
    """Build a random sample prompt for the given prompt type.

    Args:
        prompt_type: ``"image"`` or ``"audio"``.

    Returns:
        ``"A <style> <subject> <action>"`` for images,
        ``"Sound of a <subject> <action>"`` for audio, and ``None`` for any
        other prompt type.
    """
    subjects = ("cat", "dog", "tree", "mountain", "ocean", "city", "person", "flower", "car", "building")
    styles = ("realistic", "cartoon", "abstract", "vintage", "futuristic", "minimalist", "surreal", "impressionist")
    actions = ("running", "sleeping", "flying", "dancing", "singing", "jumping", "sitting", "laughing")

    # The words are drawn in the order they appear in the sentence.
    if prompt_type == "image":
        style = random.choice(styles)
        subject = random.choice(subjects)
        action = random.choice(actions)
        return f"A {style} {subject} {action}"

    if prompt_type == "audio":
        subject = random.choice(subjects)
        action = random.choice(actions)
        return f"Sound of a {subject} {action}"

    return None
def SPFManview_config():
    """Return the state and the prompt count as indented JSON text."""
    summary = {}
    summary["state"] = SPFManstate
    summary["prompt_list_length"] = len(SPFManprompt_list)
    return json.dumps(summary, indent=2)
def SPFManskip_item():
    """Mark the next pending prompt as skipped and move past it.

    Returns:
        ``"Skipped item <n>"`` with the 1-based number of the skipped prompt,
        or ``"No more items to skip"`` when the resume pointer is already at
        the end of the prompt list.
    """
    current = SPFManstate["last_file"]
    if current >= len(SPFManprompt_list):
        return "No more items to skip"

    SPFManstate["skipped_items"].append(current)
    SPFManstate["last_file"] = current + 1
    SPFMansave_state()
    # After the increment the pointer equals the 1-based number of the item.
    return f"Skipped item {SPFManstate['last_file']}"
def SPFManupdate_api_details(provider, cost):
    """Store the API provider and per-item cost, then save the state.

    Args:
        provider: Value stored under ``"api_provider"``.
        cost: Per-item cost; converted with ``float`` before being stored, so
            a non-numeric value raises ``ValueError``.

    Returns:
        A confirmation message echoing ``provider`` and ``cost`` as passed in.
    """
    SPFManstate["api_provider"] = provider
    numeric_cost = float(cost)
    SPFManstate["cost_per_item"] = numeric_cost
    SPFMansave_state()

    summary = f"API details updated: Provider - {provider}, Cost per item - ${cost}"
    return summary
def SPFManload_config_file(file):
    """Reset the state and load a configuration from an uploaded file.

    ``file.name`` is the path that is read. A ``.zip`` archive is searched at
    its top level for a ``state_*.json`` member (state and prompt list) and a
    ``prompt_list_*.txt`` member (``type: prompt`` lines). Any other file is
    handed to ``SPFManload_state``.

    The existing state and prompt list are cleared before loading, so a failed
    load leaves the defaults in place.

    Args:
        file: Object exposing the path of the uploaded file as ``file.name``.

    Returns:
        A status message and a ``gr.update`` carrying the prompt count.
    """
    global SPFManstate, SPFManprompt_list
    try:
        # Clear existing state and prompt list
        SPFManstate = {
            "last_file": 0,
            "output_dir": "output",
            "errors": [],
            "skipped_items": [],
            "is_paid_api": False,
            "cost_per_item": 0.1,
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "api_provider": "default",
            "retry_delay": 300,
            "max_retries": 3
        }
        SPFManprompt_list = []

        # Check if the file is a ZIP archive
        if file.name.endswith('.zip'):
            # Read the members straight from the archive. Nothing is extracted
            # to disk, so no folder is left behind on error and files from an
            # earlier extraction cannot be picked up by mistake.
            with zipfile.ZipFile(file.name, 'r') as zip_ref:
                top_level = [n for n in zip_ref.namelist() if '/' not in n]
                json_files = [n for n in top_level if n.startswith('state_') and n.endswith('.json')]
                txt_files = [n for n in top_level if n.startswith('prompt_list_') and n.endswith('.txt')]

                # Find and load the state JSON file
                if json_files:
                    raw_json = zip_ref.read(json_files[0]).decode("utf-8", errors="replace")
                    loaded_data = json.loads(raw_json)
                    SPFManstate.update(loaded_data.get("state", {}))
                    SPFManprompt_list = loaded_data.get("prompt_list", [])

                # The text file repeats the prompts already stored in the state
                # JSON. Use it only as a fallback, otherwise every prompt would
                # be queued twice.
                if txt_files and not SPFManprompt_list:
                    raw_text = zip_ref.read(txt_files[0]).decode("utf-8", errors="replace")
                    for line in raw_text.split("\n"):
                        prompt_type, separator, prompt = line.strip().partition(': ')
                        if not separator:
                            # Blank or malformed line: skip it rather than
                            # aborting the whole load.
                            continue
                        SPFManprompt_list.append((prompt_type, prompt))
        else:
            # Load new configuration from a single file
            SPFManload_state(file.name)

        SPFMansave_state()  # Save the loaded state
        return f"Configuration loaded from {file.name}", gr.update(value=len(SPFManprompt_list))
    except Exception as e:
        return f"Error loading configuration: {str(e)}", gr.update(value=len(SPFManprompt_list))
# Handle JSON input and loading
def SPFManload_json_configuration(json_text):
    """Load state and prompts from JSON text and save the result.

    The ``"state"`` object is merged into the current state, so keys missing
    from the input keep their current values. ``"prompt_list"``, when present,
    replaces the current prompt list. Nothing is changed if the input fails
    validation.

    Args:
        json_text: JSON document with optional ``"state"`` (object) and
            ``"prompt_list"`` (array) members.

    Returns:
        A status message and a ``gr.update`` carrying the prompt count.
    """
    global SPFManstate, SPFManprompt_list
    try:
        loaded_data = json.loads(json_text)
        new_state = loaded_data.get("state", {})
        new_prompts = loaded_data.get("prompt_list", SPFManprompt_list)

        # Validate before touching the globals so bad input cannot leave them
        # half-updated or replaced by a non-dict / non-list value.
        if not isinstance(new_state, dict):
            raise TypeError('"state" must be a JSON object')
        if not isinstance(new_prompts, list):
            raise TypeError('"prompt_list" must be a JSON array')

        # Merge instead of replacing: a partial "state" must not drop keys the
        # other functions rely on (e.g. "errors", "output_dir").
        SPFManstate.update(new_state)
        SPFManprompt_list = new_prompts
        SPFMansave_state()
        return "Configuration loaded from JSON input", gr.update(value=len(SPFManprompt_list))
    except json.JSONDecodeError as e:
        return f"Error parsing JSON: {str(e)}", gr.update(value=len(SPFManprompt_list))
    except Exception as e:
        return f"Unexpected error: {str(e)}", gr.update(value=len(SPFManprompt_list))