pup-py's picture
10K 1MB limit
55a58f8
raw
history blame
5.49 kB
import ast
import base64
import duckdb
import json
import re
import textwrap
from ulid import ULID
HISTORY_FILE = "history.json"
MAX_ROWS = 10000
class SQLError(Exception):
pass
class NotFoundError(Exception):
pass
class Q(str):
UNSAFE = ["CREATE", "DELETE", "DROP", "INSERT", "UPDATE"]
rows=None
def __new__(cls, template: str, **kwargs):
"""Create a new Q-string."""
_template = textwrap.dedent(template).strip()
try:
instance = str.__new__(cls, _template.format(**kwargs))
except KeyError:
instance = str.__new__(cls, _template)
instance.id = str(ULID())
instance.alias = kwargs.pop("alias") if kwargs.get("alias") else None
instance.template = _template
instance.kwargs = kwargs
instance.definitions = "\n".join([f"{k} = {repr(v)}" for k, v in kwargs.items()])
for attr in ("rows", "cols", "source_id", "start", "end"):
setattr(instance, attr, None)
return instance
def __repr__(self):
"""Neat repr for inspecting Q objects."""
strings = []
for k, v in self.__dict__.items():
value_repr = "\n" + textwrap.indent(v, " ") if "\n" in str(v) else v
strings.append(f"{k}: {value_repr}")
return "\n".join(strings)
def run(self, sql_engine=None, save=False, _raise=False):
self.start = ULID()
try:
if sql_engine is None:
res = self.run_duckdb()
else:
res = self.run_sql(sql_engine)
self.rows, self.cols = res.shape
return res
except Exception as e:
if _raise:
raise e
return str(e)
finally:
self.end = ULID()
if save:
self.save()
def run_duckdb(self):
if MAX_ROWS:
return duckdb.sql(f"WITH x AS ({self}) SELECT * FROM x LIMIT {MAX_ROWS}")
else:
return duckdb.sql(self)
def df(self, sql_engine=None, save=False, _raise=False):
res = self.run(sql_engine=sql_engine, save=save, _raise=_raise)
if not getattr(self, "rows", None):
return
else:
result_df = res.df()
result_df.q = self
return result_df
def save(self, file=HISTORY_FILE):
with open(file, "a") as f:
f.write(self.json)
f.write("\n")
@property
def json(self):
serialized = {"id": self.id, "q": self}
serialized.update(self.__dict__)
return json.dumps(serialized, default=lambda x: x.datetime.strftime("%F %T.%f")[:-3])
@property
def is_safe(self):
return not any(cmd in self.template.upper() for cmd in self.UNSAFE)
@classmethod
def from_dict(cls, query_dict: dict):
q = query_dict.pop("q")
return cls(q, **query_dict)
@classmethod
def from_template_and_definitions(cls, template: str, definitions: str, alias: str|None = None):
query_dict = {"q": template, "alias": alias}
query_dict.update(parse_definitions(definitions))
instance = Q.from_dict(query_dict)
instance.definitions = definitions
return instance
@classmethod
def from_history(cls, query_id=None, alias=None):
search_query = Q(f"""
SELECT id, template, kwargs
FROM '{HISTORY_FILE}'
WHERE id='{query_id}' OR alias='{alias}'
LIMIT 1
""")
query = search_query.run()
if search_query.rows == 1:
source_id, template, kwargs = query.fetchall()[0]
kwargs = {k: v for k, v in kwargs.items() if v is not None}
instance = cls(template, **kwargs)
instance.source_id = source_id
return instance
elif search_query.rows == 0:
raise NotFoundError(f"id '{query_id}' / alias '{alias}' not found")
else:
raise SQLError(query)
# @property
# def definitions(self):
# return "\n".join([""]+[f"{k} = {v}" for k, v in self.kwargs.items()])
@property
def base64(self):
return base64.b64encode(self.encode()).decode()
@classmethod
def from_base64(cls, b64):
"""Initializing from base64-encoded URL paths."""
return cls(base64.b64decode(b64).decode())
def parse_definitions(definitions) -> dict:
"""Parse a string literal of "key=value" pairs, one per line, into kwargs."""
kwargs = {}
lines = definitions.split("\n")
for _line in lines:
line = re.sub("\s+", "", _line)
if line == "" or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", maxsplit=1)
kwargs[key] = ast.literal_eval(value)
return kwargs
EX1 = Q.from_template_and_definitions(
template="SELECT {x} AS {colname}",
definitions="\n".join([
"# Define variables: one '=' per line",
"x=42",
"colname='answer'",
]),
alias="example1",
)
EX2 = Q(
"""
SELECT
Symbol,
Number,
Mass,
Abundance
FROM '{url}'
""",
url="https://raw.githubusercontent.com/ekwan/cctk/master/cctk/data/isotopes.csv",
alias="example2",
)
EX3 = Q(
"""
SELECT *
FROM 'history.json'
ORDER BY id DESC
""",
alias="example3",
)
EX4 = Q("SELECT nothing", alias="bad_example")