Spaces:
Runtime error
Runtime error
File size: 1,531 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
from typing import Union
from clickhouse_connect.dbapi.cursor import Cursor
from clickhouse_connect.driver import create_client
from clickhouse_connect.driver.query import QueryResult
class Connection:
    """
    DB-API style connection object that wraps a single driver client.

    See :ref:`https://peps.python.org/pep-0249/`
    """

    # pylint: disable=too-many-arguments
    def __init__(self,
                 dsn: str = None,
                 username: str = '',
                 password: str = '',
                 host: str = None,
                 database: str = None,
                 interface: str = None,
                 port: int = 0,
                 secure: Union[bool, str] = False,
                 **kwargs):
        """
        Create the underlying client and remember the server timezone.

        Every named argument is forwarded unchanged to ``create_client``
        under the same keyword; any additional keyword arguments are handed
        over together as a single dict via ``generic_args``.
        """
        # Gather the settings first so the client factory call stays short.
        settings = {
            'host': host,
            'username': username,
            'password': password,
            'database': database,
            'interface': interface,
            'port': port,
            'secure': secure,
            'dsn': dsn,
            'generic_args': kwargs,
        }
        client = create_client(**settings)
        self.client = client
        # Timezone is whatever the freshly created client exposes as server_tz.
        self.timezone = client.server_tz

    def close(self):
        """Close the underlying client."""
        self.client.close()

    def commit(self):
        """Do nothing; present so callers can invoke ``commit()`` safely."""

    def rollback(self):
        """Do nothing; present so callers can invoke ``rollback()`` safely."""

    def command(self, cmd: str):
        """Run ``cmd`` through the client's ``command`` method and return its result."""
        outcome = self.client.command(cmd)
        return outcome

    def raw_query(self, query: str) -> QueryResult:
        """Run ``query`` through the client's ``query`` method and return its result."""
        result = self.client.query(query)
        return result

    def cursor(self):
        """Return a new ``Cursor`` bound to this connection's client."""
        return Cursor(self.client)
|