# Source: Hugging Face Space (page status when captured: Sleeping)
import requests | |
import structlog | |
import openai | |
import os | |
import io | |
import random | |
import tiktoken | |
import enum | |
import time | |
import retrying | |
import IPython.display as display | |
from base64 import b64decode | |
import base64 | |
from io import BytesIO | |
import PIL | |
import PIL.Image | |
import PIL.ImageDraw | |
import PIL.ImageFont | |
import gradio as gr | |
import cachetools.func | |
from huggingface_hub import hf_hub_download | |
import concurrent.futures | |
import geopy | |
def _read_lines(path):
    """Return the stripped, non-blank lines of the text file at ``path``.

    The file is opened in a ``with`` block so the handle is closed as soon as
    the list is built. Blank lines are skipped so that a stray empty line can
    never be picked later as an (empty) prompt ingredient.
    """
    with open(path) as handle:
        stripped = (line.strip() for line in handle)
        return [entry for entry in stripped if entry]


# Module-wide structured logger.
logger = structlog.getLogger()

# Required setting: a missing WEATHER_API variable raises KeyError on import.
weather_api_key = os.environ['WEATHER_API']
# Optional setting: stays None when OPENAI_KEY is not defined.
openai.api_key = os.environ.get("OPENAI_KEY", None)

# Word lists (one entry per line) used to randomize the image prompts.
animals = _read_lines('animals.txt')
art_styles = _read_lines('art_styles.txt')

# Font file fetched through the Hugging Face Hub; used for all text rendering.
font_path = hf_hub_download("ybelkada/fonts", "Arial.TTF")
def get_lat_long(zip):
    """Geocode a ZIP/postal code into a ``(latitude, longitude)`` pair.

    Args:
        zip: The code to look up. It is converted with ``str()`` before being
            sent to the Nominatim geocoder. (The name shadows the builtin but
            is kept so existing keyword callers continue to work.)

    Returns:
        A ``(latitude, longitude)`` tuple read from the geocoder result.

    Raises:
        ValueError: If the geocoder finds no match for ``zip``.
    """
    loc = geopy.Nominatim(user_agent='weatherboy-gpt').geocode(str(zip))
    # geocode() yields None when nothing matches; fail with a clear message
    # instead of an AttributeError on ``None.latitude``.
    if loc is None:
        raise ValueError(f"Could not geocode location: {zip!r}")
    return loc.latitude, loc.longitude
class Chat:
    """Small stateful wrapper around ``openai.ChatCompletion``.

    The conversation is kept in ``self._history`` as a list of
    ``{"role": ..., "content": ...}`` dicts whose first entry is always the
    system prompt. Before each request the oldest non-system entries are
    dropped until the token estimate is within ``max_length``.
    """

    class Model(enum.Enum):
        """Chat model identifiers accepted by ``_msg``."""
        GPT3_5 = "gpt-3.5-turbo"
        GPT_4 = "gpt-4"

    def __init__(self, system, max_length=4096//2):
        """Start a conversation.

        Args:
            system: Text of the system prompt; stored as the first history entry.
            max_length: Token budget for the history that is enforced by
                ``message`` before a new user message is appended.
        """
        self._system = system
        self._max_length = max_length
        self._history = [
            {"role": "system", "content": self._system},
        ]

    @classmethod
    def num_tokens_from_text(cls, text, model="gpt-3.5-turbo"):
        """Returns the number of tokens used by some text."""
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))

    @classmethod
    def num_tokens_from_messages(cls, messages, model="gpt-3.5-turbo"):
        """Returns the number of tokens used by a list of messages."""
        encoding = tiktoken.encoding_for_model(model)
        num_tokens = 0
        for message in messages:
            # Fixed per-message overhead for the start/end markers and role line.
            num_tokens += 4
            for key, value in message.items():
                num_tokens += len(encoding.encode(value))
                if key == "name":  # if there's a name, the role is omitted
                    num_tokens += -1  # role is always required and always 1 token
        # Every reply is primed with the assistant start marker.
        num_tokens += 2
        return num_tokens

    def _msg(self, *args, model=Model.GPT3_5.value, **kwargs):
        """Send the current history to the chat completion endpoint.

        Extra positional and keyword arguments are forwarded unchanged to
        ``openai.ChatCompletion.create``.
        """
        return openai.ChatCompletion.create(
            *args,
            model=model,
            messages=self._history,
            **kwargs
        )

    def message(self, next_msg=None, **kwargs):
        """Send ``next_msg`` (if given) and return the assistant's reply text.

        Args:
            next_msg: Optional user message appended to the history before the
                request. When ``None`` the history is sent as-is.
            **kwargs: Forwarded to ``_msg``.

        Returns:
            The content of the first choice in the response. The reply is also
            appended to the history as an assistant message.
        """
        # TODO: Optimize this if slow through easy caching
        # Index 0 is the system prompt, so trimming always removes index 1.
        while (len(self._history) > 1
               and self.num_tokens_from_messages(self._history) > self._max_length):
            dropped = self._history.pop(1)
            logger.info(f'Popping message: {dropped}')
        if next_msg is not None:
            self._history.append({"role": "user", "content": next_msg})
        logger.info('requesting openai...')
        resp = self._msg(**kwargs)
        logger.info('received openai...')
        text = resp.choices[0].message.content
        self._history.append({"role": "assistant", "content": text})
        return text
