
Commit

Merge branch 'dev' of github.com:biocore/qiita
antgonza committed Jan 13, 2025
2 parents c0e715b + ef26847 commit d8cb8db
Showing 33 changed files with 693 additions and 261 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,17 @@
# Qiita changelog

Version 2025.01
---------------

Deployed on January 15th, 2025

* The Analysis owner is now displayed in the analysis list and on the individual analysis page.
* Admins can now use the per-preparation "Download Data Release" button to get a "BIOM" release; this version focuses on NPH data releases.
* Improved complete_job creation time, which should result in Qiita jobs ([multiple steps](https://qiita.ucsd.edu/static/doc/html/dev/resource_allocation.html)) finishing faster; for benchmarks visit [patch 93.sql](https://github.com/qiita-spots/qiita/blob/master/qiita_db/support_files/patches/93.sql); see the sketch after this list.
* SPP improvements: TellSeq support added; the plugin was refactored to allow easier additions like TellSeq in the future. Job restarts were greatly improved, as was the handling of sample names and ids that contain substrings like ‘I1’ and ‘R2’. A new SequenceCount job can count sequences and base pairs in parallel for any list of fastq files.
* Other general fixes: [#3440](https://github.com/qiita-spots/qiita/pull/3440), [#3445](https://github.com/qiita-spots/qiita/pull/3445), [#3446](https://github.com/qiita-spots/qiita/pull/3446).
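
A minimal sketch of the faster complete_job path, assuming `user` is an existing Qiita `User` and `params` is a `Parameters` object already built from a plugin's complete_job payload (mirroring the `qiita_db/handlers/processing_job.py` change in this commit):

    import qiita_db as qdb

    # complete_job parameters are unique per job, so force=True skips the
    # duplicate-parameters check that ProcessingJob.create normally runs
    # before inserting a new job, which is what makes creation faster.
    job = qdb.processing_job.ProcessingJob.create(user, params, force=True)
    job.submit()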


Version 2024.10
---------------

@@ -206,7 +218,7 @@ Version 2021.11
* Allow chunked download of metadata files in analyses; this allows processing large meta-analyses (like those for The Microsetta Initiative) without worker blockage.
* Added to the qp-qiime2 plugin the ability to filter tables based on system-available "FeatureData[Sequence]"; to start, we added 90/100/150 bps bloom tables.
* Now we can instantiate a study via its title (Study.from_title); this will facilitate orchestration with qebil (see the sketch after this list).
* Speed up Study listing for admins and general users; the admin study display came down from 20 to 2 seconds.
* Fixed the following issues: [3142](https://github.com/qiita-spots/qiita/issues/3142), [3149](https://github.com/qiita-spots/qiita/issues/3149), [3150](https://github.com/qiita-spots/qiita/issues/3150), [3119](https://github.com/qiita-spots/qiita/issues/3119), and [3160](https://github.com/qiita-spots/qiita/issues/3160).
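
A minimal sketch of the `Study.from_title` entry point mentioned above; the title string is a placeholder for any study already in the database:

    import qiita_db as qdb

    # Look up an existing study by its exact title instead of its numeric id;
    # the title below is only a placeholder.
    study = qdb.study.Study.from_title('Identification of the Microbiomes for Cannabis Soils')
    print(study.id, study.title)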


2 changes: 1 addition & 1 deletion qiita_core/__init__.py
@@ -6,4 +6,4 @@
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------

__version__ = "2024.10"
__version__ = "2025.01"
2 changes: 1 addition & 1 deletion qiita_db/__init__.py
@@ -27,7 +27,7 @@
from . import user
from . import processing_job

__version__ = "2024.10"
__version__ = "2025.01"

__all__ = ["analysis", "artifact", "archive", "base", "commands",
"environment_manager", "exceptions", "investigation", "logger",
16 changes: 16 additions & 0 deletions qiita_db/analysis.py
@@ -215,6 +215,22 @@ def create(cls, owner, name, description, from_default=False,
job.submit()
return instance

@classmethod
def delete_analysis_artifacts(cls, _id):
"""Deletes the artifacts linked to an artifact and then the analysis
Parameters
----------
_id : int
The analysis id
"""
analysis = cls(_id)
aids = [a.id for a in analysis.artifacts if not a.parents]
aids.sort(reverse=True)
for aid in aids:
qdb.artifact.Artifact.delete(aid)
cls.delete(analysis.id)

@classmethod
def delete(cls, _id):
"""Deletes an analysis
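A hedged usage sketch of the new Analysis.delete_analysis_artifacts helper above; the analysis id is a placeholder and only the behaviour visible in the diff is assumed:

    import qiita_db as qdb

    # Delete the analysis' artifacts (those without parents, highest ids first)
    # and then the analysis record itself; 42 is a placeholder analysis id.
    qdb.analysis.Analysis.delete_analysis_artifacts(42)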
19 changes: 17 additions & 2 deletions qiita_db/archive.py
@@ -116,6 +116,7 @@ def get_merging_scheme_from_job(cls, job):
acmd = job.command
parent = job.input_artifacts[0]
parent_pparameters = parent.processing_parameters
phms = None
if parent_pparameters is None:
parent_cmd_name = None
parent_parameters = None
@@ -125,12 +126,26 @@
parent_cmd_name = pcmd.name
parent_parameters = parent_pparameters.values
parent_merging_scheme = pcmd.merging_scheme

return qdb.util.human_merging_scheme(
if not parent_merging_scheme['ignore_parent_command']:
gp = parent.parents[0]
gp_params = gp.processing_parameters
if gp_params is not None:
gp_cmd = gp_params.command
phms = qdb.util.human_merging_scheme(
parent_cmd_name, parent_merging_scheme,
gp_cmd.name, gp_cmd.merging_scheme,
parent_parameters, [], gp_params.values)

hms = qdb.util.human_merging_scheme(
acmd.name, acmd.merging_scheme,
parent_cmd_name, parent_merging_scheme,
job.parameters.values, [], parent_parameters)

if phms is not None:
hms = qdb.util.merge_overlapping_strings(hms, phms)

return hms

@classmethod
def retrieve_feature_values(cls, archive_merging_scheme=None,
features=None):
4 changes: 3 additions & 1 deletion qiita_db/handlers/processing_job.py
@@ -146,7 +146,9 @@ def post(self, job_id):
cmd, values_dict={'job_id': job_id,
'payload': self.request.body.decode(
'ascii')})
job = qdb.processing_job.ProcessingJob.create(job.user, params)
# complete_job jobs are unique, so it is fine to force their creation
job = qdb.processing_job.ProcessingJob.create(
job.user, params, force=True)
job.submit()

self.finish()
4 changes: 2 additions & 2 deletions qiita_db/handlers/tests/test_processing_job.py
@@ -233,9 +233,9 @@ def test_post_job_success(self):
self.assertIsNotNone(cj)
# additionally we can test that job.print_trace is correct
self.assertEqual(job.trace, [
f'{job.id} [Not Available]: Validate | '
f'{job.id} [Not Available] (success): Validate | '
'-p qiita -N 1 -n 1 --mem 90gb --time 150:00:00 --nice=10000',
f' {cj.id} [{cj.external_id}] | '
f' {cj.id} [{cj.external_id}] (success)| '
'-p qiita -N 1 -n 1 --mem 16gb --time 10:00:00 --nice=10000'])

def test_post_job_success_with_archive(self):
32 changes: 28 additions & 4 deletions qiita_db/metadata_template/prep_template.py
@@ -135,7 +135,7 @@ def create(cls, md_template, study, data_type, investigation_type=None,
# data_type being created - if possible
if investigation_type is None:
if data_type_str in TARGET_GENE_DATA_TYPES:
investigation_type = 'Amplicon'
investigation_type = 'AMPLICON'
elif data_type_str == 'Metagenomic':
investigation_type = 'WGS'
elif data_type_str == 'Metatranscriptomic':
@@ -280,8 +280,22 @@ def delete(cls, id_):
qdb.sql_connection.TRN.add(sql, args)
archived_artifacts = set(
qdb.sql_connection.TRN.execute_fetchflatten())
ANALYSIS = qdb.analysis.Analysis
if archived_artifacts:
for aid in archived_artifacts:
# before we can delete the archived artifact, we need
# to delete the analyses where it was used.
sql = """SELECT analysis_id
FROM qiita.analysis
WHERE analysis_id IN (
SELECT DISTINCT analysis_id
FROM qiita.analysis_sample
WHERE artifact_id IN %s)"""
qdb.sql_connection.TRN.add(sql, [tuple([aid])])
analyses = set(
qdb.sql_connection.TRN.execute_fetchflatten())
for _id in analyses:
ANALYSIS.delete_analysis_artifacts(_id)
qdb.artifact.Artifact.delete(aid)

# Delete the prep template filepaths
@@ -794,14 +808,24 @@ def _get_node_info(workflow, node):

parent_cmd_name = None
parent_merging_scheme = None
phms = None
if pcmd is not None:
parent_cmd_name = pcmd.name
parent_merging_scheme = pcmd.merging_scheme
if not parent_merging_scheme['ignore_parent_command']:
phms = _get_node_info(workflow, parent)

return qdb.util.human_merging_scheme(
hms = qdb.util.human_merging_scheme(
ccmd.name, ccmd.merging_scheme, parent_cmd_name,
parent_merging_scheme, cparams, [], pparams)

# if the parent should not ignore its parent command, then we need
# to merge the previous result with the new one
if phms is not None:
hms = qdb.util.merge_overlapping_strings(hms, phms)

return hms

def _get_predecessors(workflow, node):
# recursive method to get predecessors of a given node
pred = []
@@ -857,7 +881,7 @@ def _get_predecessors(workflow, node):
'artifact transformation']
merging_schemes = {
qdb.archive.Archive.get_merging_scheme_from_job(j): {
x: y.id for x, y in j.outputs.items()}
x: str(y.id) for x, y in j.outputs.items()}
# we are going to select only the jobs that were a 'success', that
# are not 'hidden' and that have an output - jobs that are not
# hidden and a success but that do not have outputs are jobs which
@@ -975,7 +999,7 @@ def _get_predecessors(workflow, node):
init_artifacts = {
wkartifact_type: f'{starting_job.id}:'}
else:
init_artifacts = {wkartifact_type: self.artifact.id}
init_artifacts = {wkartifact_type: str(self.artifact.id)}

cmds_to_create.reverse()
current_job = None
6 changes: 3 additions & 3 deletions qiita_db/metadata_template/test/test_prep_template.py
@@ -911,7 +911,7 @@ def _common_creation_checks(self, pt, fp_count, name):
self.assertEqual(pt.data_type(), self.data_type)
self.assertEqual(pt.data_type(ret_id=True), self.data_type_id)
self.assertEqual(pt.artifact, None)
self.assertEqual(pt.investigation_type, 'Amplicon')
self.assertEqual(pt.investigation_type, 'AMPLICON')
self.assertEqual(pt.study_id, self.test_study.id)
self.assertEqual(pt.status, "sandbox")
exp_sample_ids = {'%s.SKB8.640193' % self.test_study.id,
@@ -1076,7 +1076,7 @@ def test_create_warning(self):
self.assertEqual(pt.data_type(), self.data_type)
self.assertEqual(pt.data_type(ret_id=True), self.data_type_id)
self.assertEqual(pt.artifact, None)
self.assertEqual(pt.investigation_type, 'Amplicon')
self.assertEqual(pt.investigation_type, 'AMPLICON')
self.assertEqual(pt.study_id, self.test_study.id)
self.assertEqual(pt.status, 'sandbox')
exp_sample_ids = {'%s.SKB8.640193' % self.test_study.id,
@@ -1247,7 +1247,7 @@ def test_investigation_type_setter(self):
"""Able to update the investigation type"""
pt = qdb.metadata_template.prep_template.PrepTemplate.create(
self.metadata, self.test_study, self.data_type_id)
self.assertEqual(pt.investigation_type, 'Amplicon')
self.assertEqual(pt.investigation_type, 'AMPLICON')
pt.investigation_type = "Other"
self.assertEqual(pt.investigation_type, 'Other')
with self.assertRaises(qdb.exceptions.QiitaDBColumnError):
89 changes: 46 additions & 43 deletions qiita_db/processing_job.py
@@ -582,10 +582,10 @@ def create(cls, user, parameters, force=False):
TTRN = qdb.sql_connection.TRN
with TTRN:
command = parameters.command

# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, email, processing_job_status,
COUNT(aopj.artifact_id)
if not force:
# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, email,
processing_job_status, COUNT(aopj.artifact_id)
FROM qiita.processing_job
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
@@ -596,41 +596,42 @@
GROUP BY processing_job_id, email,
processing_job_status"""

# we need to use ILIKE because of booleans as they can be
# false or False
params = []
for k, v in parameters.values.items():
# this is necessary in case we have an Iterable as a value
# but that is a string
if isinstance(v, Iterable) and not isinstance(v, str):
for vv in v:
params.extend([k, str(vv)])
# we need to use ILIKE because of booleans as they can be
# false or False
params = []
for k, v in parameters.values.items():
# this is necessary in case we have an Iterable as a value
# but that is a string
if isinstance(v, Iterable) and not isinstance(v, str):
for vv in v:
params.extend([k, str(vv)])
else:
params.extend([k, str(v)])

if params:
# divided by 2 as we have key-value pairs
len_params = int(len(params)/2)
sql = sql.format(' AND ' + ' AND '.join(
["command_parameters->>%s ILIKE %s"] * len_params))
params = [command.id] + params
TTRN.add(sql, params)
else:
params.extend([k, str(v)])

if params:
# divided by 2 as we have key-value pairs
len_params = int(len(params)/2)
sql = sql.format(' AND ' + ' AND '.join(
["command_parameters->>%s ILIKE %s"] * len_params))
params = [command.id] + params
TTRN.add(sql, params)
else:
# the sql variable expects the list of parameters but if there
# is no param we need to replace the {0} with an empty string
TTRN.add(sql.format(""), [command.id])

# checking that if the job status is success, it has children
# [2] status, [3] children count
existing_jobs = [r for r in TTRN.execute_fetchindex()
if r[2] != 'success' or r[3] > 0]
if existing_jobs and not force:
raise ValueError(
'Cannot create job because the parameters are the same as '
'jobs that are queued, running or already have '
'succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status, _ in existing_jobs]))
# the sql variable expects the list of parameters but if
# there is no param we need to replace the {0} with an
# empty string
TTRN.add(sql.format(""), [command.id])

# checking that if the job status is success, it has children
# [2] status, [3] children count
existing_jobs = [r for r in TTRN.execute_fetchindex()
if r[2] != 'success' or r[3] > 0]
if existing_jobs:
raise ValueError(
'Cannot create job because the parameters are the '
'same as jobs that are queued, running or already '
'have succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status, _ in existing_jobs]))

sql = """INSERT INTO qiita.processing_job
(email, command_id, command_parameters,
@@ -2052,23 +2053,25 @@ def complete_processing_job(self):
def trace(self):
""" Returns as a text array the full trace of the job, from itself
to validators and complete jobs"""
lines = [f'{self.id} [{self.external_id}]: '
lines = [f'{self.id} [{self.external_id}] ({self.status}): '
f'{self.command.name} | {self.resource_allocation_info}']
cjob = self.complete_processing_job
if cjob is not None:
lines.append(f' {cjob.id} [{cjob.external_id}] | '
lines.append(f' {cjob.id} [{cjob.external_id}] ({cjob.status})| '
f'{cjob.resource_allocation_info}')
vjob = self.release_validator_job
if vjob is not None:
lines.append(f' {vjob.id} [{vjob.external_id}] '
f'| {vjob.resource_allocation_info}')
f' ({vjob.status}) | '
f'{vjob.resource_allocation_info}')
for v in self.validator_jobs:
lines.append(f' {v.id} [{v.external_id}]: '
lines.append(f' {v.id} [{v.external_id}] ({v.status}): '
f'{v.command.name} | {v.resource_allocation_info}')
cjob = v.complete_processing_job
if cjob is not None:
lines.append(f' {cjob.id} [{cjob.external_id}] '
f'| {cjob.resource_allocation_info}')
f'({cjob.status}) | '
f'{cjob.resource_allocation_info}')
return lines


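Illustrative only: with the status annotations added above, a job's trace reads along these lines (`job` is assumed to be an existing ProcessingJob; ids, statuses and resource allocations vary):

    # trace is a list of strings covering the job, its complete_job, its
    # release validator and its validator jobs; statuses now appear in
    # parentheses next to each entry.
    for line in job.trace:
        print(line)
    # <job-id> [<external-id>] (success): Validate | -p qiita -N 1 -n 1 --mem 90gb --time 150:00:00 --nice=10000
    #   <complete-job-id> [<external-id>] (success)| -p qiita -N 1 -n 1 --mem 16gb --time 10:00:00 --nice=10000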