Spaces:
Runtime error
Runtime error
File size: 4,383 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import sys
import array
from typing import Any, Iterable
from clickhouse_connect.driver.exceptions import StreamCompleteException
from clickhouse_connect.driver.types import ByteSource
must_swap = sys.byteorder == 'big'
class ResponseBuffer(ByteSource):
slots = 'slice_sz', 'buf_loc', 'end', 'gen', 'buffer', 'slice'
def __init__(self, source):
self.slice_sz = 4096
self.buf_loc = 0
self.buf_sz = 0
self.source = source
self.gen = source.gen
self.buffer = bytes()
def read_bytes(self, sz: int):
if self.buf_loc + sz <= self.buf_sz:
self.buf_loc += sz
return self.buffer[self.buf_loc - sz: self.buf_loc]
# Create a temporary buffer that bridges two or more source chunks
bridge = bytearray(self.buffer[self.buf_loc: self.buf_sz])
self.buf_loc = 0
self.buf_sz = 0
while len(bridge) < sz:
chunk = next(self.gen, None)
if not chunk:
raise StreamCompleteException
x = len(chunk)
if len(bridge) + x <= sz:
bridge.extend(chunk)
else:
tail = sz - len(bridge)
bridge.extend(chunk[:tail])
self.buffer = chunk
self.buf_sz = x
self.buf_loc = tail
return bridge
def read_byte(self) -> int:
if self.buf_loc < self.buf_sz:
self.buf_loc += 1
return self.buffer[self.buf_loc - 1]
self.buf_sz = 0
self.buf_loc = 0
chunk = next(self.gen, None)
if not chunk:
raise StreamCompleteException
x = len(chunk)
if x > 1:
self.buffer = chunk
self.buf_loc = 1
self.buf_sz = x
return chunk[0]
def read_leb128(self) -> int:
sz = 0
shift = 0
while True:
b = self.read_byte()
sz += ((b & 0x7f) << shift)
if (b & 0x80) == 0:
return sz
shift += 7
def read_leb128_str(self) -> str:
sz = self.read_leb128()
return self.read_bytes(sz).decode()
def read_uint64(self) -> int:
return int.from_bytes(self.read_bytes(8), 'little', signed=False)
def read_str_col(self,
num_rows: int,
encoding: str,
nullable: bool = False,
null_obj: Any = None) -> Iterable[str]:
column = []
app = column.append
null_map = self.read_bytes(num_rows) if nullable else None
for ix in range(num_rows):
sz = 0
shift = 0
while True:
b = self.read_byte()
sz += ((b & 0x7f) << shift)
if (b & 0x80) == 0:
break
shift += 7
x = self.read_bytes(sz)
if null_map and null_map[ix]:
app(null_obj)
elif encoding:
try:
app(x.decode(encoding))
except UnicodeDecodeError:
app(x.hex())
else:
app(x)
return column
def read_bytes_col(self, sz: int, num_rows: int) -> Iterable[bytes]:
source = self.read_bytes(sz * num_rows)
return [bytes(source[x:x+sz]) for x in range(0, sz * num_rows, sz)]
def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str) -> Iterable[str]:
source = self.read_bytes(sz * num_rows)
column = []
app = column.append
for ix in range(0, sz * num_rows, sz):
try:
app(str(source[ix: ix + sz], encoding).rstrip('\x00'))
except UnicodeDecodeError:
app(source[ix: ix + sz].hex())
return column
def read_array(self, array_type: str, num_rows: int) -> Iterable[Any]:
column = array.array(array_type)
sz = column.itemsize * num_rows
b = self.read_bytes(sz)
column.frombytes(b)
if must_swap:
column.byteswap()
return column
@property
def last_message(self):
if len(self.buffer) == 0:
return None
return self.buffer.decode()
def close(self):
if self.source:
self.source.close()
self.source = None
|