Spaces:
Runtime error
Runtime error
import array | |
from datetime import datetime, date, tzinfo | |
from ipaddress import IPv4Address | |
from typing import Sequence, Optional, Any | |
from uuid import UUID, SafeUUID | |
from clickhouse_connect.driver.common import int_size | |
from clickhouse_connect.driver.types import ByteSource | |
from clickhouse_connect.driver.options import np | |
# Cumulative day counts: entry i is the number of days in the year that precede
# month i + 1, and the final entry is the total number of days in the year.
MONTH_DAYS = (
    0, 31, 59, 90, 120, 151,        # Jan .. Jun
    181, 212, 243, 273, 304, 334,   # Jul .. Dec
    365,                            # whole (non-leap) year
)
# Same table for a leap year: every entry after January is shifted by one day.
MONTH_DAYS_LEAP = (
    0, 31, 60, 91, 121, 152,        # Jan .. Jun
    182, 213, 244, 274, 305, 335,   # Jul .. Dec
    366,                            # whole (leap) year
)
def read_ipv4_col(source: ByteSource, num_rows: int):
    """
    Read `num_rows` values from `source` with array type code 'I' and wrap each one in an IPv4Address.

    :param source: Object providing `read_array(type_code, num_rows)`
    :param num_rows: Number of values to read
    :return: List of IPv4Address objects, one per value read
    """
    raw_values = source.read_array('I', num_rows)
    make_blank = IPv4Address.__new__
    # Allocate the instances without running IPv4Address.__init__ (skipping its parsing/validation),
    # then store the integer form directly in the private _ip attribute.
    addresses = [make_blank(IPv4Address) for _ in raw_values]
    for address, packed in zip(addresses, raw_values):
        address._ip = packed  # pylint: disable=protected-access
    return addresses
def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]):
    """
    Read `num_rows` values from `source` with array type code 'I' and convert each one, interpreted as a
    POSIX timestamp, to a datetime.

    :param source: Object providing `read_array(type_code, num_rows)`
    :param num_rows: Number of values to read
    :param tz_info: Timezone for the results.  If None, naive datetimes holding the UTC wall clock time
      are returned; otherwise timezone-aware datetimes in `tz_info` are returned
    :return: List of datetime objects, one per value read
    """
    # Imported locally so this function does not depend on the module level import list
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    src_array = source.read_array('I', num_rows)
    fts = datetime.fromtimestamp
    if tz_info is None:
        # datetime.utcfromtimestamp is deprecated as of Python 3.12.  Converting in UTC and then dropping
        # the tzinfo yields exactly the same naive UTC values.
        utc = timezone.utc
        return [fts(ts, utc).replace(tzinfo=None) for ts in src_array]
    return [fts(ts, tz_info) for ts in src_array]
# Proleptic Gregorian ordinal of 1970-01-01, the day that `days == 0` maps to
_EPOCH_ORDINAL = date(1970, 1, 1).toordinal()


def epoch_days_to_date(days: int) -> date:
    """
    Convert a count of days relative to 1970-01-01 into a date.

    The conversion is delegated to date.fromordinal, which applies the full Gregorian leap year rule
    (every fourth year, except centuries that are not divisible by 400), so century years such as 1600
    and 2400 are handled the same way as 2000.

    :param days: Number of days since 1970-01-01; negative values give earlier dates
    :return: The corresponding date
    :raises ValueError: If the result is outside the range supported by datetime.date
    """
    return date.fromordinal(days + _EPOCH_ORDINAL)
def read_date_col(source: ByteSource, num_rows: int):
    """
    Read `num_rows` values from `source` with array type code 'H' and convert each one to a date
    using epoch_days_to_date.

    :param source: Object providing `read_array(type_code, num_rows)`
    :param num_rows: Number of values to read
    :return: List of date objects, one per value read
    """
    day_counts = source.read_array('H', num_rows)
    return list(map(epoch_days_to_date, day_counts))
def read_date32_col(source: ByteSource, num_rows: int):
    """
    Read `num_rows` signed values from `source` and convert each one to a date using epoch_days_to_date.

    :param source: Object providing `read_array(type_code, num_rows)`
    :param num_rows: Number of values to read
    :return: List of date objects, one per value read
    """
    # NOTE(review): int_size is presumably the byte width of the array 'i' type on this platform, with
    # 'l' chosen as the wider alternative when 'i' is only two bytes -- confirm against driver.common
    if int_size == 2:
        type_code = 'l'
    else:
        type_code = 'i'
    day_counts = source.read_array(type_code, num_rows)
    return list(map(epoch_days_to_date, day_counts))