class Weather:
    """Fetches and reshapes the forecast.weather.gov JSON for one ZIP code."""

    # Seconds to wait for the forecast service before giving up, so a stalled
    # connection cannot hang the caller indefinitely.
    _REQUEST_TIMEOUT = 30

    def __init__(self, zip_code='10001', api_key=weather_api_key):
        """Remember the location and API key.

        Args:
            zip_code: Code passed to ``get_lat_long`` when fetching.
            api_key: Stored on the instance; not read by the methods of this
                class.
        """
        self.zip_code = zip_code
        self.api_key = api_key

    def get_weather(self):
        """Download the raw forecast JSON for ``self.zip_code``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.HTTPError: If the service answers with an error status.
        """
        lat, long = get_lat_long(self.zip_code)
        url = f"https://forecast.weather.gov/MapClick.php?lat={lat:.2f}&lon={long:.2f}&unit=0&lg=english&FcstType=json"
        headers = {'accept': 'application/json'}
        response = requests.get(url, headers=headers, timeout=self._REQUEST_TIMEOUT)
        # Surface HTTP failures here rather than as a JSON/KeyError later on.
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _select(data, pick):
        """Apply ``pick`` to every forecast series and collect the results.

        The key order of the returned dict is part of the output (callers
        stringify it), so it is built in a fixed order.
        """
        time_block, data_block = data['time'], data['data']
        selected = {
            'time': pick(time_block['startValidTime']),
            'tempLabel': pick(time_block['tempLabel']),
        }
        for field in ('temperature', 'pop', 'weather', 'iconLink', 'text'):
            selected[field] = pick(data_block[field])
        return selected

    def get_info(self):
        """Return the forecast split into ``now``, ``hour`` and ``day`` parts.

        Returns:
            A dict with three keys: ``now`` is the ``currentobservation``
            object as received; ``hour`` holds the first entry of every
            forecast series; ``day`` holds the remaining entries of every
            series as lists.
        """
        data = self.get_weather()
        # The 'time' and 'data' keys seem to have hourly/daily data.
        # Assuming the first entry in these lists is for the current hour
        # and the rest are for the rest of the day.
        return {
            'now': data['currentobservation'],
            'hour': self._select(data, lambda series: series[0]),
            'day': self._select(data, lambda series: series[1:]),
        }
class Image:
    """Helper for requesting generated images from ``openai.Image``."""

    class Size(enum.Enum):
        """Image dimensions, as the ``size`` strings sent with the request."""
        SMALL = "256x256"
        MEDIUM = "512x512"
        LARGE = "1024x1024"

    @classmethod
    def create(cls, prompt, n=1, size=Size.SMALL):
        """Generate ``n`` images for ``prompt``.

        Declared as a classmethod because it is called directly on the class
        (``Image.create(prompt, ...)``); without the decorator ``prompt``
        would be bound to ``cls`` and the call would fail.

        Args:
            prompt: Text description of the image.
            n: Number of images to request.
            size: A ``Size`` member; its value is sent as the ``size`` field.

        Returns:
            The single entry of the response's ``data`` list when ``n == 1``,
            otherwise the whole ``data`` list. Images are requested in
            ``b64_json`` format.
        """
        logger.info('requesting openai.Image...')
        resp = openai.Image.create(prompt=prompt, n=n, size=size.value, response_format='b64_json')
        logger.info('received openai.Image...')
        if n == 1:
            return resp["data"][0]
        return resp["data"]
def create_collage(image1, image2, image3, image4):
    """Tile four images into a 2x2 grid and return the combined image.

    The canvas is twice the width and height of ``image1``; all four inputs
    are assumed to share that size. Layout: ``image1`` top-left, ``image2``
    top-right, ``image3`` bottom-left, ``image4`` bottom-right.
    """
    tile_w, tile_h = image1.size
    canvas = PIL.Image.new('RGB', (2 * tile_w, 2 * tile_h))
    # Each tile paired with the top-left corner it is pasted at.
    placements = (
        (image1, (0, 0)),
        (image2, (tile_w, 0)),
        (image3, (0, tile_h)),
        (image4, (tile_w, tile_h)),
    )
    for tile, corner in placements:
        canvas.paste(tile, corner)
    return canvas
