import spaces import gradio as gr import numpy as np import requests import base64 import os from datetime import datetime from pytz import timezone import torch import diffusers from diffusers import DDPMPipeline from transformers import AutoTokenizer, AutoModel tz = timezone('EST') API_ENDPOINT = os.getenv('API_ENDPOINT') API_KEY = os.getenv('API_KEY') print (API_ENDPOINT) print (API_KEY) title = "

Markup-to-Image Diffusion Models with Scheduled Sampling

" authors = "
Yuntian Deng, Noriyuki Kojima, Alexander M. Rush
" info = '
Paper Code
' #notice = "

Notice: Due to resource constraints, we've transitioned from GPU to CPU processing for this demo, which results in significantly longer inference times. We appreciate your understanding.

" notice = "

Acknowledgment: This demo is powered by GPU resources supported by the Hugging Face Community Grant.

" # setup def setup(): img_pipe = DDPMPipeline.from_pretrained("yuntian-deng/latex2im_ss_finetunegptneo") model_type = "EleutherAI/gpt-neo-125M" #encoder = AutoModel.from_pretrained(model_type).to(device) encoder = img_pipe.unet.text_encoder if False: l = len(img_pipe.unet.down_blocks) for i in range(l): img_pipe.unet.down_blocks[i] = torch.compile(img_pipe.unet.down_blocks[i]) l = len(img_pipe.unet.up_blocks) for i in range(l): img_pipe.unet.up_blocks[i] = torch.compile(img_pipe.unet.up_blocks[i]) tokenizer = AutoTokenizer.from_pretrained(model_type, max_length=1024) eos_id = tokenizer.encode(tokenizer.eos_token)[0] def forward_encoder(latex): device = ("cuda" if torch.cuda.is_available() else "cpu") img_pipe.to(device) encoded = tokenizer(latex, return_tensors='pt', truncation=True, max_length=1024) input_ids = encoded['input_ids'] input_ids = torch.cat((input_ids, torch.LongTensor([eos_id,]).unsqueeze(0)), dim=-1) input_ids = input_ids.to(device) attention_mask = encoded['attention_mask'] attention_mask = torch.cat((attention_mask, torch.LongTensor([1,]).unsqueeze(0)), dim=-1) attention_mask = attention_mask.to(device) with torch.no_grad(): outputs = encoder(input_ids=input_ids, attention_mask=attention_mask) last_hidden_state = outputs.last_hidden_state last_hidden_state = attention_mask.unsqueeze(-1) * last_hidden_state # shouldn't be necessary return last_hidden_state return img_pipe, forward_encoder img_pipe, forward_encoder = setup() with gr.Blocks() as demo: gr.Markdown(title) gr.Markdown(authors) gr.Markdown(info) gr.Markdown(notice) with gr.Row(): with gr.Column(scale=2): textbox = gr.Textbox(label=r'Type LaTeX formula below and click "Generate"', lines=1, max_lines=1, placeholder='Type LaTeX formula here and click "Generate"', value=r'\sum_{t=1}^T\E_{y_t \sim {\tilde P(y_t| y_0)}} \left\| \frac{y_t - \sqrt{\bar{\alpha}_t}y_0}{\sqrt{1-\bar{\alpha}_t}} - \epsilon_\theta(y_t, t)\right\|^2.') submit_btn = gr.Button("Generate", elem_id="btn") with gr.Column(scale=3): slider = gr.Slider(0, 1000, value=0, label='step (out of 1000)') image = gr.Image(label="Rendered Image", show_label=False, elem_id="image") inputs = [textbox] outputs = [slider, image, submit_btn] @spaces.GPU(duration=120) def infer(formula): current_time = datetime.now(tz) print (current_time, formula) data = {'formula': formula, 'api_key': API_KEY} latex = formula # TODO: normalize encoder_hidden_states = forward_encoder(latex) try: i = 0 results = [] for _, image_clean in img_pipe.run_clean(batch_size=1, generator=torch.manual_seed(0), encoder_hidden_states=encoder_hidden_states, output_type="numpy"): i += 1 image_clean = image_clean[0] image_clean = np.ascontiguousarray(image_clean) #s = base64.b64encode(image_clean).decode('ascii') #yield s q = image_clean q = q.reshape((64, 320, 3)) print (q.min(), q.max()) yield i, q, gr.update(visible=False) yield i, q, submit_btn.update(visible=True) #with requests.post(url=API_ENDPOINT, data=data, timeout=600, stream=True) as r: # i = 0 # for line in r.iter_lines(): # response = line.decode('ascii').strip() # r = base64.decodebytes(response.encode('ascii')) # q = np.frombuffer(r, dtype=np.float32).reshape((64, 320, 3)) # i += 1 # yield i, q, gr.update(visible=False) # yield i, q, gr.update(visible=True) except Exception as e: yield 1000, 255*np.ones((64, 320, 3)), gr.update(visible=True) submit_btn.click(fn=infer, inputs=inputs, outputs=outputs, concurrency_limit=1) demo.queue(max_size=20).launch()