#
# This file is part of the GROMACS molecular simulation package.
#
# Copyright (c) 2019, by the GROMACS development team, led by
# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
# and including many others, as listed in the AUTHORS file in the
# top-level source directory and at http://www.gromacs.org.
#
# GROMACS is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# GROMACS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with GROMACS; if not, see
# http://www.gnu.org/licenses, or write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# If you want to redistribute modifications to GROMACS, please
# consider that scientific software is very special. Version
# control is crucial - bugs must be traceable. We will be happy to
# consider code for inclusion in the official distribution, but
# derived work must not be called official GROMACS. Details are found
# in the README & COPYING files - if they are missing, get the
# official version at http://www.gromacs.org.
#
# To help us fund GROMACS development, we humbly ask that you cite
# the research papers on the package. Check out http://www.gromacs.org.
"""modify_input operation module
Provides implementation classes and user interface for gmxapi.modify_input.
The *modify_input* operation accepts simulation input, such as from *read_tpr*,
and allows aspects of the simulation input to be overridden.
"""
# Only 'modify_input' is exported by a star-import of this module; every other
# name defined here is an implementation detail.
__all__ = ['modify_input']
import collections
import inspect
import typing
import gmxapi
import gmxapi.abc
import gmxapi.exceptions
import gmxapi.operation as _op
import gmxapi.typing
from gmxapi.operation import InputCollectionDescription
from .abc import ModuleObject
# Initialize module-level logger as a child of the gmxapi package logger.
from gmxapi import logger as root_logger

logger = root_logger.getChild('modify_input')
# Emit an INFO record naming this module each time the module body is executed.
logger.info('Importing {}'.format(__name__))
# Named outputs of the operation and their data types.
_output_descriptors = (
    _op.OutputDataDescriptor('_simulation_input', str),
    _op.OutputDataDescriptor('parameters', dict),
)

# One Publisher per named output, keyed by the output name.
_publishing_descriptors = {
    item._name: gmxapi.operation.Publisher(item._name, item._dtype)
    for item in _output_descriptors
}

