diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ebb4c6..0b06ad9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ See documentation for details.
 
 ## devel
 
+- Duplicate detection moved from job submit to long-running rules, i.e. those
+  using Cuckoo and Cortex. This should improve throughput for analyses that
+  don't use these rules and avoid a massive performance degradation if a very
+  high number of quick-to-analyse samples hits a PeekabooAV cluster.
 - Samples now have an identity that includes sha256sum, declared name and type
   as well as content disposition. This allows for more reliable and efficient
   in-flight locking and cached result usage decisions. DB schema version raised
diff --git a/peekaboo/daemon.py b/peekaboo/daemon.py
index 413ae63..97b0b05 100644
--- a/peekaboo/daemon.py
+++ b/peekaboo/daemon.py
@@ -316,6 +316,9 @@ async def async_main():
             "interval to %d seconds.", cldup_check_interval)
 
+    if not cldup_check_interval:
+        logger.debug("Disabling cluster duplicate handler.")
+
     loop = asyncio.get_running_loop()
 
     sig_handler = SignalHandler(loop)
@@ -383,8 +386,8 @@ async def async_main():
     except asyncio.exceptions.CancelledError as error:
         # cancellation is expected in the case of shutdown via signal handler
         pass
-    except Exception:
-        logger.error("Shutting down due to unexpected exception")
+    except Exception as error:
+        logger.error("Shutting down due to unexpected exception: %s", error)
 
     # trigger shutdowns of other components if not already ongoing triggered
     # by the signal handler
diff --git a/peekaboo/queuing.py b/peekaboo/queuing.py
index af4c76a..aa6ed57 100644
--- a/peekaboo/queuing.py
+++ b/peekaboo/queuing.py
@@ -22,7 +22,7 @@
 #                                                                             #
 ###############################################################################
 
-""" The main job queue with workers and a cluster duplicate handler. """
+""" The main job queue with workers. """
 
 import asyncio
 
@@ -64,20 +64,9 @@ def __init__(self, ruleset_config, db_con, analyzer_config,
         self.worker_count = worker_count
         self.threadpool = threadpool
 
-        # keep a backlog of samples with identities identical to samples
-        # currently in analysis to avoid analysing multiple identical samples
-        # simultaneously. Once one analysis has finished, we can submit the
-        # others and the ruleset will notice that we already know the result.
-        self.duplicates = {}
-        self.duplock = asyncio.Lock()
-
-        # keep a similar backlog of samples currently being processed by
-        # other instances so we can regularly try to resubmit them and re-use
-        # the other instances' cached results from the database
-        self.cluster_duplicates = {}
-
         self.ruleset_engine = RulesetEngine(
-            ruleset_config, self, db_con, analyzer_config, threadpool)
+            ruleset_config, self, db_con, analyzer_config,
+            cluster_duplicate_check_interval, threadpool)
 
         # we start these here because they do no lengthy init and starting can
         # not fail. We need this here to avoid races in startup vs. shutdown by
