Skip to content

Commit

Permalink
Limit ORM usage for ImapUids objects (#969)
Browse files Browse the repository at this point in the history
* imapuids_for_message_query

* message_imapuids_exists

* Remove uid_accessor

* Cleanup parameter

* Use more descriptive variable name
  • Loading branch information
squeaky-pl authored Nov 6, 2024
1 parent 6c67bc5 commit c66d909
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 29 deletions.
55 changes: 42 additions & 13 deletions inbox/mailsync/backends/imap/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import List, Set

from sqlalchemy import bindparam, desc
from sqlalchemy.orm import Session
from sqlalchemy.orm import Query, Session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql.expression import func

Expand Down Expand Up @@ -73,22 +73,33 @@ def lastseenuid(account_id, session, folder_id):
return res or 0


IMAPUID_PER_MESSAGE_SANITY_LIMIT = 100


def update_message_metadata(
session: Session, account: Account, message: Message, is_draft: bool
) -> None:
"""Update the message's metadata"""
# Sort imapuids in a way that the ones that were added later come last
now = datetime.utcnow()
sorted_imapuids: List[ImapUid] = sorted(
message.imapuids, key=lambda imapuid: imapuid.updated_at or now
# Sort imapuids in a way that the ones that were added later come first.
# There are non-conforming IMAP servers that can list the same message thousands of times
# in the same folder. This is a workaround to limit the memory pressure caused by such
# servers. The metadata is meaningless for such messages anyway.
latest_imapuids = (
imapuids_for_message_query(
account_id=account.id,
message_id=message.id,
only_latest=IMAPUID_PER_MESSAGE_SANITY_LIMIT,
)
.with_session(session)
.all()
)

message.is_read = any(imapuid.is_seen for imapuid in sorted_imapuids)
message.is_starred = any(imapuid.is_flagged for imapuid in sorted_imapuids)
message.is_read = any(imapuid.is_seen for imapuid in latest_imapuids)
message.is_starred = any(imapuid.is_flagged for imapuid in latest_imapuids)
message.is_draft = is_draft

sorted_categories: List[Category] = [
category for imapuid in sorted_imapuids for category in imapuid.categories
latest_categories: List[Category] = [
category for imapuid in latest_imapuids for category in imapuid.categories
]

categories: Set[Category]
Expand All @@ -101,9 +112,9 @@ def update_message_metadata(
# (and in turn one category) depending on the order they were returned
# from the database. This makes it deterministic and more-correct because a message
# is likely in a folder (and category) it was added to last.
categories = {sorted_categories[-1]} if sorted_categories else set()
categories = {latest_categories[0]} if latest_categories else set()
elif account.category_type == "label":
categories = set(sorted_categories)
categories = set(latest_categories)
else:
raise AssertionError("Unreachable")

Expand Down Expand Up @@ -198,6 +209,18 @@ def update_metadata(account_id, folder_id, folder_role, new_flags, session):
log.info("Updated UID metadata", changed=change_count, out_of=len(new_flags))


def imapuids_for_message_query(
*, account_id: int, message_id: int, only_latest: int | None = None
) -> Query:
query = Query([ImapUid]).filter(
ImapUid.account_id == account_id, ImapUid.message_id == message_id
)
if only_latest is not None:
query = query.order_by(ImapUid.updated_at.desc()).limit(only_latest)

return query


def remove_deleted_uids(account_id, folder_id, uids):
"""
Make sure you're holding a db write lock on the account. (We don't try
Expand Down Expand Up @@ -238,7 +261,13 @@ def remove_deleted_uids(account_id, folder_id, uids):
db_session.delete(imapuid)

if message is not None:
if not message.imapuids and message.is_draft:
message_imapuids_exist = db_session.query(
imapuids_for_message_query(
account_id=account_id, message_id=message.id
).exists()
).scalar()

if not message_imapuids_exist and message.is_draft:
# Synchronously delete drafts.
thread = message.thread
if thread is not None:
Expand All @@ -257,7 +286,7 @@ def remove_deleted_uids(account_id, folder_id, uids):
update_message_metadata(
db_session, account, message, message.is_draft
)
if not message.imapuids:
if not message_imapuids_exist:
# But don't outright delete messages. Just mark them as
# 'deleted' and wait for the asynchronous
# dangling-message-collector to delete them.
Expand Down
1 change: 0 additions & 1 deletion inbox/mailsync/backends/imap/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ def start_delete_handler(self):
account_id=self.account_id,
namespace_id=self.namespace_id,
provider_name=self.provider_name,
uid_accessor=lambda m: m.imapuids,
)
self.delete_handler.start()

Expand Down
14 changes: 6 additions & 8 deletions inbox/mailsync/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ class DeleteHandler(InterruptibleThread):
----------
account_id, namespace_id: int
IDs for the namespace to check.
uid_accessor: function
Function that takes a message and returns a list of associated uid
objects. For IMAP sync, this would just be
`uid_accessor=lambda m: m.imapuids`
message_ttl: int
Number of seconds to wait after a message is marked for deletion before
deleting it for good.
Expand All @@ -59,15 +55,13 @@ def __init__(
account_id,
namespace_id,
provider_name,
uid_accessor,
message_ttl=DEFAULT_MESSAGE_TTL,
thread_ttl=DEFAULT_THREAD_TTL,
):
bind_context(self, "deletehandler", account_id)
self.account_id = account_id
self.namespace_id = namespace_id
self.provider_name = provider_name
self.uids_for_message = uid_accessor
self.log = log.new(account_id=account_id)
self.message_ttl = datetime.timedelta(seconds=message_ttl)
self.thread_ttl = datetime.timedelta(seconds=thread_ttl)
Expand Down Expand Up @@ -106,14 +100,18 @@ def check(self, current_time):
# If the message isn't *actually* dangling (i.e., it has
# imapuids associated with it), undelete it.
try:
uids_for_message = self.uids_for_message(message)
message_imapuids_exist = db_session.query(
common.imapuids_for_message_query(
account_id=self.account_id, message_id=message.id
).exists()
).scalar()
except ObjectDeletedError:
# It looks like we are expiring the session potentially when one message is deleted,
# and then when accessing the IMAP uids, there is a lazy load trying to get the data.
# If that object has also been deleted (how?) it raises this exception.
continue

if uids_for_message:
if message_imapuids_exist:
message.deleted_at = None
continue

Expand Down
7 changes: 0 additions & 7 deletions tests/imap/test_delete_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def test_deletion_with_short_ttl(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=0,
thread_ttl=0,
)
Expand All @@ -110,7 +109,6 @@ def test_thread_deletion_with_short_ttl(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=0,
thread_ttl=120,
)
Expand Down Expand Up @@ -148,7 +146,6 @@ def test_non_orphaned_messages_get_unmarked(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=0,
)
handler.check(marked_deleted_message.deleted_at + timedelta(seconds=1))
Expand All @@ -165,7 +162,6 @@ def test_threads_only_deleted_when_no_messages_left(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=0,
)
# Add another message onto the thread
Expand All @@ -187,7 +183,6 @@ def test_deletion_deferred_with_longer_ttl(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=5,
)
db.session.commit()
Expand All @@ -207,7 +202,6 @@ def test_deletion_creates_revision(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=0,
)
handler.check(marked_deleted_message.deleted_at + timedelta(seconds=1))
Expand Down Expand Up @@ -270,7 +264,6 @@ def test_deleted_labels_get_gced(
account_id=default_account.id,
namespace_id=default_namespace.id,
provider_name=default_account.provider,
uid_accessor=lambda m: m.imapuids,
message_ttl=0,
)
handler.gc_deleted_categories()
Expand Down

0 comments on commit c66d909

Please sign in to comment.