Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
package to uv
Browse files (browse the repository at this point in the history)
  • Loading branch information
sundy-li committed Dec 7, 2024
1 parent 1bffb20 commit 43cc5f5
Show file tree
Hide file tree
Showing 26 changed files with 854 additions and 331 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ jobs:
with:
python-version: '3.10'

- name: Pip Install
- name: Uvx Install
run: |
make install
curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Verify Service Running
run: |
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
upload.csv
.envrc
# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
9 changes: 3 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
test:
python tests/test_client.py
pytest .

ci:
python tests/test_client.py
pytest .

lint:
pyflakes .
uvx ruff check

install:
pip install -r requirements.txt
pip install -e .
141 changes: 82 additions & 59 deletions databend_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ class Client(object):
"""

def __init__(self, *args, **kwargs):
self.settings = (kwargs.pop('settings', None) or {}).copy()
self.result_config = (kwargs.pop('result_config', None) or {}).copy()
self.settings = (kwargs.pop("settings", None) or {}).copy()
self.result_config = (kwargs.pop("result_config", None) or {}).copy()
self.connection = Connection(*args, **kwargs)
self.query_result_cls = QueryResult
self.helper = Helper
self._debug = asbool(self.settings.get('debug', False))
self._uploader = DataUploader(self, self.connection, self.settings, debug=self._debug,
compress=self.settings.get('compress', False))
self._debug = asbool(self.settings.get("debug", False))
self._uploader = DataUploader(
self,
self.connection,
self.settings,
debug=self._debug,
compress=self.settings.get("compress", False),
)

def __enter__(self):
return self
Expand All @@ -34,9 +39,9 @@ def disconnect_connection(self):
self.connection.disconnect()

def _data_generator(self, raw_data):
while raw_data['next_uri'] is not None:
while raw_data["next_uri"] is not None:
try:
raw_data = self._receive_data(raw_data['next_uri'])
raw_data = self._receive_data(raw_data["next_uri"])
yield raw_data
except (Exception, KeyboardInterrupt):
self.disconnect()
Expand All @@ -57,7 +62,8 @@ def _receive_result(self, query, query_id=None, with_column_types=False):
helper.check_error()
gen = self._data_generator(raw_data)
result = self.query_result_cls(
gen, raw_data, with_column_types=with_column_types, **self.result_config)
gen, raw_data, with_column_types=with_column_types, **self.result_config
)
return result.get_result()

def _iter_receive_result(self, query, query_id=None, with_column_types=False):
Expand All @@ -67,14 +73,16 @@ def _iter_receive_result(self, query, query_id=None, with_column_types=False):
helper.check_error()
gen = self._data_generator(raw_data)
result = self.query_result_cls(
gen, raw_data, with_column_types=with_column_types, **self.result_config)
gen, raw_data, with_column_types=with_column_types, **self.result_config
)
_, rows = result.get_result()
for row in rows:
for r in row:
yield r

def execute(self, query, params=None, with_column_types=False,
query_id=None, settings=None):
def execute(
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
"""
Executes query.
:param query: query that will be send to server.
Expand Down Expand Up @@ -112,52 +120,63 @@ def execute(self, query, params=None, with_column_types=False,
return [], rv

column_types, rv = self._process_ordinary_query(
query, params=params, with_column_types=with_column_types,
query_id=query_id)
query, params=params, with_column_types=with_column_types, query_id=query_id
)
return column_types, rv

# params = [(1,),(2,)] or params = [(1,2),(2,3)]
def _process_insert_query(self, query, params):
insert_rows = 0
if "values" in query:
query = query.split("values")[0] + 'values'
query = query.split("values")[0] + "values"
elif "VALUES" in query:
query = query.split("VALUES")[0] + 'VALUES'
if len(query.split(' ')) < 3:
query = query.split("VALUES")[0] + "VALUES"
if len(query.split(" ")) < 3:
raise Exception("Not standard insert/replace statement")
table_name = query.split(' ')[2]
batch_size = query.count(',') + 1
table_name = query.split(" ")[2]
batch_size = query.count(",") + 1
if params is not None and len(params) > 0:
if isinstance(params[0], tuple):
tuple_ls = params
else:
tuple_ls = [tuple(params[i:i + batch_size]) for i in range(0, len(params), batch_size)]
tuple_ls = [
tuple(params[i : i + batch_size])
for i in range(0, len(params), batch_size)
]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
return insert_rows