def overlay_text_on_image(img, text, position, text_color=(255, 255, 255), box_color=(0, 0, 0, 128), decode=False):
    """Draw ``text`` on a backing box in one corner of ``img``.

    The largest font size from 1 to 49 whose rendered text fits inside the
    image is used (size 1 is the fallback when nothing fits).

    Args:
        img: A PIL image, or a base64-encoded image when ``decode`` is true.
        text: The text to draw.
        position: One of ``'top-left'``, ``'top-right'``, ``'bottom-left'``
            or ``'bottom-right'``.
        text_color: Fill color for the text.
        box_color: Fill color for the box behind the text. Its alpha channel
            is honored on RGB images.
        decode: When true, ``img`` is base64-decoded and opened first.

    Returns:
        The image that was drawn on (a passed-in PIL image is modified in
        place).

    Raises:
        ValueError: If ``position`` is not one of the four supported corners.
    """
    # Convert the base64 string back to an image
    if decode:
        img = PIL.Image.open(BytesIO(base64.b64decode(img)))

    img_width, img_height = img.size

    # Drawing in 'RGBA' mode on an RGB image blends colors that carry an alpha
    # value, which is what makes the box semi-transparent. Other image modes
    # keep the default drawing mode.
    draw = PIL.ImageDraw.Draw(img, 'RGBA' if img.mode == 'RGB' else None)

    def measure(size):
        """Return ``(font, bbox)`` for ``text`` rendered at ``size``."""
        candidate = PIL.ImageFont.truetype(font_path, size)
        return candidate, draw.textbbox((0, 0), text, font=candidate)

    # Binary search for the largest size that fits. ``font``/``bbox`` always
    # describe the same size, so what gets drawn is exactly what was measured.
    lo, hi = 1, 49
    font, bbox = measure(lo)
    while lo <= hi:
        mid = (lo + hi) // 2
        candidate, candidate_bbox = measure(mid)
        fits = (candidate_bbox[2] - candidate_bbox[0] <= img_width
                and candidate_bbox[3] - candidate_bbox[1] <= img_height)
        if fits:
            font, bbox = candidate, candidate_bbox
            lo = mid + 1
        else:
            hi = mid - 1

    left, upper, right, lower = bbox
    text_width = right - left
    text_height = lower - upper

    if position == 'top-left':
        x, y = 0, 0
    elif position == 'top-right':
        x, y = img_width - text_width, 0
    elif position == 'bottom-left':
        x, y = 0, img_height - text_height
    elif position == 'bottom-right':
        x, y = img_width - text_width, img_height - text_height
    else:
        raise ValueError("Position should be 'top-left', 'top-right', 'bottom-left' or 'bottom-right'.")

    # Draw a semi-transparent box around the text
    draw.rectangle([x, y, x + text_width, y + text_height], fill=box_color)
    # The bounding box was measured from origin (0, 0) and may start at a
    # non-zero offset; shift the anchor so the glyphs land inside the box.
    draw.text((x - left, y - upper), text, font=font, fill=text_color)
    return img
