Implement custom storage for orgs #2093

Open
wants to merge 64 commits into main
Commits (64)
8388ce2
Add back custom storage endpoints
tw4l Sep 20, 2024
5fbdca1
Flush out tests for setting custom storage
tw4l Sep 20, 2024
fb18134
Fix test issue with bucket not existing for now
tw4l Sep 20, 2024
0302275
Add additional tests
tw4l Sep 23, 2024
19850ba
Fix custom storage so it works as expected
tw4l Sep 24, 2024
c78b5b8
Actually unset custom replica storage before deleting
tw4l Sep 24, 2024
d5fd8e1
Add TODO where custom storage deletion is failing
tw4l Sep 24, 2024
1dcd39b
Fix check for whether storage label is in use
tw4l Sep 24, 2024
1a1cb30
Remove todo on endpoint that's fine
tw4l Sep 24, 2024
7558ac3
Add todos re: tasks necessary to change storage
tw4l Sep 24, 2024
46e0fc9
Check that no crawls are running before updating storage
tw4l Sep 25, 2024
791ca6e
Start adding post-storage update logic
tw4l Sep 25, 2024
b899ecf
WIP: Add background job to copy old s3 bucket to new
tw4l Sep 25, 2024
21070d6
WIP: Start adding logic to handle replica location updates
tw4l Sep 25, 2024
7b6a917
Add additional note
tw4l Sep 25, 2024
3b3116f
Fix argument
tw4l Sep 25, 2024
9076555
Fix another argument
tw4l Sep 25, 2024
853b840
Fixups
tw4l Sep 25, 2024
3a4b43c
Fix linting
tw4l Sep 25, 2024
048a563
More linting fixes
tw4l Sep 25, 2024
e3c41b9
Refactor, seperate storage and replicas updates
tw4l Sep 26, 2024
34310de
More refactoring
tw4l Sep 26, 2024
80cc5d4
Make post-update task methods private
tw4l Sep 26, 2024
4112044
Check if any bg jobs running before changing storage
tw4l Sep 26, 2024
287fae9
Check bg job finished as well
tw4l Sep 26, 2024
5096005
Fixups
tw4l Sep 26, 2024
7d7fd36
Storage update improvements
tw4l Sep 26, 2024
cb461f6
Fixup
tw4l Sep 26, 2024
6e3f3ef
Remove TODO
tw4l Sep 26, 2024
cf40245
Remove another todo
tw4l Sep 26, 2024
3de96a8
More fixups
tw4l Sep 26, 2024
db65dd8
Add provider to s3storage for rclone
tw4l Sep 26, 2024
89585fa
Fix typo
tw4l Sep 26, 2024
8abe1ef
Make API endpoints that change storage superuser-only for now
tw4l Sep 30, 2024
c146ab3
Add typing for init_storages_api, import Callable
tw4l Sep 30, 2024
abc238e
Add missing User import
tw4l Sep 30, 2024
29a54b7
Fix StorageOps in operator main
tw4l Oct 1, 2024
f348a16
Always use oid prefix in s3 storage
tw4l Oct 1, 2024
e67868b
Post-rebase fixups and remove create bucket fallback
tw4l Oct 10, 2024
e2dfa93
Create extra test buckets in CI
tw4l Oct 15, 2024
e054158
Add test for non-verified custom storage
tw4l Oct 15, 2024
f0dc334
Refactor to move updates to FastAPI background tasks
tw4l Oct 15, 2024
0d3aa35
Include default replicas in /storage response if no org replicas
tw4l Oct 15, 2024
7a8ec58
Fix unsetting of presigned URLs
tw4l Oct 16, 2024
6277331
Add --progress flag to rclone copy command
tw4l Oct 16, 2024
1eb533c
Increase ttl seconds after finished for testing on dev
tw4l Oct 17, 2024
be10a9a
Ensure there are no double slashes between bucket name and oid
tw4l Oct 17, 2024
e028dc8
Increase memory limit/request for copy job to 500Mi
tw4l Oct 17, 2024
edcd5c6
Reduce copy job ttlSecondsAfterFinished to 60
tw4l Oct 17, 2024
d676113
Add storage tag to API endpoints
tw4l Oct 17, 2024
ef59178
Add flags to rclone to reduce memory usage, set limit to 350Mi
tw4l Oct 17, 2024
07490fb
Fix positional operator in storage ref update
tw4l Oct 17, 2024
4f1aa1d
One more positional operator fix
tw4l Oct 17, 2024
5d81ed2
Update docstrings and comments
tw4l Oct 17, 2024
b095023
Make all-storages response valid JSON with response model
tw4l Oct 17, 2024
085209b
Add admin docs for storage
tw4l Oct 17, 2024
76285ef
Fix API endpoint path in docs example
tw4l Oct 17, 2024
41d4f79
Docs typo fix
tw4l Oct 17, 2024
db12ef9
Add provider field note
tw4l Oct 17, 2024
5ca80f0
Docs language cleanup
tw4l Oct 17, 2024
af0d966
Check /all-storages in backend tests
tw4l Oct 17, 2024
b461113
Add API endpoint for background job progress
tw4l Oct 18, 2024
60582c1
Fix linting
tw4l Oct 18, 2024
c889664
Format post-rebase with Black
tw4l Dec 3, 2024
5 changes: 5 additions & 0 deletions .github/workflows/k3d-ci.yaml
@@ -91,6 +91,11 @@ jobs:
- name: Wait for all pods to be ready
run: kubectl wait --for=condition=ready pod --all --timeout=240s

- name: Create Extra Test Buckets
run: |
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary &&
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica

- name: Run Tests
timeout-minutes: 30
run: pytest -vv ./backend/test/test_*.py
5 changes: 5 additions & 0 deletions .github/workflows/microk8s-ci.yaml
@@ -66,6 +66,11 @@ jobs:
- name: Wait for all pods to be ready
run: sudo microk8s kubectl wait --for=condition=ready pod --all --timeout=240s

- name: Create Extra Test Buckets
run: |
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-primary &&
kubectl exec -i deployment/local-minio -c minio -- mkdir /data/custom-replica

- name: Run Tests
run: pytest -vv ./backend/test/test_*.py

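Both workflow additions above create the extra test buckets by making directories directly inside the local MinIO pod's `/data` volume, which the local deployment exposes as buckets; since the "remove create bucket fallback" commit, the custom-storage tests expect these buckets to already exist. A quick sanity check over the S3 API might look like the sketch below — the endpoint and credentials are placeholders for whatever the local test deployment actually uses:

```python
import boto3

# Placeholder endpoint/credentials - substitute the values from the
# local MinIO deployment used by the CI charts.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:30090",
    aws_access_key_id="ADMIN",
    aws_secret_access_key="PASSW0RD",
)

# Both extra buckets should be visible alongside the default ones.
buckets = {b["Name"] for b in s3.list_buckets()["Buckets"]}
assert {"custom-primary", "custom-replica"} <= buckets
```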
180 changes: 169 additions & 11 deletions backend/btrixcloud/background_jobs.py
@@ -16,17 +16,18 @@
from .models import (
BaseFile,
Organization,
BackgroundJob,
BgJobType,
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
CopyBucketJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
User,
SuccessResponse,
JobProgress,
)
from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .utils import dt_now
@@ -51,7 +52,7 @@ class BackgroundJobOps:
base_crawl_ops: BaseCrawlOps
profile_ops: ProfileOps

# pylint: disable=too-many-locals, too-many-arguments, invalid-name
# pylint: disable=too-many-locals, too-many-arguments, too-many-positional-arguments, invalid-name

def __init__(self, mdb, email, user_manager, org_ops, crawl_manager, storage_ops):
self.jobs = mdb["jobs"]
@@ -295,14 +296,18 @@ async def create_delete_org_job(
self,
org: Organization,
existing_job_id: Optional[str] = None,
) -> Optional[str]:
) -> str:
"""Create background job to delete org and its data"""

job_type = BgJobType.DELETE_ORG.value

try:
job_id = await self.crawl_manager.run_delete_org_job(
oid=str(org.id),
job_type=job_type,
backend_image=os.environ.get("BACKEND_IMAGE", ""),
pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
job_id_prefix=f"{job_type}-{org.id}",
existing_job_id=existing_job_id,
)
if existing_job_id:
Expand Down Expand Up @@ -334,7 +339,7 @@ async def create_delete_org_job(
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: delete org job could not be started: {exc}")
return None
return ""

async def create_recalculate_org_stats_job(
self,
Expand Down Expand Up @@ -381,6 +386,73 @@ async def create_recalculate_org_stats_job(
print(f"warning: recalculate org stats job could not be started: {exc}")
return None

async def create_copy_bucket_job(
self,
org: Organization,
prev_storage_ref: StorageRef,
new_storage_ref: StorageRef,
existing_job_id: Optional[str] = None,
) -> str:
"""Start background job to copy entire s3 bucket and return job id"""
prev_storage = self.storage_ops.get_org_storage_by_ref(org, prev_storage_ref)
prev_endpoint, prev_bucket = self.strip_bucket(prev_storage.endpoint_url)

new_storage = self.storage_ops.get_org_storage_by_ref(org, new_storage_ref)
new_endpoint, new_bucket = self.strip_bucket(new_storage.endpoint_url)

# Ensure buckets terminate with trailing slash
prev_bucket = os.path.join(prev_bucket, "")
new_bucket = os.path.join(new_bucket, "")

job_type = BgJobType.COPY_BUCKET.value

try:
job_id = await self.crawl_manager.run_copy_bucket_job(
oid=str(org.id),
job_type=job_type,
prev_storage=prev_storage_ref,
prev_endpoint=prev_endpoint,
prev_bucket=prev_bucket,
new_storage=new_storage_ref,
new_endpoint=new_endpoint,
new_bucket=new_bucket,
job_id_prefix=f"{job_type}-{org.id}",
existing_job_id=existing_job_id,
)
if existing_job_id:
copy_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": copy_job.started,
"finished": copy_job.finished,
}
if copy_job.previousAttempts:
copy_job.previousAttempts.append(previous_attempt)
else:
copy_job.previousAttempts = [previous_attempt]
copy_job.started = dt_now()
copy_job.finished = None
copy_job.success = None
else:
copy_job = CopyBucketJob(
id=job_id,
oid=org.id,
started=dt_now(),
prev_storage=prev_storage_ref,
new_storage=new_storage_ref,
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": copy_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
print(
f"warning: copy bucket job could not be started for org {org.id}: {exc}"
)
return ""

async def job_finished(
self,
job_id: str,
@@ -406,6 +478,9 @@ async def job_finished(
await self.handle_delete_replica_job_finished(
cast(DeleteReplicaJob, job)
)
if job_type == BgJobType.COPY_BUCKET:
org = await self.org_ops.get_org_by_id(oid)
await self.org_ops.update_read_only(org, False)
else:
print(
f"Background job {job.id} failed, sending email to superuser",
@@ -430,7 +505,11 @@ async def job_finished(
async def get_background_job(
self, job_id: str, oid: Optional[UUID] = None
) -> Union[
CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob, RecalculateOrgStatsJob
CreateReplicaJob,
DeleteReplicaJob,
CopyBucketJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id}
@@ -445,27 +524,78 @@ async def get_background_job(

def _get_job_by_type_from_data(self, data: dict[str, object]):
"""convert dict to propert background job type"""
if data["type"] == BgJobType.CREATE_REPLICA:
if data["type"] == BgJobType.CREATE_REPLICA.value:
return CreateReplicaJob.from_dict(data)

if data["type"] == BgJobType.DELETE_REPLICA:
if data["type"] == BgJobType.DELETE_REPLICA.value:
return DeleteReplicaJob.from_dict(data)

if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
if data["type"] == BgJobType.RECALCULATE_ORG_STATS.value:
return RecalculateOrgStatsJob.from_dict(data)

if data["type"] == BgJobType.COPY_BUCKET.value:
return CopyBucketJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

async def get_job_progress(self, job_id: str) -> JobProgress:
"""Return progress of background job for supported types"""
job = await self.get_background_job(job_id)

if job.type != BgJobType.COPY_BUCKET:
raise HTTPException(status_code=403, detail="job_type_not_supported")

if job.success is False:
raise HTTPException(status_code=400, detail="job_failed")

if job.finished:
return JobProgress(percentage=1.0)

log_tail = await self.crawl_manager.tail_background_job(job_id)
if not log_tail:
raise HTTPException(status_code=400, detail="job_log_not_available")

lines = log_tail.splitlines()
reversed_lines = list(reversed(lines))

progress = JobProgress(percentage=0.0)

# Parse lines in reverse order until we find one with latest stats
for line in reversed_lines:
try:
if "ETA" not in line:
continue

stats_groups = line.split(",")
for group in stats_groups:
group = group.strip()
if "%" in group:
progress.percentage = float(group.strip("%")) / 100
if "ETA" in group:
eta_str = group.strip("ETA ")
# Split on white space to remove byte mark rclone sometimes
# adds to end of stats line
eta_list = eta_str.split(" ")
progress.eta = eta_list[0]

break
# pylint: disable=bare-except
except:
continue

return progress
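The loop above walks the tail of rclone's log backwards and pulls the percentage and ETA out of the most recent stats line. As a standalone sketch of that extraction — the sample line is illustrative, not captured from a real job, and rclone's exact formatting can vary between versions:

```python
# Illustrative rclone stats line of the kind emitted with --progress.
line = "Transferred:   646.251 MiB / 1.204 GiB, 52%, 10.5 MiB/s, ETA 1m2s"

percentage, eta = 0.0, None
for group in line.split(","):
    group = group.strip()
    if "%" in group:
        percentage = float(group.strip("%")) / 100  # "52%" -> 0.52
    if "ETA" in group:
        # Strip the "ETA " prefix, then split on whitespace in case a stray
        # byte mark trails the value (as the comment in the code above notes).
        eta = group.strip("ETA ").split(" ")[0]     # -> "1m2s"

print(percentage, eta)  # 0.52 1m2s
```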

async def list_background_jobs(
self,
org: Organization,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
running: Optional[bool] = None,
job_type: Optional[str] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[int] = -1,
) -> Tuple[List[BackgroundJob], int]:
) -> Tuple[List[Union[CreateReplicaJob, DeleteReplicaJob, CopyBucketJob]], int]:
"""List all background jobs"""
# pylint: disable=duplicate-code
# Zero-index page for query
@@ -477,6 +607,12 @@ async def list_background_jobs(
if success in (True, False):
query["success"] = success

if running:
query["success"] = None

if running is False:
query["success"] = {"$in": [True, False]}

if job_type:
query["type"] = job_type

Expand Down Expand Up @@ -595,6 +731,14 @@ async def retry_background_job(
existing_job_id=job_id,
)

if job.type == BgJobType.COPY_BUCKET:
await self.create_copy_bucket_job(
org,
job.prev_storage,
job.new_storage,
existing_job_id=job_id,
)

return {"success": True}

async def retry_failed_background_jobs(
Expand Down Expand Up @@ -630,7 +774,7 @@ async def retry_all_failed_background_jobs(


# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme
# pylint: disable=too-many-arguments, too-many-locals, invalid-name, fixme, too-many-positional-arguments
def init_background_jobs_api(
app, mdb, email, user_manager, org_ops, crawl_manager, storage_ops, user_dep
):
Expand All @@ -657,7 +801,19 @@ async def get_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@app.get("/orgs/all/jobs/{job_id}", response_model=SuccessResponse, tags=["jobs"])
@router.get(
"/{job_id}/progress",
response_model=JobProgress,
)
async def get_job_progress(
job_id: str,
# pylint: disable=unused-argument
org: Organization = Depends(org_crawl_dep),
):
"""Return progress information for background job"""
return await ops.get_job_progress(job_id)
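Assuming the router here is mounted under the per-org jobs prefix like the surrounding endpoints (the prefix itself is not shown in this diff), polling the new progress endpoint from a client could look roughly like this; the URL, ids, and token are placeholders:

```python
import requests

API = "https://btrix.example.com/api"          # placeholder deployment URL
OID = "c69247f6-12aa-4d06-93be-2d56ddd61baa"   # placeholder org id
JOB_ID = "copy-bucket-c69247f6-xxxxx"          # placeholder job id

resp = requests.get(
    f"{API}/orgs/{OID}/jobs/{JOB_ID}/progress",
    headers={"Authorization": "Bearer <access token>"},
)
resp.raise_for_status()

# JobProgress: percentage in [0, 1], plus an rclone-style ETA string while
# the copy is still running (1.0 and no ETA once the job has finished).
print(resp.json())  # e.g. {"percentage": 0.52, "eta": "1m2s"}
```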

@app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
if not user.is_superuser:
Expand Down Expand Up @@ -696,6 +852,7 @@ async def list_background_jobs(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
success: Optional[bool] = None,
running: Optional[bool] = None,
jobType: Optional[str] = None,
sortBy: Optional[str] = None,
sortDirection: Optional[int] = -1,
@@ -706,6 +863,7 @@ async def list_background_jobs(
page_size=pageSize,
page=page,
success=success,
running=running,
job_type=jobType,
sort_by=sortBy,
sort_direction=sortDirection,
2 changes: 1 addition & 1 deletion backend/btrixcloud/basecrawls.py
@@ -54,7 +54,7 @@


# ============================================================================
# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines
# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines, too-many-positional-arguments
class BaseCrawlOps:
"""operations that apply to all crawls"""

1 change: 1 addition & 0 deletions backend/btrixcloud/colls.py
@@ -43,6 +43,7 @@


# ============================================================================
# pylint: disable=too-many-positional-arguments
class CollectionOps:
"""ops for working with named collections of crawls"""

4 changes: 2 additions & 2 deletions backend/btrixcloud/crawlconfigs.py
@@ -73,7 +73,7 @@
class CrawlConfigOps:
"""Crawl Config Operations"""

# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods
# pylint: disable=too-many-arguments, too-many-instance-attributes, too-many-public-methods, too-many-positional-arguments

user_manager: UserManager
org_ops: OrgOps
@@ -1081,7 +1081,7 @@ async def stats_recompute_all(crawl_configs, crawls, cid: UUID):


# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments,too-many-positional-arguments
def init_crawl_config_api(
app,
dbclient,