This repository was archived by the owner on Jan 22, 2025. It is now read-only.

package to uv #84

Merged · 10 commits · Dec 9, 2024
35 changes: 11 additions & 24 deletions .github/workflows/ci.yaml
@@ -9,39 +9,26 @@ on:
jobs:
test:
runs-on: ubuntu-latest
services:
databend:
# image: datafuselabs/databend-query
image: datafuselabs/databend
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: true
ports:
- 8000:8000
- 9000:9000

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup Python-3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install uv
uses: astral-sh/setup-uv@v4

- name: Pip Install
run: |
make install
- name: Set up Python
run: uv python install

- name: Verify Service Running
run: |
cid=$(docker ps -a | grep databend | cut -d' ' -f1)
docker logs ${cid}
curl -v http://localhost:8000/v1/health
- name: Install the project
run: uv sync --all-extras --dev

- name: Start databend-server
run: make up

- name: Test
env:
TEST_DATABEND_DSN: "http://databend:databend@localhost:8000/default"
TEST_DATABEND_DSN: "http://root:@localhost:8000/default"
run: |
make lint
make ci
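
Assembled from the added lines above, the migrated job is easier to read in one piece: the databend service container and its hand-rolled health check are gone in favor of `make up`, and the setup-python step gives way to the standard setup-uv pattern. A sketch of the resulting job, reconstructed from this diff rather than quoted from the file:

```yaml
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Install uv
        uses: astral-sh/setup-uv@v4

      - name: Set up Python
        run: uv python install          # installs the interpreter version pinned for the project

      - name: Install the project
        run: uv sync --all-extras --dev # creates the venv with all extras and dev dependencies

      - name: Start databend-server
        run: make up                    # docker compose replaces the old services: block

      - name: Test
        env:
          TEST_DATABEND_DSN: "http://root:@localhost:8000/default"
        run: |
          make lint
          make ci
```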
19 changes: 7 additions & 12 deletions .github/workflows/release.yaml
@@ -14,19 +14,14 @@ jobs:
- name: Checkout Repository
uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: 3.9
- name: Install uv
uses: astral-sh/setup-uv@v4

- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel twine
- name: Set up Python
run: uv python install

- name: Determine Version Change
run: |
export VERSION=$(cat databend_py/VERSION)
- name: Install the project
run: uv sync --all-extras --dev

- name: Release Package and Tag
env:
@@ -38,7 +33,7 @@ jobs:
git config user.email "[email protected]"
git tag -a "v$VERSION" -m "Release Version $VERSION"
git push origin "v$VERSION"
python setup.py sdist bdist_wheel
uv publish
echo "show user name:"
echo ${{ secrets.TWINE_USERNAME }}
twine upload -u ${{ secrets.TWINE_USERNAME }} -p ${{ secrets.TWINE_PASSWORD }} dist/*
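
One behavioral difference worth noting: the old flow built distributions explicitly (`python setup.py sdist bdist_wheel`) before `twine upload`, whereas `uv publish` only uploads what already sits in `dist/` and does not build anything itself. A hedged sketch of the publish steps with an explicit build step added; the secret name is hypothetical, and `uv publish` alternatively takes `--token` or the `UV_PUBLISH_TOKEN` environment variable:

```yaml
- name: Build distributions
  run: uv build        # writes the sdist and wheel into dist/

- name: Publish
  env:
    UV_PUBLISH_TOKEN: ${{ secrets.PYPI_TOKEN }}   # hypothetical secret name
  run: uv publish
```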
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
upload.csv
.envrc
# Byte-compiled / optimized / DLL files
__pycache__/
18 changes: 12 additions & 6 deletions Makefile
@@ -1,12 +1,18 @@
prepare:
mkdir -p data/databend

up: prepare
docker compose -f docker-compose.yaml up --quiet-pull -d databend --wait
curl -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "select version()", "pagination": { "wait_time_secs": 10}}'

start: up

test:
python tests/test_client.py
uv run pytest .

ci:
python tests/test_client.py
uv run pytest .

lint:
pyflakes .
uv run ruff check

install:
pip install -r requirements.txt
pip install -e .
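
Two notes on the new targets, for readers unfamiliar with uv: `uv run pytest .` and `uv run ruff check` execute inside the project's managed virtual environment, syncing it first when needed, which is why the old `install:` target and its `pip install` steps could be dropped entirely. In the `up` target, the trailing curl is a cheap readiness probe: it runs `select version()` through the HTTP API and asks the server to wait up to 10 seconds for the result (`wait_time_secs`), confirming the query path is actually serving after compose's `--wait`.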
141 changes: 82 additions & 59 deletions databend_py/client.py
@@ -15,14 +15,19 @@ class Client(object):
"""

def __init__(self, *args, **kwargs):
self.settings = (kwargs.pop('settings', None) or {}).copy()
self.result_config = (kwargs.pop('result_config', None) or {}).copy()
self.settings = (kwargs.pop("settings", None) or {}).copy()
self.result_config = (kwargs.pop("result_config", None) or {}).copy()
self.connection = Connection(*args, **kwargs)
self.query_result_cls = QueryResult
self.helper = Helper
self._debug = asbool(self.settings.get('debug', False))
self._uploader = DataUploader(self, self.connection, self.settings, debug=self._debug,
compress=self.settings.get('compress', False))
self._debug = asbool(self.settings.get("debug", False))
self._uploader = DataUploader(
self,
self.connection,
self.settings,
debug=self._debug,
compress=self.settings.get("compress", False),
)

def __enter__(self):
return self
@@ -34,9 +39,9 @@ def disconnect_connection(self):
self.connection.disconnect()

def _data_generator(self, raw_data):
while raw_data['next_uri'] is not None:
while raw_data["next_uri"] is not None:
try:
raw_data = self._receive_data(raw_data['next_uri'])
raw_data = self._receive_data(raw_data["next_uri"])
yield raw_data
except (Exception, KeyboardInterrupt):
self.disconnect()
@@ -57,7 +62,8 @@ def _receive_result(self, query, query_id=None, with_column_types=False):
helper.check_error()
gen = self._data_generator(raw_data)
result = self.query_result_cls(
gen, raw_data, with_column_types=with_column_types, **self.result_config)
gen, raw_data, with_column_types=with_column_types, **self.result_config
)
return result.get_result()

def _iter_receive_result(self, query, query_id=None, with_column_types=False):
@@ -67,14 +73,16 @@ def _iter_receive_result(self, query, query_id=None, with_column_types=False):
helper.check_error()
gen = self._data_generator(raw_data)
result = self.query_result_cls(
gen, raw_data, with_column_types=with_column_types, **self.result_config)
gen, raw_data, with_column_types=with_column_types, **self.result_config
)
_, rows = result.get_result()
for row in rows:
for r in row:
yield r

def execute(self, query, params=None, with_column_types=False,
query_id=None, settings=None):
def execute(
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
"""
Executes query.
:param query: query that will be sent to the server.
@@ -112,52 +120,63 @@ def execute(self, query, params=None, with_column_types=False,
return [], rv

column_types, rv = self._process_ordinary_query(
query, params=params, with_column_types=with_column_types,
query_id=query_id)
query, params=params, with_column_types=with_column_types, query_id=query_id
)
return column_types, rv

# params = [(1,),(2,)] or params = [(1,2),(2,3)]
def _process_insert_query(self, query, params):
insert_rows = 0
if "values" in query:
query = query.split("values")[0] + 'values'
query = query.split("values")[0] + "values"
elif "VALUES" in query:
query = query.split("VALUES")[0] + 'VALUES'
if len(query.split(' ')) < 3:
query = query.split("VALUES")[0] + "VALUES"
if len(query.split(" ")) < 3:
raise Exception("Not standard insert/replace statement")
table_name = query.split(' ')[2]
batch_size = query.count(',') + 1
table_name = query.split(" ")[2]
batch_size = query.count(",") + 1
if params is not None and len(params) > 0:
if isinstance(params[0], tuple):
tuple_ls = params
else:
tuple_ls = [tuple(params[i:i + batch_size]) for i in range(0, len(params), batch_size)]
tuple_ls = [
tuple(params[i : i + batch_size])
for i in range(0, len(params), batch_size)
]
insert_rows = len(tuple_ls)
self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
return insert_rows

def _process_ordinary_query(self, query, params=None, with_column_types=False,
query_id=None):
def _process_ordinary_query(
self, query, params=None, with_column_types=False, query_id=None
):
if params is not None:
query = self._substitute_params(
query, params, self.connection.context
)
return self._receive_result(query, query_id=query_id, with_column_types=with_column_types, )

def execute_iter(self, query, params=None, with_column_types=False,
query_id=None, settings=None):
query = self._substitute_params(query, params, self.connection.context)
return self._receive_result(
query,
query_id=query_id,
with_column_types=with_column_types,
)

def execute_iter(
self, query, params=None, with_column_types=False, query_id=None, settings=None
):
if params is not None:
query = self._substitute_params(
query, params, self.connection.context
)
return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)

def _iter_process_ordinary_query(self, query, with_column_types=False, query_id=None):
return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)
query = self._substitute_params(query, params, self.connection.context)
return self._iter_receive_result(
query, query_id=query_id, with_column_types=with_column_types
)

def _iter_process_ordinary_query(
self, query, with_column_types=False, query_id=None
):
return self._iter_receive_result(
query, query_id=query_id, with_column_types=with_column_types
)

def _substitute_params(self, query, params, context):
if not isinstance(params, dict):
raise ValueError('Parameters are expected in dict form')
raise ValueError("Parameters are expected in dict form")

escaped = escape_params(params, context)
return query % escaped
@@ -186,59 +205,59 @@ def from_url(cls, url):
continue

timeouts = {
'connect_timeout',
'read_timeout',
'send_receive_timeout',
'sync_request_timeout'
"connect_timeout",
"read_timeout",
"send_receive_timeout",
"sync_request_timeout",
}

value = value[0]

if name == 'client_name':
if name == "client_name":
kwargs[name] = value
elif name == 'tenant':
elif name == "tenant":
kwargs[name] = value
elif name == 'warehouse':
elif name == "warehouse":
kwargs[name] = value
elif name == 'secure':
elif name == "secure":
kwargs[name] = asbool(value)
elif name == 'copy_purge':
elif name == "copy_purge":
kwargs[name] = asbool(value)
settings[name] = asbool(value)
elif name == 'debug':
elif name == "debug":
settings[name] = asbool(value)
elif name == 'compress':
elif name == "compress":
settings[name] = asbool(value)
elif name in timeouts:
kwargs[name] = float(value)
elif name == 'persist_cookies':
elif name == "persist_cookies":
kwargs[name] = asbool(value)
elif name == 'null_to_none':
elif name == "null_to_none":
result_config[name] = asbool(value)
else:
settings[name] = value # settings={'copy_purge':False}
secure = kwargs.get("secure", False)
kwargs['secure'] = secure
kwargs["secure"] = secure

host = parsed_url.hostname

if parsed_url.port is not None:
kwargs['port'] = parsed_url.port
kwargs["port"] = parsed_url.port

path = parsed_url.path.replace('/', '', 1)
path = parsed_url.path.replace("/", "", 1)
if path:
kwargs['database'] = path
kwargs["database"] = path

if parsed_url.username is not None:
kwargs['user'] = unquote(parsed_url.username)
kwargs["user"] = unquote(parsed_url.username)

if parsed_url.password is not None:
kwargs['password'] = unquote(parsed_url.password)
kwargs["password"] = unquote(parsed_url.password)

if settings:
kwargs['settings'] = settings
kwargs["settings"] = settings
if result_config:
kwargs['result_config'] = result_config
kwargs["result_config"] = result_config

return cls(host, **kwargs)

@@ -250,7 +269,9 @@ def insert(self, database_name, table_name, data):
data: the data to write, as a list of tuples
"""
# TODO: escape the database & table name
self._uploader.upload_to_table_by_copy("%s.%s" % (database_name, table_name), data)
self._uploader.upload_to_table_by_copy(
"%s.%s" % (database_name, table_name), data
)

def replace(self, database_name, table_name, conflict_keys, data):
"""
@@ -260,7 +281,9 @@ def replace(self, database_name, table_name, conflict_keys, data):
conflict_keys: the key(s) used to match existing rows for replacement
data: the data to write, as a list of tuples
"""
self._uploader.replace_into_table("%s.%s" % (database_name, table_name), conflict_keys, data)
self._uploader.replace_into_table(
"%s.%s" % (database_name, table_name), conflict_keys, data
)

def upload_to_stage(self, stage_dir, file_name, data):
"""
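
To ground the reformatted API, a usage sketch built from names visible in this diff; the top-level `Client` import is assumed from the package layout, and the table names are hypothetical:

```python
from databend_py import Client  # assumed export; package layout not shown in this diff

# DSN mirrors the one in ci.yaml; query-string options map onto from_url's kwargs/settings
client = Client.from_url("http://root:@localhost:8000/default?secure=false&debug=true")

# Ordinary query: params are %-substituted from a dict (see _substitute_params)
column_types, rows = client.execute(
    "SELECT number FROM numbers(10) WHERE number > %(n)s",
    params={"n": 5},
    with_column_types=True,
)

# A bare "... VALUES" statement routes through _process_insert_query, which
# batches the tuples and uploads them by COPY; execute returns ([], inserted_rows)
client.execute("INSERT INTO default.t1 VALUES", [(1, "a"), (2, "b")])

# Convenience wrappers over the same uploader
client.insert("default", "t1", [(3, "c")])
client.replace("default", "t1", ["id"], [(1, "a-updated")])
```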