class WeatherDraw:
    """Turns a weather forecast into a collage of generated images."""

    # Seconds to wait for a weather icon download before skipping that icon.
    _ICON_TIMEOUT = 10

    def clean_text(self, weather_info):
        """Ask the chat model for a short plaintext summary of ``weather_info``."""
        chat = Chat("Given the following weather conditions, write a very small, concise plaintext summary that will overlay on top of an image.")
        return chat.message(str(weather_info))

    def generate_image(self, weather_info, **kwargs):
        """Generate one image illustrating ``weather_info``.

        A random animal and art style are chosen, the chat model writes a
        scene description, and that description is sent to ``Image.create``.

        Args:
            weather_info: Forecast data; it is stringified into the chat message.
            **kwargs: Forwarded to ``Image.create``.

        Returns:
            A ``(b64_json, prompt)`` tuple: the base64 image payload and the
            prompt it was generated from.
        """
        animal = random.choice(animals)
        logger.info(f"Got animal {animal}")
        chat = Chat(f'''
Given the following weather conditions, write a plaintext, short, and vivid description of an
adorable {animal} in the weather conditions doing a fun activity a human would do matching these weather conditions.
For instance, if its snowing, the activity would be snowboarding.
Make sure to include a small background.
Only write the short description and nothing else.
Do not include specific numbers.'''.replace('\n', ' '))
        description = chat.message(str(weather_info))
        prompt = f'{description} In the style of {random.choice(art_styles)}'
        logger.info(prompt)
        img = Image.create(prompt, **kwargs)
        return img["b64_json"], prompt

    def step_one_forecast(self, weather_info, **kwargs):
        """Produce the ``(image, prompt)`` pair for one forecast period.

        Currently a thin pass-through to ``generate_image``; no summary text
        is overlaid here.
        """
        img, txt = self.generate_image(weather_info, **kwargs)
        return img, txt

    @staticmethod
    def _temperature_extremes(temperatures):
        """Return ``(high, low)`` from ``temperatures``, compared numerically.

        Entries are compared by their ``float()`` value so that string
        readings such as ``"9"`` and ``"10"`` are ordered as numbers rather
        than alphabetically. The returned values are the original entries.
        Entries that cannot be converted are ignored; if none can be
        converted, plain ``max``/``min`` are used.
        """
        ranked = []
        for reading in temperatures:
            try:
                ranked.append((float(reading), reading))
            except (TypeError, ValueError):
                continue
        if not ranked:
            return max(temperatures), min(temperatures)
        high = max(ranked, key=lambda pair: pair[0])[1]
        low = min(ranked, key=lambda pair: pair[0])[1]
        return high, low

    def weather_img(self, weather_data):
        """Render a 256x256 text-and-icon summary tile for ``weather_data``.

        Args:
            weather_data: Dict with ``now``, ``hour`` and ``day`` entries in
                the shape produced by ``Weather.get_info``.

        Returns:
            A new RGB PIL image. Icons that cannot be downloaded or decoded
            are skipped.
        """
        # Create a new image with white background
        image = PIL.Image.new('RGB', (256, 256), (255, 255, 255))
        draw = PIL.ImageDraw.Draw(image)
        font = PIL.ImageFont.truetype(font_path, 12)

        day_high, day_low = self._temperature_extremes(weather_data['day']['temperature'])
        items_to_display = {
            'now': {'Temperature': weather_data['now']['Temp'],
                    'Condition': weather_data['now']['Weather']},
            'hour': {'Temperature': weather_data['hour']['temperature'],
                     'Condition': weather_data['hour']['weather']},
            'day': {'High': day_high,
                    'Low': day_low,
                    'Condition': weather_data['day']['weather'][0]},
        }

        # One heading per period followed by its indented key/value lines.
        y_text = 5
        for category, values in items_to_display.items():
            draw.text((5, y_text), category, font=font, fill=(0, 0, 0))
            y_text += 15
            for key, value in values.items():
                draw.text((10, y_text), f"{key}: {value}", font=font, fill=(0, 0, 0))
                y_text += 15

        # Download the weather condition icon for now, day and next hour
        for index, period in enumerate(items_to_display):
            if period == 'day':
                icon_url = weather_data['day']['iconLink'][0]
            elif period == 'now':
                icon_url = 'https://forecast.weather.gov/newimages/medium/'+weather_data['now']['Weatherimage']
            else:
                icon_url = weather_data[period]['iconLink']
            try:
                response = requests.get(icon_url, timeout=self._ICON_TIMEOUT)
                response.raise_for_status()
                icon = PIL.Image.open(io.BytesIO(response.content))
            except (requests.RequestException, OSError, ValueError) as err:
                # Icons are decorative: log and carry on without this one.
                logger.warning(f'Skipping icon for {period} ({icon_url}): {err}')
                continue
            icon = icon.resize((60, 60))
            image.paste(icon, (index*70 + 10, 190))
        return image

    def step(self, zip_code='10001', **kwargs):
        """Build the full result for one ZIP code.

        One image per forecast period is generated concurrently, labelled
        with its period name, and combined with the summary tile from
        ``weather_img`` into a collage.

        Args:
            zip_code: Location passed to ``Weather``.
            **kwargs: Forwarded to ``step_one_forecast``.

        Returns:
            A flat tuple: the collage image, then the prompt of each period,
            then the stringified forecast. Images and prompts follow the
            forecast's own key order, not thread completion order, so the
            layout is the same on every run.
        """
        forecast = Weather(zip_code).get_info()
        images, texts = [], []
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            pending = [
                (period, executor.submit(self.step_one_forecast, data, **kwargs))
                for period, data in forecast.items()
                if period != 'etc'
            ]
            for period, future in pending:
                img, txt = future.result()
                images.append(overlay_text_on_image(img, period, 'top-right', decode=True))
                texts.append(txt)
        return create_collage(*images, self.weather_img(forecast)), *texts, str(forecast)
# Gradio UI: one ZIP code text box in; the collage image plus four text
# fields out, filled from the tuple returned by WeatherDraw.step.
# NOTE(review): the gr.inputs / gr.outputs namespaces look deprecated in
# newer Gradio releases; confirm against the version this app is pinned to.
_zip_input = gr.inputs.Textbox(label="Enter Zipcode")
_result_outputs = [gr.outputs.Image(type='pil')] + ["text"] * 4
iface = gr.Interface(
    fn=WeatherDraw().step,
    inputs=_zip_input,
    outputs=_result_outputs,
    title="US Zipcode Weather",
    description="Enter a US Zipcode and get some weather.",
)

# Start serving the interface.
iface.launch()