Skip to content

Commit

Permalink
worker: Log which outputs are missing when task is unexpectedly incom…
Browse files Browse the repository at this point in the history
…plete (#3258)

* worker: Log which outputs are missing when task is unexpectedly incomplete
  • Loading branch information
progval authored Oct 21, 2023
1 parent 319ce20 commit d86b97e
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 1 deletion.
3 changes: 3 additions & 0 deletions luigi/contrib/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ def __init__(self, path, token, format=None, user_agent="Luigi"):
self.client = DropboxClient(token, user_agent)
self.format = format or luigi.format.get_default_format()

def __str__(self):
return self.path

@property
def fs(self):
return self.client
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ def __init__(self, table, partition, database='default', fail_missing_table=True
self.client = client or get_default_client()
self.fail_missing_table = fail_missing_table

def __str__(self):
return self.path

def exists(self):
"""
returns `True` if the partition/table exists
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def __init__(self, mongo_client, index, collection):
self._index = index
self._collection = collection

def __str__(self):
return f'{self._index}/{self._collection}'

def get_collection(self):
"""
Return targeted mongo collection to query on
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mssqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id):
self.table = table
self.update_id = update_id

def __str__(self):
return self.table

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/mysqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id, **cnx_kwarg
self.update_id = update_id
self.cnx_kwargs = cnx_kwargs

def __str__(self):
return self.table

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ def __init__(
self.table = table
self.update_id = update_id

def __str__(self):
return self.table

def touch(self, connection=None):
"""
Mark this update as complete.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def __init__(self, client, catalog, database, table, partition=None):
self._client = client
self._count = None

def __str__(self):
return self.table

@property
def _count_query(self):
partition = OrderedDict(self.partition or {1: 1})
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/redis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def __init__(self, host, port, db, update_id, password=None,
socket_timeout=self.socket_timeout,
)

def __str__(self):
return self.marker_key()

def marker_key(self):
"""
Generate a key for the indicator hash.
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def __init__(self, task_obj):
shutil.rmtree(path)
logger.debug('Deleted temporary directory %s', path)

def __str__(self):
return self.task_id

def get_path(self):
"""
Returns a temporary file path based on a MD5 hash generated with the task's name and its arguments
Expand Down
3 changes: 3 additions & 0 deletions luigi/contrib/sqla.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def __init__(self, connection_string, target_table, update_id, echo=False, conne
self.connect_args = connect_args
self.marker_table_bound = None

def __str__(self):
return self.target_table

@property
def engine(self):
"""
Expand Down
3 changes: 3 additions & 0 deletions luigi/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def __init__(self, path):
# cast to str to allow path to be objects like pathlib.PosixPath and py._path.local.LocalPath
self.path = str(path)

def __str__(self):
return self.path

@property
@abc.abstractmethod
def fs(self):
Expand Down
9 changes: 8 additions & 1 deletion luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ def run(self):
# checking completeness of self.task so outputs of dependencies are
# irrelevant.
if self.check_unfulfilled_deps and not _is_external(self.task):
missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)]
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
if nonexistent_outputs:
missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')
else:
missing.append(dep.task_id)
if missing:
deps = 'dependency' if len(missing) == 1 else 'dependencies'
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
Expand Down

0 comments on commit d86b97e

Please sign in to comment.