Skip to content

Commit

Permalink
Make pending_jobs fetch jobs by job types (#1847)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangch079 authored Oct 27, 2023
1 parent b49e383 commit 436e855
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
7 changes: 6 additions & 1 deletion connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,11 @@ async def create(self, connector, trigger_method, job_type):
}
await self.index(job_def)

async def pending_jobs(self, connector_ids):
async def pending_jobs(self, connector_ids, job_types):
if not job_types:
return
if not isinstance(job_types, list):
job_types = [str(job_types)]
query = {
"bool": {
"must": [
Expand All @@ -990,6 +994,7 @@ async def pending_jobs(self, connector_ids):
}
},
{"terms": {"connector.id": connector_ids}},
{"terms": {"job_type": job_types}},
]
}
}
Expand Down
7 changes: 6 additions & 1 deletion connectors/services/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,12 @@ async def _run(self):
)
else:
async for sync_job in self.sync_job_index.pending_jobs(
connector_ids=supported_connector_ids
connector_ids=supported_connector_ids,
job_types=[
JobType.FULL.value,
JobType.INCREMENTAL.value,
JobType.ACCESS_CONTROL.value,
],
):
await self._sync(sync_job)
except Exception as e:
Expand Down
33 changes: 28 additions & 5 deletions tests/protocol/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -1952,8 +1952,23 @@ async def test_create_jobs_with_correct_target_index(


@pytest.mark.asyncio
@pytest.mark.parametrize(
"job_types, job_type_query, remote_call",
[
(None, None, False),
("", None, False),
(JobType.ACCESS_CONTROL.value, [JobType.ACCESS_CONTROL.value], True),
(
[JobType.FULL.value, JobType.INCREMENTAL.value],
[JobType.FULL.value, JobType.INCREMENTAL.value],
True,
),
],
)
@patch("connectors.protocol.SyncJobIndex.get_all_docs")
async def test_pending_jobs(get_all_docs, set_env):
async def test_pending_jobs(
get_all_docs, job_types, job_type_query, remote_call, set_env
):
job = Mock()
get_all_docs.return_value = AsyncIterator([job])
config = load_config(CONFIG)
Expand All @@ -1970,19 +1985,27 @@ async def test_pending_jobs(get_all_docs, set_env):
}
},
{"terms": {"connector.id": connector_ids}},
{"terms": {"job_type": job_type_query}},
]
}
}
expected_sort = [{"created_at": Sort.ASC.value}]

sync_job_index = SyncJobIndex(elastic_config=config["elasticsearch"])
jobs = [
job async for job in sync_job_index.pending_jobs(connector_ids=connector_ids)
job
async for job in sync_job_index.pending_jobs(
connector_ids=connector_ids, job_types=job_types
)
]

get_all_docs.assert_called_with(query=expected_query, sort=expected_sort)
assert len(jobs) == 1
assert jobs[0] == job
if remote_call:
get_all_docs.assert_called_with(query=expected_query, sort=expected_sort)
assert len(jobs) == 1
assert jobs[0] == job
else:
get_all_docs.assert_not_called()
assert len(jobs) == 0


@pytest.mark.asyncio
Expand Down

0 comments on commit 436e855

Please sign in to comment.