# Spaces: Runtime error
# File size: 3,743 Bytes
# 4a51346
from sqlalchemy.engine.default import DefaultDialect
from clickhouse_connect import dbapi
from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector
from clickhouse_connect.cc_sqlalchemy.sql import full_table
from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler
from clickhouse_connect.cc_sqlalchemy import ischema_names, dialect_name
from clickhouse_connect.cc_sqlalchemy.sql.preparer import ChIdentifierPreparer
from clickhouse_connect.driver.query import quote_identifier, format_str
# pylint: disable=too-many-public-methods,no-self-use,unused-argument
class ClickHouseDialect(DefaultDialect):
    """
    SQLAlchemy dialect backed by the ``clickhouse_connect`` DB-API module.

    See :py:class:`sqlalchemy.engine.interfaces`

    Most reflection hooks report "nothing" (empty lists, ``None`` or ``False``),
    and every two-phase transaction hook raises ``NotImplementedError``.
    """
    name = dialect_name
    driver = 'connect'

    default_schema_name = 'default'
    supports_native_decimal = True
    supports_native_boolean = True
    returns_unicode_strings = True
    postfetch_lastrowid = False
    ddl_compiler = ChDDLCompiler
    preparer = ChIdentifierPreparer
    description_encoding = None
    max_identifier_length = 127
    ischema_names = ischema_names
    inspector = ChInspector

    # pylint: disable=method-hidden
    @classmethod
    def dbapi(cls):
        """Return the ``clickhouse_connect.dbapi`` module used as the DB-API driver."""
        return dbapi

    def initialize(self, connection):
        """Do nothing; the base class initialization is intentionally not invoked."""

    @staticmethod
    def get_schema_names(connection, **_):
        """Return the ``name`` of every row produced by ``SHOW DATABASES``."""
        return [row.name for row in connection.execute('SHOW DATABASES')]

    @staticmethod
    def has_database(connection, db_name):
        """
        Report whether ``system.databases`` has a row named ``db_name``.

        :param connection: object exposing ``execute(sql)``
        :param db_name: database name; embedded in the SQL via ``format_str``
        :return: ``True`` when the result's ``rowcount`` is greater than zero
        """
        query = 'SELECT name FROM system.databases ' + f'WHERE name = {format_str(db_name)}'
        return connection.execute(query).rowcount > 0

    def get_table_names(self, connection, schema=None, **kw):
        """
        Return the ``name`` of every row produced by ``SHOW TABLES``.

        :param schema: optional database name; when truthy it is passed through
            ``quote_identifier`` and appended as a ``FROM`` clause
        """
        cmd = 'SHOW TABLES'
        if schema:
            cmd += ' FROM ' + quote_identifier(schema)
        return [row.name for row in connection.execute(cmd)]

    def get_primary_keys(self, connection, table_name, schema=None, **kw):
        """Return an empty list; this dialect reports no primary key columns."""
        return []

    # pylint: disable=arguments-renamed
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        """
        Return an empty primary key constraint description.

        SQLAlchemy documents this hook as returning a dictionary with
        ``constrained_columns`` and ``name`` keys, so the "no primary key"
        answer is expressed in that shape instead of as a bare list, which
        cannot be indexed by those keys.
        """
        return {'constrained_columns': [], 'name': None}

    def get_foreign_keys(self, connection, table_name, schema=None, **kw):
        """Return an empty list; this dialect reports no foreign keys."""
        return []

    def get_temp_table_names(self, connection, schema=None, **kw):
        """Return an empty list; this dialect reports no temporary tables."""
        return []

    def get_view_names(self, connection, schema=None, **kw):
        """Return an empty list; this dialect reports no views."""
        return []

    def get_temp_view_names(self, connection, schema=None, **kw):
        """Return an empty list; this dialect reports no temporary views."""
        return []

    def get_view_definition(self, connection, view_name, schema=None, **kw):
        """Return ``None``; view definitions are not looked up by this dialect."""
        return None

    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Return an empty list; this dialect reports no indexes."""
        return []

    def get_unique_constraints(self, connection, table_name, schema=None, **kw):
        """Return an empty list; this dialect reports no unique constraints."""
        return []

    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        """Return an empty list; this dialect reports no check constraints."""
        return []

    def has_table(self, connection, table_name, schema=None, **_kw):
        """
        Report whether a table exists by running ``EXISTS TABLE``.

        :param table_name: table to look for
        :param schema: optional database name, combined with ``table_name``
            by ``full_table``
        :return: ``True`` when the first column of the first result row equals 1
        """
        result = connection.execute(f'EXISTS TABLE {full_table(table_name, schema)}')
        row = result.fetchone()
        return row[0] == 1

    def has_sequence(self, connection, sequence_name, schema=None, **_kw):
        """Return ``False``; this dialect reports no sequences."""
        return False

    def do_begin_twophase(self, connection, xid):
        """Two-phase transactions are not implemented; always raises."""
        raise NotImplementedError

    def do_prepare_twophase(self, connection, xid):
        """Two-phase transactions are not implemented; always raises."""
        raise NotImplementedError

    def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
        """Two-phase transactions are not implemented; always raises."""
        raise NotImplementedError

    def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):
        """Two-phase transactions are not implemented; always raises."""
        raise NotImplementedError

    def do_recover_twophase(self, connection):
        """Two-phase transactions are not implemented; always raises."""
        raise NotImplementedError

    def set_isolation_level(self, dbapi_conn, level):
        """Do nothing; the requested isolation level is ignored."""

    def get_isolation_level(self, dbapi_conn):
        """Return ``None``; no isolation level is reported."""
        return None
|