SungBeom's picture
Upload folder using huggingface_hub
4a51346
raw
history blame
8.43 kB
import logging
from math import log
from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.context import BaseQueryContext
from clickhouse_connect.driver.options import np, pd
from clickhouse_connect.driver.exceptions import ProgrammingError
if TYPE_CHECKING:
from clickhouse_connect.datatypes.base import ClickHouseType
logger = logging.getLogger(__name__)
DEFAULT_BLOCK_BYTES = 1 << 24 # Try to generate blocks between 16 and 32MB in raw size
class InsertBlock(NamedTuple):
column_count: int
row_count: int
column_names: Iterable[str]
column_types: Iterable['ClickHouseType']
column_data: Iterable[Sequence[Any]]
# pylint: disable=too-many-instance-attributes
class InsertContext(BaseQueryContext):
"""
Reusable Argument/parameter object for inserts.
"""
# pylint: disable=too-many-arguments
def __init__(self,
table: str,
column_names: Sequence[str],
column_types: Sequence['ClickHouseType'],
data: Any = None,
column_oriented: Optional[bool] = None,
settings: Optional[Dict[str, Any]] = None,
compression: Optional[Union[str, bool]] = None,
query_formats: Optional[Dict[str, str]] = None,
column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
block_size: Optional[int] = None):
super().__init__(settings, query_formats, column_formats)
self.table = table
self.column_names = column_names
self.column_types = column_types
self.column_oriented = False if column_oriented is None else column_oriented
self.compression = compression
self.req_block_size = block_size
self.block_size = DEFAULT_BLOCK_BYTES
self.data = data
self.insert_exception = None
@property
def empty(self) -> bool:
return self._data is None
@property
def data(self):
return self._raw_data
@data.setter
def data(self, data: Any):
self._raw_data = data
self.current_block = 0
self.current_row = 0
self.row_count = 0
self.column_count = 0
self._data = None
if data is None or len(data) == 0:
return
if pd and isinstance(data, pd.DataFrame):
data = self._convert_pandas(data)
self.column_oriented = True
if np and isinstance(data, np.ndarray):
data = self._convert_numpy(data)
if self.column_oriented:
self._next_block_data = self._column_block_data
self._block_columns = data # [SliceView(column) for column in data]
self._block_rows = None
self.column_count = len(data)
self.row_count = len(data[0])
else:
self._next_block_data = self._row_block_data
self._block_rows = data
self._block_columns = None
self.row_count = len(data)
self.column_count = len(data[0])
if self.row_count and self.column_count:
if self.column_count != len(self.column_names):
raise ProgrammingError('Insert data column count does not match column names')
self._data = data
self.block_size = self._calc_block_size()
def _calc_block_size(self) -> int:
if self.req_block_size:
return self.req_block_size
row_size = 0
sample_size = min((log(self.row_count) + 1) * 2, 64)
sample_freq = max(1, int(self.row_count / sample_size))
for i, d_type in enumerate(self.column_types):
if d_type.byte_size:
row_size += d_type.byte_size
continue
if self.column_oriented:
col_data = self._data[i]
if sample_freq == 1:
d_size = d_type.data_size(col_data)
else:
sample = [col_data[j] for j in range(0, self.row_count, sample_freq)]
d_size = d_type.data_size(sample)
else:
data = self._data
sample = [data[j][i] for j in range(0, self.row_count, sample_freq)]
d_size = d_type.data_size(sample)
row_size += d_size
return 1 << (24 - int(log(row_size, 2)))
def next_block(self) -> Generator[InsertBlock, None, None]:
while True:
block_end = min(self.current_row + self.block_size, self.row_count)
row_count = block_end - self.current_row
if row_count <= 0:
return
self.current_block += 1
data = self._next_block_data(self.current_row, block_end)
yield InsertBlock(self.column_count, row_count, self.column_names, self.column_types, data)
self.current_row = block_end
def _column_block_data(self, block_start, block_end):
if block_start == 0 and self.row_count <= block_end:
return self._block_columns # Optimization if we don't need to break up the block
return [col[block_start: block_end] for col in self._block_columns]
def _row_block_data(self, block_start, block_end):
return data_conv.pivot(self._block_rows, block_start, block_end)
def _convert_pandas(self, df):
data = []
for df_col_name, col_name, ch_type in zip(df.columns, self.column_names, self.column_types):
df_col = df[df_col_name]
d_type = str(df_col.dtype)
if ch_type.python_type == int:
if 'float' in d_type:
df_col = df_col.round().astype(ch_type.base_type, copy=False)
else:
df_col = df_col.astype(ch_type.base_type, copy=False)
elif 'datetime' in ch_type.np_type and (pd.core.dtypes.common.is_datetime_or_timedelta_dtype(df_col)
or 'datetime64[ns' in d_type):
div = ch_type.nano_divisor
data.append([None if pd.isnull(x) else x.value // div for x in df_col])
self.column_formats[col_name] = 'int'
continue
if ch_type.nullable:
if d_type == 'object':
# This is ugly, but the multiple replaces seem required as a result of this bug:
# https://github.com/pandas-dev/pandas/issues/29024
df_col = df_col.replace({pd.NaT: None}).replace({np.nan: None})
elif 'Float' in ch_type.base_type:
# This seems to be the only way to convert any null looking things to nan
df_col = df_col.astype(ch_type.np_type)
else:
df_col = df_col.replace({np.nan: None})
data.append(df_col.to_numpy(copy=False))
return data
def _convert_numpy(self, np_array):
if np_array.dtype.names is None:
if 'date' in str(np_array.dtype):
for col_name, col_type in zip(self.column_names, self.column_types):
if 'date' in col_type.np_type:
self.column_formats[col_name] = 'int'
return np_array.astype('int').tolist()
for col_type in self.column_types:
if col_type.byte_size == 0 or col_type.byte_size > np_array.dtype.itemsize:
return np_array.tolist()
return np_array
if set(self.column_names).issubset(set(np_array.dtype.names)):
data = [np_array[col_name] for col_name in self.column_names]
else:
# Column names don't match, so we have to assume they are in order
data = [np_array[col_name] for col_name in np_array.dtype.names]
for ix, (col_name, col_type) in enumerate(zip(self.column_names, self.column_types)):
d_type = data[ix].dtype
if 'date' in str(d_type) and 'date' in col_type.np_type:
self.column_formats[col_name] = 'int'
data[ix] = data[ix].astype(int).tolist()
elif col_type.byte_size == 0 or col_type.byte_size > d_type.itemsize:
data[ix] = data[ix].tolist()
self.column_oriented = True
return data