Skip to content

Commit

Permalink
Add query_table_version to the rest client (#111)
Browse files Browse the repository at this point in the history
This is proposed in #82 

Also remove the restriction for Python 3.10 #110 

Closes #82 and #110
  • Loading branch information
nkvuong authored Jan 29, 2022
1 parent c292372 commit eed5241
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 13 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,12 @@ spark-warehouse/

# For Python
*.egg-info

# For VSCode
*.vscode

# For Metals
*.metals

# For venv
*.venv
9 changes: 6 additions & 3 deletions python/delta_sharing/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ def to_pandas(self) -> pd.DataFrame:
if left == 0:
break

return pd.concat(pdfs, axis=0, ignore_index=True, copy=False,)[
[field["name"] for field in schema_json["fields"]]
]
return pd.concat(
pdfs,
axis=0,
ignore_index=True,
copy=False,
)[[field["name"] for field in schema_json["fields"]]]

def _copy(
self, *, predicateHints: Optional[Sequence[str]], limit: Optional[int]
Expand Down
31 changes: 30 additions & 1 deletion python/delta_sharing/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class QueryTableMetadataResponse:
metadata: Metadata


@dataclass(frozen=True)
class QueryTableVersionResponse:
delta_table_version: int


@dataclass(frozen=True)
class ListFilesInTableResponse:
protocol: Protocol
Expand Down Expand Up @@ -219,6 +224,19 @@ def query_table_metadata(self, table: Table) -> QueryTableMetadataResponse:
metadata=Metadata.from_json(metadata_json["metaData"]),
)

@retry_with_exponential_backoff
def query_table_version(self, table: Table) -> QueryTableVersionResponse:
headers = self._head_internal(
f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}"
)

# it's a bug in the server if it doesn't return delta-table-version in the header
if "delta-table-version" not in headers:
raise LookupError("Missing delta-table-version header")

table_version = int(headers.get("delta-table-version"))
return QueryTableVersionResponse(delta_table_version=table_version)

@retry_with_exponential_backoff
def list_files_in_table(
self,
Expand All @@ -234,7 +252,8 @@ def list_files_in_table(
data["limitHint"] = limitHint

with self._post_internal(
f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}/query", data=data,
f"/shares/{table.share}/schemas/{table.schema}/tables/{table.name}/query",
data=data,
) as lines:
protocol_json = json.loads(next(lines))
metadata_json = json.loads(next(lines))
Expand All @@ -253,6 +272,16 @@ def _get_internal(self, target: str, data: Optional[Dict[str, Any]] = None):
def _post_internal(self, target: str, data: Optional[Dict[str, Any]] = None):
return self._request_internal(request=self._session.post, target=target, json=data)

def _head_internal(self, target: str):
assert target.startswith("/"), "Targets should start with '/'"
response = self._session.head(f"{self._profile.endpoint}{target}")
try:
response.raise_for_status()
headers = response.headers
return headers
finally:
response.close()

@contextmanager
def _request_internal(self, request, target: str, **kwargs) -> Generator[str, None, None]:
assert target.startswith("/"), "Targets should start with '/'"
Expand Down
21 changes: 18 additions & 3 deletions python/delta_sharing/tests/test_delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,19 +273,34 @@ def list_all_tables(
pytest.param(
"share_azure.default.table_wasb",
None,
pd.DataFrame({"c1": ["foo bar"], "c2": ["foo bar"],}),
pd.DataFrame(
{
"c1": ["foo bar"],
"c2": ["foo bar"],
}
),
id="Azure Blob Storage",
),
pytest.param(
"share_azure.default.table_abfs",
None,
pd.DataFrame({"c1": ["foo bar"], "c2": ["foo bar"],}),
pd.DataFrame(
{
"c1": ["foo bar"],
"c2": ["foo bar"],
}
),
id="Azure Data Lake Storage Gen2",
),
pytest.param(
"share_gcp.default.table_gcs",
None,
pd.DataFrame({"c1": ["foo bar"], "c2": ["foo bar"],}),
pd.DataFrame(
{
"c1": ["foo bar"],
"c2": ["foo bar"],
}
),
id="Google Cloud Storage",
),
],
Expand Down
30 changes: 27 additions & 3 deletions python/delta_sharing/tests/test_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
Share,
Table,
)
from delta_sharing.rest_client import DataSharingRestClient, retry_with_exponential_backoff
from delta_sharing.rest_client import (
DataSharingRestClient,
retry_with_exponential_backoff,
)
from delta_sharing.tests.conftest import ENABLE_INTEGRATION, SKIP_MESSAGE


Expand Down Expand Up @@ -186,7 +189,9 @@ def test_query_table_metadata_partitioned(rest_client: DataSharingRestClient):


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_query_table_metadata_partitioned_different_schemas(rest_client: DataSharingRestClient):
def test_query_table_metadata_partitioned_different_schemas(
rest_client: DataSharingRestClient,
):
response = rest_client.query_table_metadata(
Table(name="table3", share="share1", schema="default")
)
Expand All @@ -205,6 +210,23 @@ def test_query_table_metadata_partitioned_different_schemas(rest_client: DataSha
)


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_query_existed_table_version(rest_client: DataSharingRestClient):
response = rest_client.query_table_version(
Table(name="table1", share="share1", schema="default")
)
assert isinstance(response.delta_table_version, int)
assert response.delta_table_version > 0


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_query_nonexistent_table_version(rest_client: DataSharingRestClient):
with pytest.raises(HTTPError):
rest_client.query_table_version(
Table(name="nonexistenttable", share="share1", schema="default")
)


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_list_files_in_table_non_partitioned(rest_client: DataSharingRestClient):
response = rest_client.list_files_in_table(
Expand Down Expand Up @@ -299,7 +321,9 @@ def test_list_files_in_table_partitioned(rest_client: DataSharingRestClient):


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_list_files_in_table_partitioned_different_schemas(rest_client: DataSharingRestClient):
def test_list_files_in_table_partitioned_different_schemas(
rest_client: DataSharingRestClient,
):
response = rest_client.list_files_in_table(
Table(name="table3", share="share1", schema="default")
)
Expand Down
2 changes: 1 addition & 1 deletion python/dev/reformat
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cd "$FWDIR"
PYTHON_EXECUTABLE="${PYTHON_EXECUTABLE:-python}"

BLACK_BUILD="$PYTHON_EXECUTABLE -m black"
BLACK_VERSION="19.10b0"
BLACK_VERSION="21.12b0"
$BLACK_BUILD 2> /dev/null
if [ $? -ne 0 ]; then
echo "The '$BLACK_BUILD' command was not found. Please install Black, for example, via 'pip install black==$BLACK_VERSION'."
Expand Down
2 changes: 1 addition & 1 deletion python/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mypy==0.812
flake8

# Code formatter. Only support Python 3.6+
black==19.10b0
black==21.12b0

# Test
pytest
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
packages=[
'delta_sharing',
],
python_requires='>=3.6,<3.10',
python_requires='>=3.6',
install_requires=[
'pandas',
'pyarrow>=4.0.0',
Expand Down

0 comments on commit eed5241

Please sign in to comment.