Spaces:
Runtime error
Runtime error
File size: 4,087 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
import array
from datetime import datetime, date, tzinfo
from ipaddress import IPv4Address
from typing import Sequence, Optional, Any
from uuid import UUID, SafeUUID
from clickhouse_connect.driver.common import int_size
from clickhouse_connect.driver.types import ByteSource
from clickhouse_connect.driver.options import np
MONTH_DAYS = (0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365)
MONTH_DAYS_LEAP = (0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366)
def read_ipv4_col(source: ByteSource, num_rows: int):
column = source.read_array('I', num_rows)
fast_ip_v4 = IPv4Address.__new__
new_col = []
app = new_col.append
for x in column:
ipv4 = fast_ip_v4(IPv4Address)
ipv4._ip = x # pylint: disable=protected-access
app(ipv4)
return new_col
def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]):
src_array = source.read_array('I', num_rows)
if tz_info is None:
fts = datetime.utcfromtimestamp
return [fts(ts) for ts in src_array]
fts = datetime.fromtimestamp
return [fts(ts, tz_info) for ts in src_array]
def epoch_days_to_date(days: int) -> date:
cycles400, rem = divmod(days + 134774, 146097)
cycles100, rem = divmod(rem, 36524)
cycles, rem = divmod(rem, 1461)
years, rem = divmod(rem, 365)
year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601
if years == 4 or cycles100 == 4:
return date(year - 1, 12, 31)
m_list = MONTH_DAYS_LEAP if years == 3 and (year == 2000 or year % 100 != 0) else MONTH_DAYS
month = (rem + 24) >> 5
while rem < m_list[month]:
month -= 1
return date(year, month + 1, rem + 1 - m_list[month])
def read_date_col(source: ByteSource, num_rows: int):
column = source.read_array('H', num_rows)
return [epoch_days_to_date(x) for x in column]
def read_date32_col(source: ByteSource, num_rows: int):
column = source.read_array('l' if int_size == 2 else 'i', num_rows)
return [epoch_days_to_date(x) for x in column]
def read_uuid_col(source: ByteSource, num_rows: int):
v = source.read_array('Q', num_rows * 2)
empty_uuid = UUID(int=0)
new_uuid = UUID.__new__
unsafe = SafeUUID.unsafe
oset = object.__setattr__
column = []
app = column.append
for i in range(num_rows):
ix = i << 1
int_value = v[ix] << 64 | v[ix + 1]
if int_value == 0:
app(empty_uuid)
else:
fast_uuid = new_uuid(UUID)
oset(fast_uuid, 'int', int_value)
oset(fast_uuid, 'is_safe', unsafe)
app(fast_uuid)
return column
def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any):
null_map = source.read_bytes(num_rows)
column = source.read_array(array_type, num_rows)
return [null_obj if null_map[ix] else column[ix] for ix in range(num_rows)]
def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any):
return [source[ix] if null_map[ix] == 0 else null_obj for ix in range(len(source))]
def build_lc_nullable_column(keys: Sequence, index: array.array, null_obj: Any):
column = []
for ix in index:
if ix == 0:
column.append(null_obj)
else:
column.append(keys[ix])
return column
def to_numpy_array(column: Sequence):
arr = np.empty((len(column),), dtype=np.object)
arr[:] = column
return arr
def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]:
return tuple(zip(*data[start_row: end_row]))
def write_str_col(column: Sequence, encoding: Optional[str], dest: bytearray):
app = dest.append
for x in column:
if not x:
app(0)
else:
if encoding:
x = x.encode(encoding)
sz = len(x)
while True:
b = sz & 0x7f
sz >>= 7
if sz == 0:
app(b)
break
app(0x80 | b)
dest += x
|