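"""Fetch open FPMM markets from the Omen subgraph and update the stored live-markets data."""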
import functools
import os
import warnings
from datetime import datetime, UTC
from typing import Callable, Dict, Generator, List, Optional

import pandas as pd
import requests
from tqdm import tqdm

from live_traders_data import add_trading_info
from live_utils import OMEN_SUBGRAPH_URL, CREATOR, BATCH_SIZE, DATA_DIR
from queries import (
    FPMMS_WITH_TOKENS_QUERY,
    ID_FIELD,
    DATA_FIELD,
    ANSWER_FIELD,
    ANSWER_TIMESTAMP_FIELD,
    QUERY_FIELD,
    TITLE_FIELD,
    OUTCOMES_FIELD,
    OPENING_TIMESTAMP_FIELD,
    CREATION_TIMESTAMP_FIELD,
    LIQUIDITY_FIELD,
    LIQUIDIY_MEASURE_FIELD,
    TOKEN_AMOUNTS_FIELD,
    ERROR_FIELD,
    QUESTION_FIELD,
    FPMMS_FIELD,
)
from utils import SUBGRAPH_API_KEY, measure_execution_time

ResponseItemType = List[Dict[str, str]]
SubgraphResponseType = Dict[str, ResponseItemType]


class RetriesExceeded(Exception):
    """Exception to raise when retries are exceeded during data-fetching."""

    def __init__(
        self, msg="Maximum retries were exceeded while trying to fetch the data!"
    ):
        super().__init__(msg)
|
|
def hacky_retry(func: Callable, n_retries: int = 3) -> Callable:
    """Create a hacky retry strategy.

    Unfortunately, we cannot use `requests.packages.urllib3.util.retry.Retry`,
    because the subgraph does not return the appropriate status codes in case of failure.
    Instead, it always returns code 200. Thus, we raise exceptions manually inside `query_subgraph`,
    catch those exceptions in the hacky retry decorator and try again.
    Finally, if the allowed number of retries is exceeded, we raise a custom `RetriesExceeded` exception.

    :param func: the input request function.
    :param n_retries: the maximum allowed number of retries.
    :return: the request method with the hacky retry strategy applied.
    """

    @functools.wraps(func)
    def wrapper_hacky_retry(*args, **kwargs) -> SubgraphResponseType:
        """The wrapper for the hacky retry.

        :return: a response dictionary.
        """
        retried = 0

        while retried <= n_retries:
            try:
                if retried > 0:
                    warnings.warn(f"Retrying {retried}/{n_retries}...")

                return func(*args, **kwargs)
            except (ValueError, ConnectionError) as e:
                warnings.warn(e.args[0])
            finally:
                retried += 1

        raise RetriesExceeded()

    return wrapper_hacky_retry
|
|
@hacky_retry
def query_subgraph(url: str, query: str, key: str) -> SubgraphResponseType:
    """Query a subgraph.

    Args:
        url: the subgraph's URL.
        query: the query to be used.
        key: the key to use in order to access the required data.

    Returns:
        a response dictionary.
    """
    content = {QUERY_FIELD: query}
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    res = requests.post(url, json=content, headers=headers)

    if res.status_code != 200:
        raise ConnectionError(
            "Something went wrong while trying to communicate with the subgraph "
            f"(Error: {res.status_code})!\n{res.text}"
        )

    body = res.json()
    if ERROR_FIELD in body:
        raise ValueError(f"The given query is not correct: {body[ERROR_FIELD]}")

    data = body.get(DATA_FIELD, {}).get(key, None)
    if data is None:
        raise ValueError(f"Unknown error encountered!\nRaw response: \n{body}")

    return data
|
|
def fpmms_fetcher(current_timestamp: int) -> Generator[Optional[ResponseItemType], str, None]:
    """An indefinite fetcher for the FPMMs."""
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    print(f"omen_subgraph = {omen_subgraph}")
    while True:
        # The first, bare `yield` pauses until the caller `send`s the id to paginate
        # from; the second `yield` hands the fetched batch back to the caller.
        fpmm_id = yield
        fpmms_query = FPMMS_WITH_TOKENS_QUERY.substitute(
            creator=CREATOR,
            fpmm_id=fpmm_id,
            current_timestamp=current_timestamp,
            fpmms_field=FPMMS_FIELD,
            first=BATCH_SIZE,
            id_field=ID_FIELD,
            answer_timestamp_field=ANSWER_TIMESTAMP_FIELD,
            question_field=QUESTION_FIELD,
            outcomes_field=OUTCOMES_FIELD,
            title_field=TITLE_FIELD,
            opening_timestamp_field=OPENING_TIMESTAMP_FIELD,
            creation_timestamp_field=CREATION_TIMESTAMP_FIELD,
            liquidity_field=LIQUIDITY_FIELD,
            liquidity_measure_field=LIQUIDIY_MEASURE_FIELD,
            token_amounts_field=TOKEN_AMOUNTS_FIELD,
        )
        print(f"Executing query {fpmms_query}")
        yield query_subgraph(omen_subgraph, fpmms_query, FPMMS_FIELD)
|
|
def fetch_fpmms(current_timestamp: int) -> pd.DataFrame:
    """Fetch all the fpmms of the creator."""
    print("Fetching all markets")
    latest_id = ""
    fpmms = []
    fetcher = fpmms_fetcher(current_timestamp)
    for _ in tqdm(fetcher, unit="fpmms", unit_scale=BATCH_SIZE):
        # Iterating advances the fetcher to its first `yield`; `send` then
        # provides the id to paginate from and receives the next batch.
        batch = fetcher.send(latest_id)
        if len(batch) == 0:
            print("no data")
            break

        print(f"length of the data received = {len(batch)}")
        latest_id = batch[-1].get(ID_FIELD, "")
        if latest_id == "":
            raise ValueError(f"Unexpected data format retrieved: {batch}")

        fpmms.extend(batch)

    print("Finished collecting data")
    return pd.DataFrame(fpmms)
|
|
def get_answer(fpmm: pd.Series) -> str:
    """Get an FPMM's answer, using the answer field as an index into the question's outcomes."""
    return fpmm[QUESTION_FIELD][OUTCOMES_FIELD][fpmm[ANSWER_FIELD]]
|
|
def get_first_token_perc(row: pd.Series) -> float:
    """Compute the first outcome token's share of the pool, as a percentage."""
    if row["total_tokens"] == 0.0:
        return 0.0
    return round((row["token_first_amount"] / row["total_tokens"]) * 100, 2)
|
|
def get_second_token_perc(row: pd.Series) -> float:
    """Compute the second outcome token's share of the pool, as a percentage."""
    if row["total_tokens"] == 0.0:
        return 0.0
    return round((row["token_second_amount"] / row["total_tokens"]) * 100, 2)
|
|
def transform_fpmms(
    fpmms: pd.DataFrame, filename: Optional[str], current_timestamp: int
) -> pd.DataFrame:
    """Transform an FPMMs dataframe and merge it with any previously stored markets."""
    fpmms["tokens_timestamp"] = current_timestamp
    fpmms["open"] = True

    # Compute each outcome token's share of the pool.
    fpmms["token_first_amount"] = fpmms.outcomeTokenAmounts.apply(lambda x: int(x[0]))
    fpmms["token_second_amount"] = fpmms.outcomeTokenAmounts.apply(lambda x: int(x[1]))
    fpmms["total_tokens"] = fpmms.apply(
        lambda x: x.token_first_amount + x.token_second_amount, axis=1
    )
    fpmms["first_token_perc"] = fpmms.apply(get_first_token_perc, axis=1)
    fpmms["second_token_perc"] = fpmms.apply(get_second_token_perc, axis=1)
    fpmms.drop(
        columns=["token_first_amount", "token_second_amount", "total_tokens"],
        inplace=True,
    )

    old_fpmms = None
    if filename and os.path.exists(DATA_DIR / filename):
        old_fpmms = pd.read_parquet(DATA_DIR / filename)

    if old_fpmms is not None:
        # Any previously stored market that is missing from the current batch
        # of open markets is no longer open.
        open_markets = list(fpmms.id.unique())
        print("Updating market status of old markets")
        open_mask = old_fpmms["id"].isin(open_markets)
        old_fpmms.loc[~open_mask, "open"] = False

        print("Appending new data to previous data")
        fpmms = pd.concat([old_fpmms, fpmms], ignore_index=True)

    # Return the merged frame: the reassignment above does not propagate to the
    # caller, so returning it is required for the merged data to be used.
    return fpmms
|
|
@measure_execution_time
def compute_distributions(filename: Optional[str]) -> pd.DataFrame:
    """Fetch, process, store and return the markets as a DataFrame."""
    print("fetching new markets information")
    current_timestamp = int(datetime.now(UTC).timestamp())
    fpmms = fetch_fpmms(current_timestamp)
    print(fpmms.head())

    print("transforming and updating previous data")
    fpmms = transform_fpmms(fpmms, filename, current_timestamp)
    print(fpmms.head())

    add_trading_info(fpmms)

    print("saving the data")
    print(fpmms.info())
    if filename:
        fpmms.to_parquet(DATA_DIR / filename, index=False)

    return fpmms
|
|
if __name__ == "__main__":
    compute_distributions("markets_live_data.parquet")