[WIP] ncmec: store checkpoint occasionally when start, end diff is one second #1731
@@ -39,14 +39,16 @@ class NCMECCheckpoint(

     # The biggest value of "to", and the next "from"
     get_entries_max_ts: int
+    next_fetch: str
+    last_fetch_time: int
this is unused currently, but I know we talked about invalidating checkpoints older than 24hr. happy to add that! just wanted to get confirmation that the checkpointing piece is on the right track
928ddce to 4f12e50
2965a46 to 5270515
5270515 to d7f207e
Overall looking good, thanks for making this change, and I think it will help a lot!
I am slightly suspicious that the paging URLs can go sour (e.g. I have noticed that the NCMEC API tends to throw exceptions near the very end of the paging list, which makes me think the URLs are being invalidated), so I think adding the time-based invalidation logic is a requirement.
As part of your test plan, can you also attempt fetching past an extremely dense time segment in the NCMEC API and confirm the behavior works as expected?
class NCMECCheckpointWithoutNext(FetchCheckpointBase):
    """
    0.99.x => 1.0.0

    get_entries_max_ts: int =>
        get_entries_max_ts: int
        next_fetch: str
        last_fetch_time: int
    """

    get_entries_max_ts: int
Thanks for adding this test case!
@@ -39,14 +39,16 @@ class NCMECCheckpoint(

     # The biggest value of "to", and the next "from"
     get_entries_max_ts: int
+    next_fetch: t.Optional[str] = ""
blocking: Hmm, not sold on this name. NCMEC calls these "paging urls" so maybe paging_url?
blocking: Add a comment explaining what this variable represents.
nit: Since the default value is empty string, suggest making this non-Optional and just using that.
updates.extend(entry.updates)

if i % 100 == 0:
blocking: by changing this from elif to if, I think it will now print the large update warning on every update, which is incorrect, no?
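A minimal sketch of the concern, assuming the loop has the shape visible in the hunks (`if i == 0:` for the first batch, then a check on `i % 100 == 0`). The function name and the `chained` flag are made up for illustration. Because `0 % 100 == 0`, an unchained `if` also fires on the first batch of every fetch:

```python
from typing import List


def large_fetch_warnings(num_batches: int, chained: bool) -> List[int]:
    """Return the batch indices at which the 'large fetch' warning would fire."""
    fired = []
    for i in range(num_batches):
        if chained:
            # Shape before the change: `if i == 0: ... elif i % 100 == 0: ...`
            if i == 0:
                pass  # first batch: the overfetch check would run here
            elif i % 100 == 0:
                fired.append(i)
        else:
            # Shape after the change: two independent `if` statements
            if i == 0:
                pass  # first batch: the overfetch check would run here
            if i % 100 == 0:
                fired.append(i)
    return fired
```

With `chained=True` a short fetch logs nothing, while with `chained=False` even a single-batch fetch logs the warning at index 0.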
     def get_progress_timestamp(self) -> t.Optional[int]:
         return self.get_entries_max_ts

     @classmethod
     def from_ncmec_fetch(cls, response: api.GetEntriesResponse) -> "NCMECCheckpoint":
         """Synthesizes a checkpoint from the API response"""
-        return cls(response.max_timestamp)
+        return cls(response.max_timestamp, response.next, int(time.time()))
blocking: NCMEC's documentation asks us not to store these checkpoints long term, so let's respect that in this implementation.
One way we could handle this is not to directly access this variable, but wrap it with a helper function that will return empty string if it's too old.
    def get_resume_url_if_recent(self):
        if time.time() - self.last_fetch_time < SOME_CONSTANT_MAYBE_12_HOURS:
            return self.next_fetch
        return ""
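A self-contained version of that helper could look roughly like the sketch below. `CheckpointSketch`, `PAGING_URL_MAX_AGE_SEC`, the `paging_url` field name, and the injectable `now` parameter are all assumptions for illustration and not the PR's actual code; the 12 hour cutoff is only the "maybe" value floated above.

```python
import time
from dataclasses import dataclass
from typing import Optional

# Hypothetical cutoff; the review only suggests "maybe 12 hours".
PAGING_URL_MAX_AGE_SEC = 12 * 60 * 60


@dataclass
class CheckpointSketch:
    # The biggest value of "to", and the next "from"
    get_entries_max_ts: int
    # Paging URL to resume from; "" when we are not in the middle of paging
    paging_url: str = ""
    # Unix time at which paging_url was obtained; 0 when paging_url is unset
    last_fetch_time: int = 0

    def get_paging_url_if_recent(self, now: Optional[float] = None) -> str:
        """Return the stored paging URL, or "" if it is unset or too old."""
        if now is None:
            now = time.time()
        if self.paging_url and now - self.last_fetch_time < PAGING_URL_MAX_AGE_SEC:
            return self.paging_url
        return ""
```

Passing `now` explicitly keeps the expiry logic testable without patching the clock; callers would normally omit it.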
@@ -39,14 +39,16 @@ class NCMECCheckpoint(

     # The biggest value of "to", and the next "from"
     get_entries_max_ts: int
+    next_fetch: t.Optional[str] = ""
+    last_fetch_time: t.Optional[int] = 0
blocking: Add a comment on the relationship between this variable and the paging URL, explaining why we save it.
ignorable: We could also make this variable explicitly mention the paging_url and not set it if we are not paging
@dataclass
class NCMECCheckpointWithoutNext(FetchCheckpointBase):
    """
    0.99.x => 1.0.0
blocking: This version isn't quite correct, the current version is 1.2.3
@@ -565,7 +565,7 @@ def get_entries(
         )

     def get_entries_iter(
-        self, *, start_timestamp: int = 0, end_timestamp: int = 0
+        self, *, start_timestamp: int = 0, end_timestamp: int = 0, next_: str = ""
blocking: next_ is a bit cryptic, perhaps we can do checkpointed_paging_url and then do next_ = checkpointed_paging_url
log(f"large fetch ({i}), up to {len(updates)}")
updates.extend(entry.updates)
# so store the checkpoint occasionally
log(f"large fetch ({i}), up to {len(updates)}. storing checkpoint")
nit: You don't actually store the checkpoint by yielding, technically the caller can decide whether to keep calling or store.
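To make the distinction concrete, here is a small sketch in which the generator only yields and the caller chooses when to persist. `fetch_pages`, `consume`, and `store_every` are hypothetical names and not part of the PR:

```python
from typing import Iterator, List, Tuple


def fetch_pages(pages: List[List[str]]) -> Iterator[Tuple[int, List[str]]]:
    """Yield (page_index, updates). Yielding alone persists nothing."""
    for i, page in enumerate(pages):
        yield i, page


def consume(pages: List[List[str]], store_every: int) -> List[int]:
    """Caller-side policy: record a checkpoint after every `store_every` pages."""
    stored_at = []
    for i, _updates in fetch_pages(pages):
        if (i + 1) % store_every == 0:
            stored_at.append(i)  # stand-in for actually writing a checkpoint
    return stored_at
```

The same generator supports a caller that stores after every page, every Nth page, or never, which is why the log message arguably should not claim a checkpoint was stored.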
-start_timestamp=current_start, end_timestamp=current_end
+start_timestamp=current_start,
+end_timestamp=current_end,
+next_=current_next_fetch,
blocking: Danger! It's actually very easy to mess up this argument and accidentally trigger an endless loop. It may be that you have done so in the current code, but it's hard to tell.
The only time current_next_fetch should be populated is when you are resuming from checkpoint, and you need to explicitly disable the overfetch check (L290) then.
There might be a refactoring of this code that makes this easier, or now that we are switching over to the next pointer version we can get rid of the probing behavior, which simplifies the implementation quite a bit.
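One possible shape for such a refactoring is sketched below. Everything here is an assumption for illustration: `fetch_range`, `fetch_page`, and `overfetch_limit` are invented names, and the overfetch test is a simple stand-in (first page larger than a limit) rather than the real estimation logic. The two properties it tries to show are that the overfetch check is skipped when resuming from a paging URL, and that a repeated paging URL is detected instead of looping forever.

```python
from typing import Callable, List, Set, Tuple

# (updates, next paging URL or "" when there are no more pages)
Page = Tuple[List[str], str]


def fetch_range(
    fetch_page: Callable[[str], Page],
    start_url: str,
    resume_url: str = "",
    overfetch_limit: int = 2,
) -> Tuple[List[str], bool]:
    """Return (updates, needs_narrowing) for one time range."""
    resuming = bool(resume_url)
    url = resume_url or start_url
    seen: Set[str] = set()
    updates: List[str] = []
    i = 0
    while url:
        if url in seen:
            raise RuntimeError("paging loop detected")
        seen.add(url)
        page_updates, next_url = fetch_page(url)
        # Only probe for overfetch on a fresh range, never when resuming.
        if i == 0 and not resuming and len(page_updates) > overfetch_limit:
            return [], True  # caller should shrink the range and retry
        updates.extend(page_updates)
        url = next_url
        i += 1
    return updates, False
```

Dropping the probing behavior entirely, as suggested above, would remove the `needs_narrowing` branch and leave only the paging loop.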
-        start_timestamp=current_start, end_timestamp=current_end
+        start_timestamp=current_start,
+        end_timestamp=current_end,
+        next_=current_next_fetch,
     )
 ):
     if i == 0:  # First batch, check for overfetch
As a comment, it turns out my implementation for estimation of the entries in range was completely off, and so this is basically always overly cautious. Not sure what to do about it, since the alternatives that I can think of are complicated.
@@ -240,15 +242,18 @@ def fetch_iter(
         the cursor
         """
         start_time = 0
+        next_fetch = ""
nit: just call this variable current_next_fetch - it's unconditionally written at 256.
last_fetch_time=int(time.time()),
),
)
current_next_fetch = entry.next
nit: This value is never read, as it is always unconditionally replaced on L359
Summary
sometimes ncmec fails to make progress after hitting a second w/ a large number of results: #1679. when that happens (diff of end and start is a second and we have lots of data), store checkpoints occasionally via a next pointer
Test Plan
confirm that next pointer is used when fetching hashes locally