# Output description built from the same (name, type) pairs.
_output = _op.OutputCollectionDescription(
    **{item._name: item._dtype for item in _output_descriptors}
)
class OutputDataProxy(ModuleObject,
                      _op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of modify_input operations.

    The data members are declared by the module-level ``_output_descriptors``.
    """

    def __init__(self, *args, **kwargs):
        # Initialize the DataProxyBase part explicitly, forwarding all arguments unchanged.
        _op.DataProxyBase.__init__(self, *args, **kwargs)
class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors
                          ):
    """Manage output resource updates for modify_input operations.

    The data members are the Publisher objects in the module-level
    ``_publishing_descriptors``.
    """
# Bundle the output description with the proxy classes used to read
# (OutputDataProxy) and to publish (PublishingDataProxy) the operation output.
# StandardDirector hands this factory to the node builder.
_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)
class SessionResources:
    """Input and output run-time resources for a modify_input operation.

    Attributes:
        _simulation_input: String handed over unchanged; currently interpreted
            as a TPR file path.
        parameters: Mapping with the new simulation parameters data.
        output: Publishing resource through which results are made available.
    """

    def __init__(self, _simulation_input: str, parameters: dict, publisher: PublishingDataProxy):
        """Initialize resources for a gmxapi.operation context.

        Arguments:
            parameters: Source of new simulation parameters data.
            publisher: Output publishing resource provided by the Context.
            _simulation_input: Unspecified. Reserved for implementation details.

        """
        # For the initial implementation, _simulation_input is just a string that
        # will be interpreted as a TPR file path. The typing will change in the
        # near term as we build the data flow model. Before 1.0, unusual members
        # like this will be removed or hidden in the interface of the other
        # input objects.
        assert isinstance(_simulation_input, str)
        assert isinstance(parameters, collections.abc.Mapping)

        self._simulation_input = _simulation_input
        self.parameters = parameters
        self.output = publisher
#
# Helpers
#
# The '_simulation_input' input is a workaround until the data model is improved and the
# simulation module is more integrated.
# Note that when a read_tpr handle is passed with the "input" keyword argument,
# its outputs are mapped directly to modify_input inputs. A user-supplied
# 'parameters' argument overrides the 'parameters' value from 'input'.
# 'parameters' has no default value, which makes it a required argument.
# Both inputs are positional-or-keyword parameters without default values.
_input = _op.InputCollectionDescription([
    (input_name, inspect.Parameter(input_name,
                                   inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                   annotation=input_type))
    for input_name, input_type in (('_simulation_input', str),
                                   ('parameters', dict))
])
def _session_resource_factory(input: _op.InputPack, output: PublishingDataProxy) -> SessionResources:
    """Translate resources from the gmxapi.operation Context to the modify_input implementation.

    Arguments:
        input: Input pack whose ``kwargs`` provide the '_simulation_input' and
            'parameters' values.
        output: Publishing resource, stored as the ``output`` of the result.

    Returns:
        SessionResources built from the values above.
    """
    bound_inputs = input.kwargs
    return SessionResources(
        _simulation_input=bound_inputs['_simulation_input'],
        parameters=bound_inputs['parameters'],
        publisher=output,
    )
def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs.

    All positional and keyword arguments are bound against the module-level
    input description ``_input``; the result of that ``bind`` call is returned.
    """
    return _input.bind(*args, **kwargs)
def _run(resources: SessionResources):
    """Operation implementation in the gmxapi.operation module context.

    Every name listed by ``resources.output.keys()`` is looked up as an
    attribute of *resources*, and its value is assigned to the attribute of the
    same name on ``resources.output``.
    """
    # We combine the input from *input*, and key word arguments into the new output.
    publisher = resources.output
    for output_name in publisher.keys():
        assert isinstance(output_name, str)
        assert hasattr(resources, output_name)
        value = getattr(resources, output_name)
        logger.debug('modify_input publishing {} to {}'.format(value, output_name))
        setattr(publisher, output_name, value)
# Module-wide counter; guarantees that two identifiers generated by this module
# differ even when the hash of their inputs is the same.
_next_uid = 0


def _make_uid(input) -> str:
    """Return a new unique identifier for a modify_input operation node.

    The identifier has the form ``modify_input_<counter>_<hash(input)>``. The
    prefix names this operation so that identifiers are attributable to
    modify_input and cannot coincide with those generated for read_tpr nodes.

    Arguments:
        input: Hashable object describing the node input; only ``hash(input)``
            is used.

    Returns:
        The identifier string. The module counter is advanced on every
        successful call.

    Raises:
        TypeError: if *input* is not hashable (raised by ``hash``).
    """
    # TODO: Use input fingerprint for more useful identification.
    global _next_uid
    salt = hash(input)
    new_uid = 'modify_input_{}_{}'.format(_next_uid, salt)
    _next_uid += 1
    return new_uid
class ResourceManager(gmxapi.operation.ResourceManager):
    """Resource manager that verifies all modify_input outputs after an update."""

    def update_output(self):
        """Update the output through the base class, then check for completeness.

        Raises:
            gmxapi.exceptions.ApiError: for the first output named in
                ``_output_descriptors`` that is not done after the update.
        """
        logger.debug('Updating output for {}.'.format(self.operation_id))
        super().update_output()
        for output_name in (descriptor._name for descriptor in _output_descriptors):
            if self.is_done(output_name):
                continue
            raise gmxapi.exceptions.ApiError('Expected output {} not updated.'.format(output_name))
# Note: this is a class because we attach the input_description functionality,
# but all of its functionality can be composed from dispatching functions.
# TODO: Consider replacing this unique class with a pattern of composed instances of a common class.
class ResourceFactory(gmxapi.abc.ResourceFactory):
    """ModifyInput resource factory.

    Instances dispatch on the pair (source context, target context) that they
    were created with.
    """

    def __init__(self, target_context, source_context):
        self.target_context = target_context  # Context in which resources are consumed.
        self.source_context = source_context  # Determines the form of input accepted by __call__.

    # TODO: clean up the dispatching. What to do when input comes from multiple sources?
    # Use a typing overload or a single-dispatch functor for clearer input/result typing.
    def __call__(self, *args, **kwargs):
        """Translate the received arguments for the target context.

        A source context of None implies that the inputs come from the Python UI.

        Raises:
            gmxapi.exceptions.NotImplementedError: if there is no translation
                for the configured pair of contexts.
        """
        source, target = self.source_context, self.target_context

        if source is None and isinstance(target, _op.Context):
            return _standard_node_resource_factory(*args, **kwargs)

        if isinstance(source, _op.Context):
            # TODO: Check whether the consumer is a Context.NodeBuilder or an operation runner.
            # We don't yet use this dispatcher for building nodes, so assume we are launching a session.
            assert 'input' in kwargs
            assert 'output' in kwargs
            return _session_resource_factory(input=kwargs['input'], output=kwargs['output'])

        raise gmxapi.exceptions.NotImplementedError(
            'No translation from {} context to {}'.format(source, target))

    @typing.overload
    def input_description(self, context: _op.Context) -> _op.InputDescription:
        ...

    def input_description(self, context: gmxapi.abc.Context):
        """Return the input description for *context*.

        Raises:
            gmxapi.exceptions.NotImplementedError: if *context* is not a
                gmxapi.operation Context.
        """
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.NotImplementedError('No input description available for {} context'.format(context))
        return StandardInputDescription()
class StandardInputDescription(_op.InputDescription):
    """Provide the modify_input input description in gmxapi.operation Contexts."""

    def signature(self) -> InputCollectionDescription:
        """Return the module-level input collection description ``_input``."""
        return _input

    def make_uid(self, input: _op.DataEdge) -> str:
        """Return a new identifier for a node consuming *input* (delegates to ``_make_uid``)."""
        return _make_uid(input)
class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible modify_input implementation."""

    # name() is a class method to allow the class object to be used in
    # gmxapi.operation._make_registry_key
    @classmethod
    def name(cls) -> str:
        """Canonical name for the operation."""
        return 'modify_input'

    @classmethod
    def namespace(cls) -> str:
        """modify_input is importable from the gmxapi module."""
        return 'gmxapi'

    @classmethod
    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
        """Return a StandardDirector for a gmxapi.operation Context.

        For any other kind of *context* no director is available and None is
        returned.
        """
        if not isinstance(context, _op.Context):
            return None
        return StandardDirector(context)
class StandardOperationHandle(_op.AbstractOperation, ModuleObject):
    """Handle used in Python UI or gmxapi.operation Contexts.

    The handle delegates all work to the resource manager it was created with.
    """

    def __init__(self, resource_manager: ResourceManager):
        # Private (name-mangled) reference to the manager that owns the node's data.
        self.__resource_manager = resource_manager

    def run(self):
        """Ask the resource manager to update the operation output."""
        self.__resource_manager.update_output()

    @property
    def output(self) -> OutputDataProxy:
        """Output of the operation, as returned by the resource manager's ``data()``."""
        return self.__resource_manager.data()
class StandardDirector(gmxapi.abc.OperationDirector):
    """Direct the instantiation of a modify_input node in a gmxapi.operation Context.

    Calling an instance configures a node builder obtained from the Context and
    returns the handle of the built node. *resource_factory* provides the
    callables that translate input from a given source into the form consumed
    by the target Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """

    def __init__(self, context: _op.Context):
        """Bind the director to *context*.

        Raises:
            gmxapi.exceptions.ValueError: if *context* is not a gmxapi.operation Context.
        """
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
        """Build a modify_input node in the bound Context and return its handle.

        Arguments:
            resources: Named data sources; every item is added to the node as an input.
            label: Optional label, passed through to the node builder of the Context.

        Returns:
            The handle produced by the node builder.
        """
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        builder.set_resource_factory(_session_resource_factory)
        builder.set_input_description(StandardInputDescription())
        builder.set_handle(StandardOperationHandle)

        def runner_director(resources):
            # Wrap the session resources in a zero-argument callable that runs the operation.
            def runner():
                _run(resources)
            return runner

        builder.set_runner_director(runner_director)
        builder.set_output_factory(_output_factory)
        builder.set_resource_manager(ResourceManager)

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory.
        assert isinstance(resources, _op.DataSourceCollection)
        for name, source in resources.items():
            builder.add_input(name, source)

        handle = builder.build()
        assert isinstance(handle, StandardOperationHandle)
        return handle

    def handle_type(self, context: gmxapi.abc.Context):
        """Return the handle class produced by this director; *context* is not inspected."""
        return StandardOperationHandle

    def resource_factory(self,
                         source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
                         target: gmxapi.abc.Context = None):
        """Return a callable that translates input from *source* for *target*.

        Arguments:
            source: None for input given as function call arguments, a
                gmxapi.operation Context, or a ModuleObject providing the input.
            target: Context in which the resources are consumed. Defaults to
                the Context this director is bound to.

        Returns:
            A ResourceFactory when *source* is None (and *target* is a
            gmxapi.operation Context) or when *source* is a gmxapi.operation
            Context. For a ModuleObject source and a gmxapi.operation Context
            target, a function ``(input, parameters)`` that binds the
            ``_simulation_input`` of *input* together with *parameters* to the
            input description of this module.

        Raises:
            gmxapi.exceptions.ValueError: if no dispatching is available for
                the given *source* and *target*.
        """
        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        #
        if target is None:
            target = self.context
        if source is None:
            if isinstance(target, _op.Context):
                # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
                return ResourceFactory(target_context=target, source_context=source)
        if isinstance(source, _op.Context):
            return ResourceFactory(target_context=target, source_context=source)
        if isinstance(source, ModuleObject):
            if isinstance(target, _op.Context):
                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
                # In the first draft, though, we just access a special payload.
                # Return a factory that will consume *_simulation_input* and *parameters*
                # members of a received object.
                # The message names this operation (it previously named mdrun).
                logger.info('Building modify_input operation from source {}'.format(source))

                def simulation_input_workaround(input, parameters: dict):
                    # Use a distinct local name so that the enclosing *source* is not shadowed.
                    data_source = input
                    # TODO: Normalize mechanism for obtaining SimulationInput references.
                    if hasattr(data_source, 'output'):
                        data_source = input.output
                    assert hasattr(data_source, '_simulation_input')
                    assert hasattr(data_source, 'parameters')
                    logger.info('modify_input receiving input {}: {}'.format(data_source._simulation_input.name,
                                                                             data_source._simulation_input.description))
                    # The user-supplied *parameters* are bound, not data_source.parameters.
                    source_collection = _input.bind(_simulation_input=data_source._simulation_input,
                                                    parameters=parameters)
                    logger.info('modify_input input bound as source collection {}'.format(source_collection))
                    return source_collection

                return simulation_input_workaround

        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))
# TODO: This operation is intended to be able to compose complete simulation input. E.g:
# def modify_input(input: SimulationInput, parameters=None, topology=None, conformation=None, simulation_state=None):