@@ -90,25 +79,12 @@ def __init__(self, ruleset_config, db_con, analyzer_config,
 
         logger.info('Created %d Workers.', self.worker_count)
 
-        self.cluster_duplicate_handler = None
-        if cluster_duplicate_check_interval:
-            logger.debug(
-                "Creating cluster duplicate handler with check "
-                "interval %d.", cluster_duplicate_check_interval)
-            self.cluster_duplicate_handler = ClusterDuplicateHandler(
-                self, cluster_duplicate_check_interval)
-        else:
-            logger.debug("Disabling cluster duplicate handler.")
-
     async def start(self):
         """ Start up the job queue including resource initialisation.
""" awaitables = [] for worker in self.workers: awaitables.append(await worker.start()) - if self.cluster_duplicate_handler: - awaitables.append(await self.cluster_duplicate_handler.start()) - # create a single ruleset engine for all workers, instantiates all the # rules based on the ruleset configuration, may start up long-lived # analyzer instances which are shared as well, is otherwise stateless @@ -134,179 +110,9 @@ async def submit(self, sample): exception. @param sample: The Sample object to add to the queue. - @raises Full: if the queue is full. - """ - identity = await sample.identity - duplicate = None - cluster_duplicate = None - resubmit = None - - # we have to lock this down because async routines called from here may - # allow us to be called again concurrently from the event loop - async with self.duplock: - # check if a sample with same identity is currently in flight - duplicates = self.duplicates.get(identity) - if duplicates is not None: - # we are regularly resubmitting samples, e.g. after we've - # noticed that cuckoo is finished analysing them. This - # obviously isn't a duplicate but continued processing of the - # same sample. - if duplicates['master'] == sample: - resubmit = sample.id - await self.jobs.put(sample) - else: - # record the to-be-submitted sample as duplicate and do - # nothing - duplicate = sample.id - duplicates['duplicates'].append(sample) - else: - # are we the first of potentially multiple instances working on - # this sample? - try: - locked = await self.db_con.mark_sample_in_flight(sample) - except PeekabooDatabaseError as dberr: - logger.error(dberr) - return False - - if locked: - # initialise a per-duplicate backlog for this sample which - # also serves as in-flight marker and submit to queue - self.duplicates[identity] = { - 'master': sample, - 'duplicates': [], - } - await self.jobs.put(sample) - else: - # another instance is working on this - if self.cluster_duplicates.get(identity) is None: - self.cluster_duplicates[identity] = [] - - cluster_duplicate = sample.id - self.cluster_duplicates[identity].append(sample) - - if duplicate is not None: - logger.debug( - "%d: Sample is duplicate and waiting for running analysis " - "to finish", duplicate) - elif cluster_duplicate is not None: - logger.debug( - "%d: Sample is concurrently processed by another instance " - "and held", cluster_duplicate) - elif resubmit is not None: - logger.debug("%d: Resubmitted sample to job queue", resubmit) - else: - logger.debug("%d: New sample submitted to job queue", sample.id) - - return True - - async def submit_cluster_duplicates(self): - """ Submit samples held while being processed by another cluster - instance back into the job queue if they have finished processing. 
""" - if not self.cluster_duplicates.keys(): - return True - - submitted_cluster_duplicates = [] - - async with self.duplock: - # try to submit *all* samples which have been marked as being - # processed by another instance concurrently - # get the items view on a copy of the cluster duplicate backlog - # because we will change it by removing entries which would raise a - # RuntimeException - cluster_duplicates = self.cluster_duplicates.copy().items() - for identity, sample_duplicates in cluster_duplicates: - # try to mark as in-flight - try: - locked = await self.db_con.mark_sample_in_flight( - sample_duplicates[0]) - except PeekabooDatabaseError as dberr: - logger.error(dberr) - return False - - if locked: - if self.duplicates.get(identity) is not None: - logger.error( - "Possible backlog corruption for sample %d! " - "Please file a bug report. Trying to continue...", - sample.id) - continue - - # submit one of the held-back samples as a new master - # analysis in case the analysis on the other instance - # failed and we have no result in the database yet. If all - # is well, this master should finish analysis very quickly - # using the stored result, causing all the duplicates to be - # submitted and finish quickly as well. - sample = sample_duplicates.pop() - self.duplicates[identity] = { - 'master': sample, - 'duplicates': sample_duplicates, - } - submitted_cluster_duplicates.append(sample.id) - await self.jobs.put(sample) - del self.cluster_duplicates[identity] - - if len(submitted_cluster_duplicates) > 0: - logger.debug( - "Submitted cluster duplicates (and potentially their " - "duplicates) from backlog: %s", submitted_cluster_duplicates) - - return True - - async def clear_stale_in_flight_samples(self): - """ Clear any stale in-flight sample logs from the database. """ - try: - cleared = await self.db_con.clear_stale_in_flight_samples() - except PeekabooDatabaseError as dberr: - logger.error(dberr) - cleared = False - - return cleared - - async def submit_duplicates(self, identity): - """ Check if any samples have been held from processing as duplicates - and submit them now. Clear the original sample whose duplicates have - been submitted from the in-flight list. - - @param identity: identity of sample to check for duplicates """ - submitted_duplicates = [] - - async with self.duplock: - # duplicates which have been submitted from the backlog still - # report done but do not get registered as potentially having - # duplicates because we expect the ruleset to identify them as - # already known and process them quickly now that the first - # instance has gone through full analysis. Therefore we can ignore - # them here. - if identity not in self.duplicates: - return - - # submit all samples which have accumulated in the backlog - for sample in self.duplicates[identity]['duplicates']: - submitted_duplicates.append(sample.id) - await self.jobs.put(sample) - - sample = self.duplicates[identity]['master'] - try: - await self.db_con.clear_sample_in_flight(sample) - except PeekabooDatabaseError as dberr: - logger.error(dberr) - - del self.duplicates[identity] - - logger.debug("%d: Cleared sample from in-flight list", sample.id) - if len(submitted_duplicates) > 0: - logger.debug( - "Submitted duplicates from backlog: %s", submitted_duplicates) - - async def done(self, sample): - """ Perform cleanup actions after sample processing is done: - 1. Submit held duplicates and - 2. notify request handler that sample processing is done. - - @param sample: The Sample object to post-process. 
""" - await self.submit_duplicates(await sample.identity) + await self.jobs.put(sample) + logger.debug("%d: New sample submitted to job queue", sample.id) async def dequeue(self): """ Remove a sample from the queue. Used by the workers to get their @@ -320,9 +126,6 @@ def shut_down(self): if self.ruleset_engine is not None: self.ruleset_engine.shut_down() - if self.cluster_duplicate_handler is not None: - self.cluster_duplicate_handler.shut_down() - # tell all workers to shut down for worker in self.workers: worker.shut_down() @@ -332,64 +135,12 @@ async def close_down(self): for worker in self.workers: await worker.close_down() - if self.cluster_duplicate_handler is not None: - await self.cluster_duplicate_handler.close_down() - if self.ruleset_engine is not None: await self.ruleset_engine.close_down() logger.info("Queue shut down.") -class ClusterDuplicateHandler: - """ A housekeeper handling submission and cleanup of cluster duplicates. - """ - def __init__(self, job_queue, interval=5): - self.job_queue = job_queue - self.interval = interval - self.task = None - self.task_name = "ClusterDuplicateHandler" - - async def start(self): - self.task = asyncio.ensure_future(self.run()) - if hasattr(self.task, "set_name"): - self.task.set_name(self.task_name) - return self.task - - async def run(self): - logger.debug("Cluster duplicate handler started.") - - while True: - await asyncio.sleep(self.interval) - - logger.debug("Checking for samples in processing by other " - "instances to submit") - - await self.job_queue.clear_stale_in_flight_samples() - await self.job_queue.submit_cluster_duplicates() - - def shut_down(self): - """ Asynchronously initiate cluster duplicate handler shutdown. """ - logger.debug("Cluster duplicate handler shutdown requested.") - if self.task is not None: - self.task.cancel() - - async def close_down(self): - """ Wait for the cluster duplicate handler to close down and retrieve - any exceptions thrown. """ - if self.task is not None: - try: - await self.task - # we cancelled the task so a CancelledError is expected - except asyncio.CancelledError: - pass - except Exception: - logger.exception( - "Unexpected exception in cluster duplicate handler") - - logger.debug("Cluster duplicate handler shut down.") - - class Worker: """ A Worker to process a sample. """ def __init__(self, wid, job_queue, ruleset_engine, db_con): @@ -444,7 +195,9 @@ async def run(self): 'database: %s', sample.id, dberr) # no showstopper, we can limp on without caching in DB - await self.job_queue.done(sample) + # now is the time to submit any potential duplicates of this sample + # whose processing was deferred by rules + await self.ruleset_engine.submit_duplicates(sample) def shut_down(self): """ Asynchronously initiate worker shutdown. 
""" diff --git a/peekaboo/ruleset/engine.py b/peekaboo/ruleset/engine.py index a5bdfdd..4296b1d 100644 --- a/peekaboo/ruleset/engine.py +++ b/peekaboo/ruleset/engine.py @@ -29,6 +29,8 @@ from peekaboo.ruleset.rules import * from peekaboo.toolbox.cuckoo import Cuckoo from peekaboo.toolbox.cortex import Cortex +from peekaboo.toolbox.duplicates import DuplicateHandler, \ + ClusterDuplicateHandler from peekaboo.toolbox.peekabooyar import ContainsPeekabooYarRule from peekaboo.exceptions import PeekabooAnalysisDeferred, \ PeekabooConfigException, PeekabooRulesetConfigError @@ -60,7 +62,7 @@ class RulesetEngine: ] def __init__(self, config, job_queue, db_con, analyzer_config, - threadpool=None): + cluster_duplicate_check_interval=5, threadpool=None): """ Create the engine and store its config. Postpone lengthy initialisation for later so that it can be registered quickly for shutdown requests. @@ -75,6 +77,10 @@ def __init__(self, config, job_queue, db_con, analyzer_config, @type db_con: PeekabooDatabase @param analyzer_config: analyzer configuration @type analyzer_config: PeekabooAnalyzerConfig + @param cluster_duplicate_check_interval: How long to wait inbetween + checks for stale cluster + duplicate locks. + @type cluster_duplicate_check_interval: int """ self.config = config self.job_queue = job_queue @@ -83,6 +89,9 @@ def __init__(self, config, job_queue, db_con, analyzer_config, self.threadpool = threadpool self.cuckoo = None self.cortex = None + self.duplicate_handler = None + self.cluster_duplicate_handler = None + self.cluster_duplicate_check_interval = cluster_duplicate_check_interval self.rules = [] self.shutdown_requested = False @@ -179,6 +188,31 @@ async def start(self): rule.set_cortex_job_tracker(self.cortex) + if rule.uses_duplicate_handler: + if self.duplicate_handler is None: + logger.debug("Creating duplicate handler") + self.duplicate_handler = DuplicateHandler(self.job_queue) + + rule.set_duplicate_handler(self.duplicate_handler) + + if (rule.uses_cluster_duplicate_handler and + self.cluster_duplicate_check_interval): + if self.cluster_duplicate_handler is None: + logger.debug( + "Creating cluster duplicate handler with check " + "interval %d.", + self.cluster_duplicate_check_interval) + + self.cluster_duplicate_handler = ClusterDuplicateHandler( + self.job_queue, self.db_con, + self.cluster_duplicate_check_interval) + + awaitable = await self.cluster_duplicate_handler.start() + awaitables.append(awaitable) + + rule.set_cluster_duplicate_handler( + self.cluster_duplicate_handler) + self.rules.append(rule) # abort startup if we've been asked to shut down meanwhile @@ -225,6 +259,21 @@ async def run(self, sample): logger.info("%d: Rules evaluated", sample.id) + async def submit_duplicates(self, sample): + """ Submit potential local duplicates to the queue if this sample has + been finally analysed. The cluster duplicate handler will do + this directly during its polling for other instances analysing + the same sample. But we clear the in-flight lock the cluster duplicate + handler might have taken out here. + + @param sample: sample to check for withheld duplicates + @type sample: Sample """ + if self.cluster_duplicate_handler is not None: + await self.cluster_duplicate_handler.clear_sample_in_flight(sample) + + if self.duplicate_handler is not None: + await self.duplicate_handler.submit_duplicates(sample) + def shut_down_resources(self): """ Shut down dynamically allocated resources such as job trackers. 
""" @@ -234,6 +283,9 @@ def shut_down_resources(self): if self.cortex is not None: self.cortex.shut_down() + if self.cluster_duplicate_handler is not None: + self.cluster_duplicate_handler.shut_down() + def shut_down(self): """ Initiate asynchronous shutdown of the ruleset engine and dependent logic such as job trackers. """ diff --git a/peekaboo/ruleset/rules.py b/peekaboo/ruleset/rules.py index 9185b26..0fe9b87 100644 --- a/peekaboo/ruleset/rules.py +++ b/peekaboo/ruleset/rules.py @@ -52,6 +52,8 @@ class Rule: rule_name = 'unimplemented' uses_cuckoo = False uses_cortex = False + uses_duplicate_handler = False + uses_cluster_duplicate_handler = False def __init__(self, config, db_con, threadpool=None): """ Initialize common configuration and resources. @@ -67,6 +69,8 @@ def __init__(self, config, db_con, threadpool=None): self.cuckoo = None self.cortex = None + self.duplicate_handler = None + self.cluster_duplicate_handler = None # initialise and validate configuration self.config_options = {} @@ -144,6 +148,24 @@ def set_cortex_job_tracker(self, cortex): """ self.cortex = cortex + def set_duplicate_handler(self, duplicate_handler): + """ Set the duplicate handler to use for detecting and deferring + duplicate local analyses. + + @param duplicate_handler: the duplicate handler to use + @type duplicate_handler: DuplicateHandler + """ + self.duplicate_handler = duplicate_handler + + def set_cluster_duplicate_handler(self, cluster_duplicate_handler): + """ Set the cluster duplicate handler to use for detecting and + deferring duplicate local analyses. + + @param cluster_duplicate_handler: the cluster duplicate handler to use + @type cluster_duplicate_handler: ClusterDuplicateHandler + """ + self.cluster_duplicate_handler = cluster_duplicate_handler + async def get_cuckoo_report(self, sample): """ Get the samples cuckoo_report or submit the sample for analysis by Cuckoo. @@ -157,6 +179,17 @@ async def get_cuckoo_report(self, sample): if report is not None: return report + # submitting to Cuckoo is an expensive operation. So try to prevent + # redundant analyses by employing duplicate handlers. + if (self.duplicate_handler is not None and + await self.duplicate_handler.is_duplicate(sample)): + raise PeekabooAnalysisDeferred() + + if (self.cluster_duplicate_handler is not None and + await self.cluster_duplicate_handler.is_cluster_duplicate( + sample)): + raise PeekabooAnalysisDeferred() + logger.debug("%d: Submitting to Cuckoo", sample.id) try: job_id = await self.cuckoo.submit(sample) @@ -227,6 +260,17 @@ async def submit_to_cortex(self, sample, analyzer): ruleset run until result has been retrieved. """ + # submitting to Cortex is an expensive operation. So try to prevent + # redundant analyses by employing duplicate handlers. + if (self.duplicate_handler is not None and + await self.duplicate_handler.is_duplicate(sample)): + raise PeekabooAnalysisDeferred() + + if (self.cluster_duplicate_handler is not None and + await self.cluster_duplicate_handler.is_cluster_duplicate( + sample)): + raise PeekabooAnalysisDeferred() + logger.debug("%d: Submitting to Cortex", sample.id) try: job_id = await self.cortex.submit(sample, analyzer) @@ -413,6 +457,8 @@ def evaluate_report(self, report): class CuckooRule(Rule): """ A common base class for rules that evaluate the Cuckoo report. 
""" uses_cuckoo = True + uses_duplicate_handler = True + uses_cluster_duplicate_handler = True async def evaluate(self, sample): """ If a report is present for the sample in question we call method @@ -424,6 +470,7 @@ async def evaluate(self, sample): @param sample: The sample to evaluate. @raises PeekabooAnalysisDeferred: if the sample was submitted to Cuckoo + or is a local or cluster duplicate @returns: RuleResult containing verdict. """ report = await self.get_cuckoo_report(sample) @@ -686,6 +733,18 @@ def uses_cortex(self): class variable with a dynamic determination. """ return self.uses_identifier("cortexreport") + @property + def uses_duplicate_handler(self): + """ Tells if any expression uses the duplicate handler. Overrides base + class variable with a dynamic determination. """ + return self.uses_cuckoo or self.uses_cortex + + @property + def uses_cluster_duplicate_handler(self): + """ Tells if any expression uses the cluster duplicate handler. + Overrides base class variable with a dynamic determination. """ + return self.uses_cuckoo or self.uses_cortex + async def resolve_identifier(self, identifier, context, sample): """ Resolves a missing identifer into an object. diff --git a/peekaboo/server.py b/peekaboo/server.py index aa65b30..e81e9af 100644 --- a/peekaboo/server.py +++ b/peekaboo/server.py @@ -226,10 +226,8 @@ async def scan(self, request): return sanic.response.json( {'message': 'Failed to add analysis to database'}, 500) - if not await self.job_queue.submit(sample): - logger.error('Error submitting sample to job queue') - return sanic.response.json( - {'message': 'Error submitting sample to job queue'}, 500) + # can not fail since our queue is limitless + await self.job_queue.submit(sample) # send answer to client return sanic.response.json({'job_id': sample.id}, 200) diff --git a/peekaboo/toolbox/duplicates.py b/peekaboo/toolbox/duplicates.py new file mode 100644 index 0000000..d651243 --- /dev/null +++ b/peekaboo/toolbox/duplicates.py @@ -0,0 +1,339 @@ +############################################################################### +# # +# Peekaboo Extended Email Attachment Behavior Observation Owl # +# # +# toolbox/ # +# duplicates.py # +############################################################################### +# # +# Copyright (C) 2016-2022 science + computing ag # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or (at # +# your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # +# General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + +""" A local and cluster duplicate handler. """ + +import asyncio +import logging + +from peekaboo.exceptions import PeekabooDatabaseError + + +logger = logging.getLogger(__name__) + + +class DuplicateHandler: + """ A class to handle duplicate local analyses by deferring them. """ + def __init__(self, job_queue): + """ Initialize the object. 
""" + # keep a backlog of samples with identities identical to samples + # currently in analysis to avoid analysing multiple identical samples + # simultaneously. Once one analysis has finished, we can submit the + # others and the ruleset will notice that we already know the result. + self.duplicates = {} + self.duplock = asyncio.Lock() + + self.job_queue = job_queue + + async def is_duplicate(self, sample): + """ Check if another sample with the same identity is already being + analysed locally. If so, signal that processing of this new sample + should be deferred and remember it in a list of duplicates. + + @param sample: sample to check for duplicates + @type sample: Sample + """ + identity = await sample.identity + duplicate = False + resubmit = False + + # we have to lock this down because async routines called from here may + # allow us to be called again concurrently from the event loop + async with self.duplock: + # check if a sample with same identity is currently in flight + # locally + duplicates = self.duplicates.get(identity) + if duplicates is None: + # initialise a per-duplicate backlog for this sample which + # also serves as in-flight marker and submit to queue + self.duplicates[identity] = [] + else: + # record the to-be-submitted sample as duplicate + duplicate = True + duplicates.append(sample) + + if duplicate: + logger.debug( + "%d: Sample is local duplicate and should wait for running " + "analysis to finish", sample.id) + return True + + if resubmit: + logger.debug( + "%d: Sample has been resubmitted to job queue", sample.id) + return False + + logger.debug("%d: Sample is not a local duplicate", sample.id) + return False + + async def submit_duplicates(self, sample): + """ Check if any samples have been held from processing as duplicates + and submit them now. Clear the original sample whose duplicates have + been submitted from the in-flight list. + + @param sample: sample to check for duplicates + @type sample: Sample + """ + if not self.duplicates.keys(): + return + + identity = await sample.identity + submitted_duplicates = [] + + async with self.duplock: + # this sample simply might not have had any duplicates + if identity not in self.duplicates: + return + + # submit all samples which have accumulated in the backlog. The + # idea here it that they'll not reach any rule again which uses the + # duplicate handler because some kind of cached result is now + # available. If that's not the case then they'll be put in the + # backlog again and a single one will be allowed to go on, + # essentially serialising their processing in hopes that a final + # verdict will be reached and cached eventually. + for duplicate in self.duplicates[identity]: + submitted_duplicates.append(duplicate.id) + await self.job_queue.submit(duplicate) + + del self.duplicates[identity] + + logger.debug("%d: Cleared sample from local in-flight list", sample.id) + if len(submitted_duplicates) > 0: + logger.debug( + "Submitted duplicates from local backlog: %s", + submitted_duplicates) + + +class ClusterDuplicateHandler: + """ A housekeeper handling submission and cleanup of cluster duplicates. 
+ """ + def __init__(self, job_queue, db_con, interval=5): + self.job_queue = job_queue + self.db_con = db_con + self.interval = interval + self.task = None + self.task_name = "ClusterDuplicateHandler" + + # keep a log of samples we've locked for processing ourselves + self.in_flight_locks = {} + + # keep a backlog of samples currently being processed by other + # instances so we can regularly try to resubmit them and re-use the + # other instances' cached results from the database + self.cluster_duplicates = {} + self.cluster_duplock = asyncio.Lock() + + async def is_cluster_duplicate(self, sample): + """ Check if a given sample is already being processed by another + instance in a cluster. + + @param sample: the sample to check for concurrent processing + @type sample: Sample + @returns: Return True if being processed concurrently, False otherwise. + """ + identity = await sample.identity + + # if we already hold a lock on this identity, whether it's this exact + # same sample or another, this is not a cluster duplicate. This ensures + # parallel processing of identical samples which have been held as + # cluster duplicates and have now been resubmitted in batch. The local + # duplicate handler might still serialise them though. + if self.in_flight_locks.get(identity) is not None: + return False + + cluster_duplicate = False + submitted_cluster_duplicates = [] + + # we have to lock this down because async routines called from here may + # allow us to be called again concurrently from the event loop + async with self.cluster_duplock: + # are we the first of potentially multiple instances working on + # this sample? + try: + locked = await self.db_con.mark_sample_in_flight(sample) + except PeekabooDatabaseError as dberr: + # on database error we weren't able to confirm it's a + # duplicate. So we potentially limp on with reduced throughput + # and duplicate analysis but we give it our best shot. + logger.error(dberr) + return False + + cluster_duplicates = self.cluster_duplicates.get(identity) + if locked: + self.in_flight_locks[identity] = True + + if cluster_duplicates: + # apparently we've delayed some samples before because they + # were in processing on another instance. Now we've + # received the same sample again and successfully locked + # it. So we can bounce these back to the queue where they + # will be held as local duplicates. + for duplicate in cluster_duplicates: + submitted_cluster_duplicates.append(duplicate.id) + await self.job_queue.submit(duplicate) + + del self.cluster_duplicates[identity] + else: + if cluster_duplicates is None: + self.cluster_duplicates[identity] = [] + + # another instance is working on this + cluster_duplicate = True + self.cluster_duplicates[identity].append(sample) + + if cluster_duplicate: + logger.debug( + "%d: Sample is concurrently processed by another instance " + "and held", sample.id) + return True + + if len(submitted_cluster_duplicates) > 0: + logger.debug( + "Submitted old cluster duplicates from backlog: %s", + submitted_cluster_duplicates) + + logger.debug("%d: Sample is not a cluster duplicate", sample.id) + return False + + async def clear_sample_in_flight(self, sample): + """ Clear in-flight lock on a sample. 
+
+        @param sample: the sample to clear the in-flight lock for
+        @type sample: Sample """
+        identity = await sample.identity
+
+        # nothing to do if we do not hold an in-flight lock on this sample
+        locked = self.in_flight_locks.get(identity)
+        if not locked:
+            return
+
+        del self.in_flight_locks[identity]
+
+        try:
+            await self.db_con.clear_sample_in_flight(sample)
+        except PeekabooDatabaseError as dberr:
+            logger.error(dberr)
+
+        logger.debug(
+            "%d: Cleared sample from cluster in-flight list", sample.id)
+
+    async def submit_cluster_duplicates(self):
+        """ Submit samples held while being processed by another cluster
+        instance back into the job queue if they have finished processing. """
+        if not self.cluster_duplicates.keys():
+            return
+
+        submitted_cluster_duplicates = []
+
+        async with self.cluster_duplock:
+            # try to submit *all* samples which have been marked as being
+            # processed by another instance concurrently
+            # get the items view on a copy of the cluster duplicate backlog
+            # because we will change it by removing entries which would raise a
+            # RuntimeError
+            cluster_duplicates = self.cluster_duplicates.copy().items()
+            for identity, sample_duplicates in cluster_duplicates:
+                # try to mark as in-flight
+                try:
+                    locked = await self.db_con.mark_sample_in_flight(
+                        sample_duplicates[0])
+                except PeekabooDatabaseError as dberr:
+                    logger.error(dberr)
+                    return False
+
+                if locked:
+                    self.in_flight_locks[identity] = True
+
+                    # submit all of the held-back samples at once. The local
+                    # duplicate handler should kick in when processing them and
+                    # delay all but one as local duplicates. This is sensible
+                    # in case the analysis on the other instance failed and we
+                    # have no result in the database yet. If all is well, this
+                    # local canary analysis should finish analysis very quickly
+                    # using the stored result, causing all the duplicates to be
+                    # submitted and finish quickly as well.
+                    for sample in sample_duplicates:
+                        submitted_cluster_duplicates.append(sample.id)
+                        await self.job_queue.submit(sample)
+
+                    del self.cluster_duplicates[identity]
+
+        if len(submitted_cluster_duplicates) > 0:
+            logger.debug(
+                "Submitted cluster duplicates from backlog: %s",
+                submitted_cluster_duplicates)
+
+    async def clear_stale_in_flight_samples(self):
+        """ Clear any stale in-flight sample logs from the database. """
+        try:
+            cleared = await self.db_con.clear_stale_in_flight_samples()
+        except PeekabooDatabaseError as dberr:
+            logger.error(dberr)
+            cleared = False
+
+        return cleared
+
+    async def start(self):
+        """ Start the cluster duplicate handler. """
+        self.task = asyncio.ensure_future(self.run())
+        if hasattr(self.task, "set_name"):
+            self.task.set_name(self.task_name)
+        return self.task
+
+    async def run(self):
+        """ Regularly check for withheld cluster duplicates, potentially
+        resubmit them to the queue and clean up stale lock entries. """
+        logger.debug("Cluster duplicate handler started.")
+
+        while True:
+            await asyncio.sleep(self.interval)
+
+            logger.debug("Checking for samples in processing by other "
+                         "instances to submit")
+
+            await self.clear_stale_in_flight_samples()
+            await self.submit_cluster_duplicates()
+
+    def shut_down(self):
+        """ Asynchronously initiate cluster duplicate handler shutdown. """
+        logger.debug("Cluster duplicate handler shutdown requested.")
+        if self.task is not None:
+            self.task.cancel()
+
+    async def close_down(self):
+        """ Wait for the cluster duplicate handler to close down and retrieve
+        any exceptions thrown.
""" + if self.task is not None: + try: + await self.task + # we cancelled the task so a CancelledError is expected + except asyncio.CancelledError: + pass + except Exception: + logger.exception( + "Unexpected exception in cluster duplicate handler") + + logger.debug("Cluster duplicate handler shut down.")