Commit bcd8887

Merge branch main into colin/par-eval-expr

colin-ho committed Jan 21, 2025
2 parents ec3702b + bae106c
Showing 221 changed files with 8,897 additions and 6,097 deletions.
37 changes: 0 additions & 37 deletions .github/ci-scripts/format_env_vars.py

This file was deleted.

33 changes: 19 additions & 14 deletions .github/ci-scripts/job_runner.py
@@ -15,6 +15,10 @@
 
 from ray.job_submission import JobStatus, JobSubmissionClient
 
+# We impose a 5min timeout here
+# If any job does *not* finish in 5min, then we cancel it and mark the question as a "DNF" (did-not-finish).
+TIMEOUT_S = 60 * 5
+
 
 def parse_env_var_str(env_var_str: str) -> dict:
     iter = map(
@@ -29,13 +33,17 @@ async def print_logs(logs):
         print(lines, end="")
 
 
-async def wait_on_job(logs, timeout_s):
-    await asyncio.wait_for(print_logs(logs), timeout=timeout_s)
+async def wait_on_job(logs, timeout_s) -> bool:
+    try:
+        await asyncio.wait_for(print_logs(logs), timeout=timeout_s)
+        return False
+    except asyncio.exceptions.TimeoutError:
+        return True
 
 
 @dataclass
 class Result:
-    query: int
+    arguments: str
     duration: timedelta
     error_msg: Optional[str]
 
@@ -45,16 +53,13 @@ def submit_job(
     entrypoint_script: str,
     entrypoint_args: str,
     env_vars: str,
-    enable_ray_tracing: bool,
 ):
     if "GHA_OUTPUT_DIR" not in os.environ:
         raise RuntimeError("Output directory environment variable not found; don't know where to store outputs")
     output_dir = Path(os.environ["GHA_OUTPUT_DIR"])
     output_dir.mkdir(exist_ok=True, parents=True)
 
     env_vars_dict = parse_env_var_str(env_vars)
-    if enable_ray_tracing:
-        env_vars_dict["DAFT_ENABLE_RAY_TRACING"] = "1"
 
     client = JobSubmissionClient(address="http://localhost:8265")
 
@@ -66,7 +71,7 @@ def submit_job(
 
     results = []
 
-    for index, args in enumerate(list_of_entrypoint_args):
+    for args in list_of_entrypoint_args:
         entrypoint = f"DAFT_RUNNER=ray python {entrypoint_script} {args}"
         print(f"{entrypoint=}")
         start = datetime.now()
@@ -78,18 +83,20 @@ def submit_job(
             },
         )
 
-        asyncio.run(wait_on_job(client.tail_job_logs(job_id), timeout_s=60 * 30))
+        timed_out = asyncio.run(wait_on_job(client.tail_job_logs(job_id), timeout_s=TIMEOUT_S))
 
         status = client.get_job_status(job_id)
         assert status.is_terminal(), "Job should have terminated"
         end = datetime.now()
         duration = end - start
         error_msg = None
         if status != JobStatus.SUCCEEDED:
-            job_info = client.get_job_info(job_id)
-            error_msg = job_info.message
+            if timed_out:
+                error_msg = f"Job exceeded {TIMEOUT_S} second(s)"
+            else:
+                job_info = client.get_job_info(job_id)
+                error_msg = job_info.message
 
-        result = Result(query=index, duration=duration, error_msg=error_msg)
+        result = Result(arguments=args, duration=duration, error_msg=error_msg)
         results.append(result)
 
         output_file = output_dir / "out.csv"
@@ -106,7 +113,6 @@ def submit_job(
     parser.add_argument("--entrypoint-script", type=str, required=True)
    parser.add_argument("--entrypoint-args", type=str, required=True)
    parser.add_argument("--env-vars", type=str, required=True)
-   parser.add_argument("--enable-ray-tracing", action="store_true")
 
    args = parser.parse_args()

@@ -122,5 +128,4 @@ def submit_job(
         entrypoint_script=args.entrypoint_script,
         entrypoint_args=args.entrypoint_args,
         env_vars=args.env_vars,
-        enable_ray_tracing=args.enable_ray_tracing,
     )
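
The heart of this change is the DNF pattern: tail the job's logs under asyncio.wait_for and treat a timeout as "did not finish" rather than as a job failure. A minimal, self-contained sketch of that pattern follows; fake_logs is our stand-in for Ray's client.tail_job_logs, and the 2-second deadline is chosen only so the sketch finishes quickly:

import asyncio

TIMEOUT_S = 60 * 5  # jobs running longer than 5 minutes are marked DNF


async def fake_logs():
    # Stand-in for client.tail_job_logs(job_id): an async iterator of log lines.
    while True:
        await asyncio.sleep(1)
        yield "still running...\n"


async def print_logs(logs):
    async for lines in logs:
        print(lines, end="")


async def wait_on_job(logs, timeout_s) -> bool:
    # Return True if the job timed out (DNF), False if the log stream ended.
    try:
        await asyncio.wait_for(print_logs(logs), timeout=timeout_s)
        return False
    except asyncio.TimeoutError:
        return True


if __name__ == "__main__":
    # The real script passes TIMEOUT_S; 2s here keeps the demo short.
    print(f"timed_out={asyncio.run(wait_on_job(fake_logs(), timeout_s=2))}")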
16 changes: 15 additions & 1 deletion .github/ci-scripts/templatize_ray_config.py
@@ -60,6 +60,20 @@ class Metadata(BaseModel, extra="allow"):
             sudo chmod 777 /tmp
         fi""",
     ),
+    "benchmarking-arm": Profile(
+        instance_type="i8g.4xlarge",
+        image_id="ami-0d4eea77bb23270f4",
+        node_count=8,
+        ssh_user="ubuntu",
+        volume_mount=""" |
+            findmnt /tmp 1> /dev/null
+            code=$?
+            if [ $code -ne 0 ]; then
+                sudo mkfs.ext4 /dev/nvme0n1
+                sudo mount -t ext4 /dev/nvme0n1 /tmp
+                sudo chmod 777 /tmp
+            fi""",
+    ),
 }


@@ -71,7 +85,7 @@ class Metadata(BaseModel, extra="allow"):
     parser.add_argument("--daft-wheel-url")
     parser.add_argument("--daft-version")
     parser.add_argument("--python-version", required=True)
-    parser.add_argument("--cluster-profile", required=True, choices=["debug_xs-x86", "medium-x86"])
+    parser.add_argument("--cluster-profile", required=True, choices=["debug_xs-x86", "medium-x86", "benchmarking-arm"])
     parser.add_argument("--working-dir", required=True)
     parser.add_argument("--entrypoint-script", required=True)
     args = parser.parse_args()
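
For context, the new entry slots into the script's profile table and becomes selectable via --cluster-profile. A rough sketch of that lookup, with assumed names (Profile here is a stand-in dataclass and CLUSTER_PROFILES an assumed binding; the hunk shows only the entries, not the surrounding declarations):

from dataclasses import dataclass


@dataclass
class Profile:  # stand-in for the script's actual Profile model
    instance_type: str
    image_id: str
    node_count: int
    ssh_user: str
    volume_mount: str


CLUSTER_PROFILES = {
    "benchmarking-arm": Profile(
        instance_type="i8g.4xlarge",
        image_id="ami-0d4eea77bb23270f4",
        node_count=8,
        ssh_user="ubuntu",
        volume_mount="findmnt /tmp || (mkfs + mount)",  # abbreviated; full script above
    ),
}

# e.g. CLUSTER_PROFILES[args.cluster_profile]
profile = CLUSTER_PROFILES["benchmarking-arm"]
print(profile.instance_type, profile.node_count)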
12 changes: 6 additions & 6 deletions .github/workflows/run-cluster.yaml
@@ -20,6 +20,7 @@ on:
         description: Cluster profile
         type: choice
         options:
+        - benchmarking-arm
         - medium-x86
         - debug_xs-x86
         required: false
@@ -49,7 +50,7 @@ jobs:
     uses: ./.github/workflows/build-commit.yaml
     if: ${{ inputs.daft_version == '' && inputs.daft_wheel_url == '' }}
     with:
-      arch: x86
+      arch: ${{ (inputs.cluster_profile == 'debug_xs-x86' || inputs.cluster_profile == 'medium-x86') && 'x86' || 'arm' }}
       python_version: ${{ inputs.python_version }}
     secrets:
       ACTIONS_AWS_ROLE_ARN: ${{ secrets.ACTIONS_AWS_ROLE_ARN }}
@@ -131,13 +132,12 @@ jobs:
             --entrypoint-script='${{ inputs.entrypoint_script }}' \
             --entrypoint-args='${{ inputs.entrypoint_args }}' \
             --env-vars='${{ inputs.env_vars }}' \
-            --enable-ray-tracing
       - name: Download log files from ray cluster
         if: always()
         run: |
           source .venv/bin/activate
-          ray rsync-down .github/assets/ray.yaml /tmp/ray/session_*/logs ray-daft-logs
-          find ray-daft-logs -depth -name '*:*' -exec bash -c '
+          ray rsync-down .github/assets/ray.yaml /tmp/ray/session_*/logs ray-logs
+          find ray-logs -depth -name '*:*' -exec bash -c '
             for filepath; do
               dir=$(dirname "$filepath")
               base=$(basename "$filepath")
@@ -172,5 +172,5 @@ jobs:
         if: always()
         uses: actions/upload-artifact@v4
         with:
-          name: ray-daft-logs
-          path: ray-daft-logs
+          name: ray-logs
+          path: ray-logs
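
The arch line above uses GitHub Actions' `&& … || …` ternary idiom to derive the wheel architecture from the chosen cluster profile. A Python rendering of the same selection logic, for readers unfamiliar with the idiom (the function name is ours, not the workflow's):

def arch_for_profile(cluster_profile: str) -> str:
    # Mirrors the workflow expression: x86 profiles build x86 wheels;
    # anything else (currently only benchmarking-arm) builds arm.
    if cluster_profile in ("debug_xs-x86", "medium-x86"):
        return "x86"
    return "arm"


assert arch_for_profile("medium-x86") == "x86"
assert arch_for_profile("benchmarking-arm") == "arm"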
3 changes: 3 additions & 0 deletions .gitignore
@@ -43,3 +43,6 @@ log/
 
 # helix editor
 .helix
+
+# uv
+uv.lock
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -30,7 +30,8 @@ repos:
         (?x)^(
           tutorials/.*\.ipynb|
           docs/.*\.ipynb|
-          docs/source/user_guide/fotw/data/
+          docs/source/user_guide/fotw/data/|
+          .*\.jsonl
         )$
       args:
       - --autofix
1 change: 1 addition & 0 deletions CONTRIBUTING.md
@@ -36,6 +36,7 @@ To set up your development environment:
 1. `make build`: recompile your code after modifying any Rust code in `src/`
 2. `make test`: run tests
 3. `DAFT_RUNNER=ray make test`: set the runner to the Ray runner and run tests (DAFT_RUNNER defaults to `py`)
+4. `make docs`: build and serve docs
 
 ### Developing with Ray
 
27 changes: 23 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
@@ -31,6 +31,7 @@ daft-micropartition = {path = "src/daft-micropartition", default-features = fals
 daft-minhash = {path = "src/daft-minhash", default-features = false}
 daft-parquet = {path = "src/daft-parquet", default-features = false}
 daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
+daft-ray-execution = {path = "src/daft-ray-execution", default-features = false}
 daft-scan = {path = "src/daft-scan", default-features = false}
 daft-scheduler = {path = "src/daft-scheduler", default-features = false}
 daft-sql = {path = "src/daft-sql", default-features = false}
@@ -56,7 +57,6 @@ python = [
   "common-system-info/python",
   "daft-catalog-python-catalog/python",
   "daft-catalog/python",
-  "daft-connect/python",
   "daft-core/python",
   "daft-csv/python",
   "daft-dsl/python",
@@ -172,7 +172,8 @@ members = [
   "src/parquet2",
   # "src/spark-connect-script",
   "src/generated/spark-connect",
-  "src/common/partitioning"
+  "src/common/partitioning",
+  "src/daft-ray-execution"
 ]
 
 [workspace.dependencies]
@@ -200,6 +201,7 @@ daft-hash = {path = "src/daft-hash"}
 daft-local-execution = {path = "src/daft-local-execution"}
 daft-logical-plan = {path = "src/daft-logical-plan"}
 daft-micropartition = {path = "src/daft-micropartition"}
+daft-ray-execution = {path = "src/daft-ray-execution"}
 daft-scan = {path = "src/daft-scan"}
 daft-schema = {path = "src/daft-schema"}
 daft-sql = {path = "src/daft-sql"}
@@ -277,7 +279,7 @@ features = ['async']
 path = "src/parquet2"
 
 [workspace.dependencies.pyo3]
-features = ["extension-module", "multiple-pymethods", "abi3-py39", "indexmap"]
+features = ["extension-module", "multiple-pymethods", "abi3-py39", "indexmap", "chrono"]
 version = "0.23.3"
 
 [workspace.dependencies.pyo3-log]
4 changes: 4 additions & 0 deletions Makefile
@@ -70,6 +70,10 @@ test: .venv build ## Run tests
 dsdgen: .venv ## Generate TPC-DS data
 	$(VENV_BIN)/python benchmarking/tpcds/datagen.py --scale-factor=$(SCALE_FACTOR) --tpcds-gen-folder=$(OUTPUT_DIR)
 
+.PHONY: docs
+docs: .venv ## Serve docs
+	uv run --with-requirements requirements-docs.txt mkdocs serve
+
 .PHONY: clean
 clean:
 	rm -rf $(VENV)