-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[wip] Storage manager update #679
base: main
Are you sure you want to change the base?
Changes from 4 commits
19991a3
6b81bb6
0571d28
ecd6716
37d448c
fb85a58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,7 @@ | |
settings, ChemicalSystem, LigandAtomMapping, Component, ComponentMapping, | ||
SmallMoleculeComponent, ProteinComponent, SolventComponent, | ||
) | ||
from gufe.storage import stagingregistry | ||
|
||
from .equil_rfe_settings import ( | ||
RelativeHybridTopologyProtocolSettings, SystemSettings, | ||
|
@@ -578,8 +579,10 @@ def __init__(self, *, | |
) | ||
|
||
def run(self, *, dry=False, verbose=True, | ||
scratch_basepath=None, | ||
shared_basepath=None) -> dict[str, Any]: | ||
scratch_basepath: pathlib.Path, | ||
shared_basepath: stagingregistry.StagingRegistry, | ||
permanent_basepath: stagingregistry.StagingRegistry, | ||
) -> dict[str, Any]: | ||
"""Run the relative free energy calculation. | ||
|
||
Parameters | ||
|
@@ -591,10 +594,12 @@ def run(self, *, dry=False, verbose=True, | |
verbose : bool | ||
Verbose output of the simulation progress. Output is provided via | ||
INFO level logging. | ||
scratch_basepath: Pathlike, optional | ||
Where to store temporary files, defaults to current working directory | ||
shared_basepath : Pathlike, optional | ||
Where to run the calculation, defaults to current working directory | ||
scratch_basepath: StagingDirectory | ||
Where to store temporary files | ||
shared_basepath : StagingDirectory | ||
Where to run the calculation | ||
permanent_basepath : StagingDirectory | ||
Where to store files that must persist beyond the DAG | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update typing in docstring |
||
|
||
Returns | ||
------- | ||
|
@@ -609,11 +614,6 @@ def run(self, *, dry=False, verbose=True, | |
""" | ||
if verbose: | ||
self.logger.info("Preparing the hybrid topology simulation") | ||
if scratch_basepath is None: | ||
scratch_basepath = pathlib.Path('.') | ||
if shared_basepath is None: | ||
# use cwd | ||
shared_basepath = pathlib.Path('.') | ||
|
||
# 0. General setup and settings dependency resolution step | ||
|
||
|
@@ -664,11 +664,13 @@ def run(self, *, dry=False, verbose=True, | |
else: | ||
ffcache = None | ||
|
||
ffcache.register() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Problem 1 (easy): this line and the Problem 2 (more complicated): How is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reading through, I think the only thing that would be cached would be the LJ parameters for the small molecules. We can probably remove this? |
||
|
||
system_generator = system_creation.get_system_generator( | ||
forcefield_settings=forcefield_settings, | ||
thermo_settings=thermo_settings, | ||
system_settings=system_settings, | ||
cache=ffcache, | ||
cache=ffcache.as_path(), | ||
has_solvent=solvent_comp is not None, | ||
) | ||
|
||
|
@@ -812,10 +814,18 @@ def run(self, *, dry=False, verbose=True, | |
) | ||
|
||
# a. Create the multistate reporter | ||
nc = shared_basepath / sim_settings.output_filename | ||
# TODO: Logic about keeping/not .nc files goes here | ||
nc = (shared_basepath / sim_settings.output_filename) | ||
checkpoint = (shared_basepath / sim_settings.checkpoint_storage) | ||
real_time_analysis = (shared_basepath / "real_time_analysis.yaml") | ||
# have to flag these files as being created so that they get brought back | ||
nc.register() | ||
checkpoint.register() | ||
real_time_analysis.register() | ||
|
||
chk = sim_settings.checkpoint_storage | ||
reporter = multistate.MultiStateReporter( | ||
storage=nc, | ||
storage=str(nc.as_path()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're using this I'm not sure if this is implemented, but you should be able to instead just use You should only need to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah once it's MultiStateReporter wants a string (not path) and the other is going into a subprocess call, so again it's bypassing file-like objects and is a string. |
||
analysis_particle_indices=selection_indices, | ||
checkpoint_interval=sim_settings.checkpoint_interval.m, | ||
checkpoint_storage=chk, | ||
|
@@ -947,13 +957,12 @@ def run(self, *, dry=False, verbose=True, | |
sampling_method=sampler_settings.sampler_method.lower(), | ||
result_units=unit.kilocalorie_per_mole, | ||
) | ||
analyzer.plot(filepath=shared_basepath, filename_prefix="") | ||
analyzer.plot(filepath=permanent_basepath, filename_prefix="") | ||
analyzer.close() | ||
|
||
else: | ||
# clean up the reporter file | ||
fns = [shared_basepath / sim_settings.output_filename, | ||
shared_basepath / sim_settings.checkpoint_storage] | ||
fns = [nc.as_path(), checkpoint.as_path()] | ||
for fn in fns: | ||
os.remove(fn) | ||
finally: | ||
|
@@ -980,36 +989,40 @@ def run(self, *, dry=False, verbose=True, | |
|
||
if not dry: # pragma: no-cover | ||
return { | ||
'nc': nc, | ||
'last_checkpoint': chk, | ||
'nc': nc.as_path(), | ||
'last_checkpoint': checkpoint.as_path(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably better to return the |
||
**analyzer.unit_results_dict | ||
} | ||
else: | ||
return {'debug': {'sampler': sampler}} | ||
|
||
@staticmethod | ||
def analyse(where) -> dict: | ||
def analyse(where: stagingregistry.StagingRegistry) -> dict: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is probably a |
||
# don't put energy analysis in here, it uses the open file reporter | ||
# whereas structural stuff requires that the file handle is closed | ||
ret = subprocess.run(['openfe_analysis', str(where)], | ||
trjdir = (where / '') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the purpose of doing this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was the question I was asking on slack. I needed a |
||
output = (where / 'results.json') | ||
ret = subprocess.run(['openfe_analysis', 'RFE_analysis', | ||
str(trjdir.as_path()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly, has the requirement that the files are already in the directory To run it on a different computer would require downloading all files within the directory. The best way to do that now would be to enumerate them here -- in principle there is a way to obtain all files with a given string prefix, but all of that gets deep into the weeds. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this method is called from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is a little ugly that it needs to know if the trj was stored in scratch/shared/permanent, so there's probably something there I need to fix |
||
str(output.as_path())], | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE) | ||
if ret.returncode: | ||
return {'structural_analysis_error': ret.stderr} | ||
|
||
data = json.loads(ret.stdout) | ||
with open(output, 'r') as f: | ||
data = json.load(f) | ||
|
||
savedir = pathlib.Path(where) | ||
if d := data['protein_2D_RMSD']: | ||
fig = plotting.plot_2D_rmsd(d) | ||
fig.savefig(savedir / "protein_2D_RMSD.png") | ||
fig.savefig(where / "protein_2D_RMSD.png") | ||
plt.close(fig) | ||
f2 = plotting.plot_ligand_COM_drift(data['time(ps)'], data['ligand_wander']) | ||
f2.savefig(savedir / "ligand_COM_drift.png") | ||
f2.savefig(where / "ligand_COM_drift.png") | ||
plt.close(f2) | ||
|
||
f3 = plotting.plot_ligand_RMSD(data['time(ps)'], data['ligand_RMSD']) | ||
f3.savefig(savedir / "ligand_RMSD.png") | ||
f3.savefig(where / "ligand_RMSD.png") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (not really this line, but this is the closest changed line) You may want to include the figure filenames in the return value, too. Otherwise it will be harder to download them from the cloud. |
||
plt.close(f3) | ||
|
||
return {'structural_analysis': data} | ||
|
@@ -1020,7 +1033,8 @@ def _execute( | |
log_system_probe(logging.INFO, paths=[ctx.scratch]) | ||
with without_oechem_backend(): | ||
outputs = self.run(scratch_basepath=ctx.scratch, | ||
shared_basepath=ctx.shared) | ||
shared_basepath=ctx.shared, | ||
permanent_basepath=ctx.permanent) | ||
|
||
analysis_outputs = self.analyse(ctx.shared) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think these are
StagingRegistry
... aren't theyStagingPath
?The input to this method was, e.g.,
root/$DAG_LABEL/shared_$UNIT_LABEL
.StagingRegistry
would correspond toroot/
. The full path comes in as aStagingPath
, I believe (this is something that changed at one point, because there was no need to expose both classes to protocol developers; it also simplified serialization, IIRC.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm working off these hints here, these three vars are just the context unpacked: https://github.com/OpenFreeEnergy/gufe/blob/staging-execute-dag/gufe/protocols/protocolunit.py#L30