Spaces:
Runtime error
Runtime error
File size: 4,240 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
import logging
from typing import Generator, Sequence, Tuple
from clickhouse_connect.driver.common import empty_gen, StreamContext
from clickhouse_connect.driver.exceptions import StreamClosedError
from clickhouse_connect.driver.types import Closable
from clickhouse_connect.driver.options import np, pd
logger = logging.getLogger(__name__)
# pylint: disable=too-many-instance-attributes
class NumpyResult(Closable):
    """Query result that converts streamed ClickHouse blocks into NumPy arrays
    or Pandas DataFrames, either block by block or fully materialized."""

    def __init__(self,
                 block_gen: Generator[Sequence, None, None] = None,
                 column_names: Tuple = (),
                 column_types: Tuple = (),
                 d_types: Sequence = (),
                 source: Closable = None):
        # Column metadata describing the result set
        self.column_names = column_names
        self.column_types = column_types
        # Per-column NumPy dtypes; _np_stream may later collapse this to a
        # single dtype or a structured dtype
        self.np_types = d_types
        # Underlying closable resource (e.g. HTTP response), released by close()
        self.source = source
        self.query_id = ''
        self.summary = {}
        # Fall back to an empty generator so the stream methods are always safe
        self._block_gen = block_gen or empty_gen()
        # Cached materialized results, populated by close_numpy()/close_df()
        self._numpy_result = None
        self._df_result = None
def _np_stream(self) -> Generator:
if self._block_gen is None:
raise StreamClosedError
block_gen = self._block_gen
self._block_gen = None
if not self.np_types:
return block_gen
d_types = self.np_types
first_type = d_types[0]
if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
self.np_types = first_type
def numpy_blocks():
for block in block_gen:
yield np.array(block, first_type).transpose()
else:
if any(x == np.object_ for x in d_types):
self.np_types = [np.object_] * len(self.np_types)
self.np_types = np.dtype(list(zip(self.column_names, d_types)))
def numpy_blocks():
for block in block_gen:
np_array = np.empty(len(block[0]), dtype=self.np_types)
for col_name, data in zip(self.column_names, block):
np_array[col_name] = data
yield np_array
return numpy_blocks()
def _df_stream(self) -> Generator:
if self._block_gen is None:
raise StreamClosedError
block_gen = self._block_gen
def pd_blocks():
for block in block_gen:
yield pd.DataFrame(dict(zip(self.column_names, block)))
self._block_gen = None
return pd_blocks()
def close_numpy(self):
if not self._block_gen:
raise StreamClosedError
chunk_size = 4
pieces = []
blocks = []
for block in self._np_stream():
blocks.append(block)
if len(blocks) == chunk_size:
pieces.append(np.concatenate(blocks, dtype=self.np_types))
chunk_size *= 2
blocks = []
pieces.extend(blocks)
if len(pieces) > 1:
self._numpy_result = np.concatenate(pieces, dtype=self.np_types)
elif len(pieces) == 1:
self._numpy_result = pieces[0]
else:
self._numpy_result = np.empty((0,))
self.close()
return self
def close_df(self):
pieces = list(self._df_stream())
if len(pieces) > 1:
self._df_result = pd.concat(pieces, ignore_index=True)
elif len(pieces) == 1:
self._df_result = pieces[0]
else:
self._df_result = pd.DataFrame()
self.close()
return self
@property
def np_result(self):
if self._numpy_result is None:
self.close_numpy()
return self._numpy_result
@property
def df_result(self):
if self._df_result is None:
self.close_df()
return self._df_result
@property
def np_stream(self) -> StreamContext:
return StreamContext(self, self._np_stream())
@property
def df_stream(self) -> StreamContext:
return StreamContext(self, self._df_stream())
def close(self):
if self._block_gen is not None:
self._block_gen.close()
self._block_gen = None
if self.source:
self.source.close()
self.source = None
|