def read_uuid_col(source: ByteSource, num_rows: int):
    """
    Read `num_rows` UUIDs from `source`.  Each UUID is built from two consecutive values read with
    array type code 'Q': the first supplies the high 64 bits and the second the low 64 bits.

    :param source: Object providing `read_array(type_code, num_rows)`
    :param num_rows: Number of UUIDs to read (twice as many array values are read)
    :return: List of UUID objects; every all-zero value is represented by the same shared UUID instance
    """
    words = source.read_array('Q', num_rows * 2)
    zero_uuid = UUID(int=0)
    blank_uuid = UUID.__new__
    # UUID instances reject normal attribute assignment, so fields are set through object.__setattr__
    set_field = object.__setattr__
    not_safe = SafeUUID.unsafe
    result = []
    add = result.append
    for row in range(num_rows):
        pos = row * 2
        value = (words[pos] << 64) | words[pos + 1]
        if value == 0:
            add(zero_uuid)
            continue
        # Bypass UUID.__init__ and populate the two fields directly
        item = blank_uuid(UUID)
        set_field(item, 'int', value)
        set_field(item, 'is_safe', not_safe)
        add(item)
    return result
def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any):
    """
    Read a null map of `num_rows` bytes followed by `num_rows` array values from `source`, and return
    the values with `null_obj` substituted wherever the null map entry is non-zero.

    :param source: Object providing `read_bytes(num_rows)` and `read_array(type_code, num_rows)`
    :param array_type: Type code passed through to `source.read_array`
    :param num_rows: Number of rows to read
    :param null_obj: Object to use for rows flagged as null
    :return: List of `num_rows` values
    """
    # The null map is read before the values; the order of these two reads must not change
    null_flags = source.read_bytes(num_rows)
    values = source.read_array(array_type, num_rows)
    return [values[row] if not null_flags[row] else null_obj for row in range(num_rows)]
def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any):
    """
    Combine a sequence of values with a null map into a single list.

    :param source: Sequence of column values
    :param null_map: Per-row flags indexed in parallel with `source`; a zero entry keeps the source value
    :param null_obj: Object to use for rows whose null map entry is not zero
    :return: List with the same length as `source`
    """
    return [null_obj if null_map[pos] != 0 else value for pos, value in enumerate(source)]
def build_lc_nullable_column(keys: Sequence, index: array.array, null_obj: Any):
    """
    Expand an index array into a column of values looked up in `keys`, treating index 0 as null.

    :param keys: Sequence of values; position 0 is never looked up
    :param index: Positions into `keys`, one per row
    :param null_obj: Object to use for every row whose index is 0
    :return: List with one entry per element of `index`
    """
    return [null_obj if key_pos == 0 else keys[key_pos] for key_pos in index]
def to_numpy_array(column: Sequence):
    """
    Copy a sequence of Python objects into a one-dimensional NumPy array with the object dtype.

    :param column: Sequence of values to copy
    :return: NumPy array of shape (len(column),) and dtype object
    """
    # Use the builtin `object` as the dtype.  The `np.object` alias was deprecated in NumPy 1.20 and
    # removed in NumPy 1.24, where referencing it raises AttributeError.
    arr = np.empty((len(column),), dtype=object)
    # Allocate first and assign afterwards so the result keeps the one-dimensional shape fixed above
    arr[:] = column
    return arr
def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]:
    """
    Transpose a slice of row oriented data into column oriented data.

    :param data: Sequence of rows
    :param start_row: Index of the first row to include
    :param end_row: Index one past the last row to include
    :return: Tuple of tuples, one per column, truncated to the shortest row in the slice
    """
    selected_rows = data[start_row:end_row]
    columns = zip(*selected_rows)
    return tuple(columns)
def write_str_col(column: Sequence, encoding: Optional[str], dest: bytearray):
    """
    Append every value in `column` to `dest` as a variable length size prefix followed by the value's bytes.

    The size is written seven bits per byte, least significant group first, with the high bit set on
    every byte except the last.  Falsy values (such as None or an empty string) are written as a single
    zero byte.

    :param column: Sequence of values to write
    :param encoding: If truthy, each non-empty value is converted with `value.encode(encoding)` before
      being written; otherwise values are appended to `dest` unchanged
    :param dest: Output buffer, extended in place
    """
    emit = dest.append
    for value in column:
        if not value:
            emit(0)
            continue
        payload = value.encode(encoding) if encoding else value
        remaining = len(payload)
        # Every group except the final one carries the continuation bit
        while remaining >= 0x80:
            emit(0x80 | (remaining & 0x7f))
            remaining >>= 7
        emit(remaining)
        dest += payload