def _process_ordinary_query(self, query, params=None, with_column_types=False,
query_id=None):
def _process_ordinary_query(
self, query, params=None, with_column_types=False, query_id=None
):
if params is not None:
query = self._substitute_params(
query, params, self.connection.context
)
return self._receive_result(query, query_id=query_id, with_column_types=with_column_types, )

def execute_iter(self, query, params=None, with_column_types=False,
query_id=None, settings=None):
query = self._substitute_params(query, params, self.connection.context)
return self._receive_result(
query,
query_id=query_id,
with_column_types=with_column_types,
)

def execute_iter(
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
if params is not None:
query = self._substitute_params(
query, params, self.connection.context
)
return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)

def _iter_process_ordinary_query(self, query, with_column_types=False, query_id=None):
return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)
query = self._substitute_params(query, params, self.connection.context)
return self._iter_receive_result(
query, query_id=query_id, with_column_types=with_column_types
)

def _iter_process_ordinary_query(
self, query, with_column_types=False, query_id=None
):
return self._iter_receive_result(
query, query_id=query_id, with_column_types=with_column_types
)

def _substitute_params(self, query, params, context):
if not isinstance(params, dict):
raise ValueError('Parameters are expected in dict form')
raise ValueError("Parameters are expected in dict form")

escaped = escape_params(params, context)
return query % escaped
Expand Down Expand Up @@ -186,59 +205,59 @@ def from_url(cls, url):
continue

timeouts = {
'connect_timeout',
'read_timeout',
'send_receive_timeout',
'sync_request_timeout'
"connect_timeout",
"read_timeout",
"send_receive_timeout",
"sync_request_timeout",
}

value = value[0]

if name == 'client_name':
if name == "client_name":
kwargs[name] = value
elif name == 'tenant':
elif name == "tenant":
kwargs[name] = value
elif name == 'warehouse':
elif name == "warehouse":
kwargs[name] = value
elif name == 'secure':
elif name == "secure":
kwargs[name] = asbool(value)
elif name == 'copy_purge':
elif name == "copy_purge":
kwargs[name] = asbool(value)
settings[name] = asbool(value)
elif name == 'debug':
elif name == "debug":
settings[name] = asbool(value)
elif name == 'compress':
elif name == "compress":
settings[name] = asbool(value)
elif name in timeouts:
kwargs[name] = float(value)
elif name == 'persist_cookies':
elif name == "persist_cookies":
kwargs[name] = asbool(value)
elif name == 'null_to_none':
elif name == "null_to_none":
result_config[name] = asbool(value)
else:
settings[name] = value # settings={'copy_purge':False}
secure = kwargs.get("secure", False)
kwargs['secure'] = secure
kwargs["secure"] = secure

host = parsed_url.hostname

if parsed_url.port is not None:
kwargs['port'] = parsed_url.port
kwargs["port"] = parsed_url.port

path = parsed_url.path.replace('/', '', 1)
path = parsed_url.path.replace("/", "", 1)
if path:
kwargs['database'] = path
kwargs["database"] = path

if parsed_url.username is not None:
kwargs['user'] = unquote(parsed_url.username)
kwargs["user"] = unquote(parsed_url.username)

if parsed_url.password is not None:
kwargs['password'] = unquote(parsed_url.password)
kwargs["password"] = unquote(parsed_url.password)

if settings:
kwargs['settings'] = settings
kwargs["settings"] = settings
if result_config:
kwargs['result_config'] = result_config
kwargs["result_config"] = result_config

return cls(host, **kwargs)

Expand All @@ -250,7 +269,9 @@ def insert(self, database_name, table_name, data):
data: the data which write into, it's a list of tuple
"""
# TODO: escape the database & table name
self._uploader.upload_to_table_by_copy("%s.%s" % (database_name, table_name), data)
self._uploader.upload_to_table_by_copy(
"%s.%s" % (database_name, table_name), data
)

def replace(self, database_name, table_name, conflict_keys, data):
"""
Expand All @@ -260,7 +281,9 @@ def replace(self, database_name, table_name, conflict_keys, data):
conflict_keys: the key that use to replace into
data: the data which write into, it's a list of tuple
"""
self._uploader.replace_into_table("%s.%s" % (database_name, table_name), conflict_keys, data)
self._uploader.replace_into_table(
"%s.%s" % (database_name, table_name), conflict_keys, data
)

def upload_to_stage(self, stage_dir, file_name, data):
"""
Expand Down
Loading

0 comments on commit 43cc5f5

Please sign in to comment.