# Source code for gmxapi.simulation.modify_input

# This file is part of the GROMACS molecular simulation package.
# Copyright (c) 2019, by the GROMACS development team, led by
# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
# and including many others, as listed in the AUTHORS file in the
# top-level source directory and at http://www.gromacs.org.
# GROMACS is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
# GROMACS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with GROMACS; if not, see
# http://www.gnu.org/licenses, or write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
# If you want to redistribute modifications to GROMACS, please
# consider that scientific software is very special. Version
# control is crucial - bugs must be traceable. We will be happy to
# consider code for inclusion in the official distribution, but
# derived work must not be called official GROMACS. Details are found
# in the README & COPYING files - if they are missing, get the
# official version at http://www.gromacs.org.
# To help us fund GROMACS development, we humbly ask that you cite
# the research papers on the package. Check out http://www.gromacs.org.

"""modify_input operation module

Provides implementation classes and user interface for gmxapi.modify_input.

The *modify_input* operation accepts simulation input, such as from *read_tpr*,
and allows aspects of the simulation input to be overridden.
"""

__all__ = ['modify_input']

import collections
import inspect
import typing

import gmxapi
import gmxapi.exceptions
import gmxapi.operation as _op
import gmxapi.typing
from gmxapi.operation import InputCollectionDescription
from .abc import ModuleObject

# Initialize module-level logger
from gmxapi import logger as root_logger

logger = root_logger.getChild('modify_input')
# NOTE(review): the call head of this statement was missing from this copy of the
# file (only the message and the closing parentheses remained); an info-level
# log call is assumed -- confirm against upstream.
logger.info('Importing {}'.format(__name__))

# Named outputs of the operation and their data types.
_output_descriptors = (
    _op.OutputDataDescriptor('_simulation_input', str),
    _op.OutputDataDescriptor('parameters', dict),
)
# NOTE(review): the iterable of the two comprehensions below was missing from
# this copy of the file; `_output_descriptors` is the only candidate in scope --
# confirm against upstream.
# One publisher per declared output, keyed by output name.
_publishing_descriptors = {
    desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype)
    for desc in _output_descriptors
}
# Output description built from the same name -> type pairs.
_output = _op.OutputCollectionDescription(
    **{descriptor._name: descriptor._dtype for descriptor in _output_descriptors}
)

class OutputDataProxy(ModuleObject,
                      _op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of modify_input operations."""
    # NOTE(review): everything after `ModuleObject,` in the class header was
    # missing from this copy of the file. The DataProxyBase base is implied by the
    # explicit DataProxyBase.__init__ call below; the `descriptors` class keyword
    # is assumed -- confirm against upstream.

    def __init__(self, *args, **kwargs):
        # Initialize the data proxy base explicitly rather than through super().
        _op.DataProxyBase.__init__(self, *args, **kwargs)

class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors):
    """Manage output resource updates for the modify_input operation."""
    # NOTE(review): the class header was truncated after `_op.DataProxyBase,` in
    # this copy of the file; the `descriptors=_publishing_descriptors` class
    # keyword is assumed -- confirm against upstream.

# NOTE(review): only the first argument of this call was present in this copy of
# the file; the remaining keyword arguments are assumed from the otherwise unused
# module-level `_output` and `PublishingDataProxy` -- confirm against upstream.
_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)

class SessionResources(object):
    """Input and output run-time resources for a modify_input operation.

    Instances carry the values that the operation publishes: each input is
    stored under the attribute name of the output it feeds.
    """

    def __init__(self, _simulation_input: str, parameters: dict, publisher: PublishingDataProxy):
        """Initialize resources for a gmxapi.operation context.

        Arguments:
            parameters: Source of new simulation parameters data.
            publisher: Output publishing resource provided by the Context.
            _simulation_input: Unspecified. Reserved for implementation details.

        """
        # For the initial implementation, _simulation_input is just a string that
        # will be interpreted as a TPR file path. The typing will change in the
        # near term as we build the data flow model. Before 1.0, unusual members
        # like this will be removed or hidden in the interface of the other
        # input objects.
        assert isinstance(_simulation_input, str)
        self._simulation_input = _simulation_input
        # NOTE(review): the second argument of this isinstance() check was missing
        # from this copy of the file. A general mapping is assumed (a plain dict
        # passes it) -- confirm against upstream.
        assert isinstance(parameters, collections.abc.Mapping)
        self.parameters = parameters
        self.output = publisher

# Helpers

# The '_simulation_input' input is a workaround until the data model is improved and the
# simulation module is more integrated.
# Note that when a read_tpr handle is passed with the "input" key word argument,
# its outputs will be mapped directly to modify_input inputs. The presence of
# a user-supplied 'parameters' argument overrides the 'parameters' value from 'input'.
# The lack of a default on 'parameters' makes it required.
# NOTE(review): the arguments after each parameter name were missing from this
# copy of the file. The kind is assumed to be POSITIONAL_OR_KEYWORD (the
# description is bound both with *args and with keyword arguments), and the
# annotations follow the output descriptor types -- confirm against upstream.
# Neither parameter has a default, so both are required.
_input = _op.InputCollectionDescription(
    [('_simulation_input', inspect.Parameter('_simulation_input',
                                             inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                             annotation=str)),
     ('parameters', inspect.Parameter('parameters',
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      annotation=dict)),
     ])

def _session_resource_factory(input: _op.InputPack, output: PublishingDataProxy) -> SessionResources:
    """Translate resources from the gmxapi.operation Context to the modify_input implementation.

    Arguments:
        input: Input pack whose ``kwargs`` hold '_simulation_input' and 'parameters'.
        output: Publishing resource for the operation outputs.

    Returns:
        SessionResources populated from *input* and *output*.
    """
    bound_inputs = input.kwargs
    return SessionResources(
        _simulation_input=bound_inputs['_simulation_input'],
        parameters=bound_inputs['parameters'],
        publisher=output)

def _standard_node_resource_factory(*args, **kwargs):
    """Bind Python UI call arguments to the operation's input description.

    All positional and key word arguments are forwarded unchanged to
    ``_input.bind``; its result is returned as the node builder input.
    """
    source_collection = _input.bind(*args, **kwargs)
    return source_collection

def _run(resources: SessionResources):
    """Operation implementation in the gmxapi.operation module context.

    For every output name known to the publisher, copy the attribute of the
    same name from *resources* to the publisher.
    """
    # The inputs (from *input* and key word arguments) are re-published as the new output.
    publisher = resources.output
    for output_name in publisher.keys():
        assert isinstance(output_name, str)
        assert hasattr(resources, output_name)
        value = getattr(resources, output_name)
        logger.debug('modify_input publishing {} to {}'.format(value, output_name))
        setattr(publisher, output_name, value)

_next_uid = 0

def _make_uid(input) -> str:
    # TODO: Use input fingerprint for more useful identification.
    salt = hash(input)
    global _next_uid
    new_uid = 'read_tpr_{}_{}'.format(_next_uid, salt)
    _next_uid += 1
    return new_uid

class ResourceManager(gmxapi.operation.ResourceManager):
    """Resource manager for modify_input nodes in gmxapi.operation Contexts."""

    def update_output(self):
        """Check that every declared output has been marked done.

        Raises:
            gmxapi.exceptions.ApiError: for the first output named in
                ``_output_descriptors`` that is not done.
        """
        # NOTE(review): this override does not invoke the base class
        # update_output(); confirm that this is intended.
        logger.debug('Updating output for {}.'.format(self.operation_id))
        for output_name in (descriptor._name for descriptor in _output_descriptors):
            if self.is_done(output_name):
                continue
            raise gmxapi.exceptions.ApiError('Expected output {} not updated.'.format(output_name))

# Note: this is a class because we attach the input_description functionality,
# but all of its functionality can be composed from dispatching functions.
# TODO: Consider replacing this unique class with a pattern of composed instances of a common class.
class ResourceFactory(gmxapi.abc.ResourceFactory):
    """ModifyInput resource factory.

    Callable that translates resources provided in *source_context* into the
    form consumed in *target_context*.
    """
    # NOTE(review): the base class was missing from this copy of the file;
    # gmxapi.abc.ResourceFactory is assumed -- confirm against upstream.

    def __init__(self, target_context, source_context):
        """Record the Contexts that __call__ dispatches on.

        Arguments:
            target_context: Context in which resources are consumed.
            source_context: Context the inputs come from. None means the inputs
                are Python UI call arguments.
        """
        self.source_context = source_context  # Determine input form of *create* method.
        self.target_context = target_context  # Context in which resources are consumed.

    # TODO: clean up the dispatching. What to do when input comes from multiple sources?
    # Use a typing overload or a single-dispatch functor for clearer input/result typing.
    def __call__(self, *args, **kwargs):
        """Translate the provided resources for the target Context.

        With no source Context and a gmxapi.operation target Context, all
        arguments are bound through ``_standard_node_resource_factory``.
        With a gmxapi.operation source Context, the 'input' and 'output' key
        word arguments are required and passed to ``_session_resource_factory``.

        Raises:
            gmxapi.exceptions.NotImplementedError: if no translation exists for
                this pair of Contexts.
        """
        # context is used for dispatching and annotates the Context of the other arguments.
        # context == None implies inputs are from Python UI.
        if self.source_context is None:
            if isinstance(self.target_context, _op.Context):
                return _standard_node_resource_factory(*args, **kwargs)
        if isinstance(self.source_context, _op.Context):
            # TODO: Check whether the consumer is a Context.NodeBuilder or an operation runner.
            # We don't yet use this dispatcher for building nodes, so assume we are launching a session.
            assert 'input' in kwargs
            assert 'output' in kwargs
            return _session_resource_factory(input=kwargs['input'], output=kwargs['output'])
        raise gmxapi.exceptions.NotImplementedError(
            'No translation from {} context to {}'.format(self.source_context, self.target_context))

    # NOTE(review): this copy of the file held two definitions of this method:
    # a body-less one with the signature used here, and one with a truncated
    # signature holding the body. They are merged into a single definition --
    # confirm against upstream (the first may have been a typing overload).
    def input_description(self, context: _op.Context) -> _op.InputDescription:
        """Get the input description for *context*.

        Returns:
            StandardInputDescription for a gmxapi.operation Context.

        Raises:
            gmxapi.exceptions.NotImplementedError: for any other *context*.
        """
        if isinstance(context, _op.Context):
            return StandardInputDescription()
        raise gmxapi.exceptions.NotImplementedError('No input description available for {} context'.format(context))

class StandardInputDescription(_op.InputDescription):
    """Provide the modify_input input description in gmxapi.operation Contexts."""

    def signature(self) -> InputCollectionDescription:
        """Return the module-level input collection description."""
        description = _input
        return description

    def make_uid(self, input: _op.DataEdge) -> str:
        """Return a new node identifier derived from *input*."""
        uid = _make_uid(input)
        return uid

class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible modify_input implementation."""

    # NOTE(review): the classmethod decorators were missing from this copy of the
    # file. They are restored because the comment below and the `cls` parameter
    # of director() call for class methods -- confirm against upstream.

    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
    @classmethod
    def name(cls) -> str:
        """Canonical name for the operation."""
        return 'modify_input'

    @classmethod
    def namespace(cls) -> str:
        """modify_input is importable from the gmxapi module."""
        return 'gmxapi'

    @classmethod
    def director(cls, context: _op.Context) -> _op.OperationDirector:
        """Get a director that creates modify_input nodes in *context*.

        Returns:
            StandardDirector for a gmxapi.operation Context; None for any
            other *context*.
        """
        # NOTE(review): the annotation of *context* was truncated in this copy of
        # the file; _op.Context is assumed from the isinstance check below.
        if isinstance(context, _op.Context):
            return StandardDirector(context)
        # No director is available for other Context types.
        return None

class StandardOperationHandle(_op.AbstractOperation, ModuleObject):
    """Handle used in Python UI or gmxapi.operation Contexts."""

    def __init__(self, resource_manager: ResourceManager):
        # Keep a private (name-mangled) reference to the resource manager
        # that backs this handle.
        self.__resource_manager = resource_manager

    # NOTE(review): no body for run() is present in this copy of the file;
    # restore it from the upstream source.
    def run(self):

    # NOTE(review): no body for output() is present in this copy of the file;
    # restore it from the upstream source. Presumably exposed as a property,
    # since clients in this module read `.output` as an attribute -- confirm.
    def output(self) -> OutputDataProxy:

# NOTE(review): the base class list of StandardDirector is missing from this copy
# of the file, as are several statements flagged below; restore them from the
# upstream source.
class StandardDirector(
    """Direct the instantiation of a modify_input node in a gmxapi.operation Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """

    def __init__(self, context: _op.Context):
        """Bind the director to the gmxapi.operation Context in which nodes are built.

        Raises:
            gmxapi.exceptions.ValueError: if *context* is not a gmxapi.operation Context.
        """
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
        """Add a modify_input node to the Context and return a handle to it.

        Arguments:
            resources: Data sources, by input name, to subscribe the new node to.
            label: Optional label passed to the node builder.
        """
        # Ask the Context for a builder for a new RegisteredOperation node.
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        # NOTE(review): builder configuration statements appear to be missing
        # here in this copy of the file.

        def runner_director(resources):
            # NOTE(review): the body of runner() is missing from this copy of the file.
            def runner():

            return runner

        # NOTE(review): further builder configuration statements appear to be
        # missing here in this copy of the file.

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory.
        assert isinstance(resources, _op.DataSourceCollection)
        # Subscribe the node to each named data source.
        for name, source in resources.items():
            builder.add_input(name, source)

        # NOTE(review): the right-hand side of this assignment is missing from
        # this copy of the file.
        handle =
        assert isinstance(handle, StandardOperationHandle)
        return handle

    # NOTE(review): the signature of handle_type() is truncated in this copy of the file.
    def handle_type(self, context:
        return StandardOperationHandle

    # NOTE(review): the annotations of *source* and *target* are truncated in
    # this copy of the file.
    def resource_factory(self,
                         source: typing.Union[, ModuleObject, None],
                         target: = None):
        # Get a callable that translates resources from *source* for use in *target*.
        # *target* defaults to the Context this director was created for.
        # Raises gmxapi.exceptions.ValueError if no dispatching is available.

        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        if target is None:
            target = self.context
        if source is None:
            if isinstance(target, _op.Context):
                # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
                return ResourceFactory(target_context=target, source_context=source)
        if isinstance(source, _op.Context):
            return ResourceFactory(target_context=target, source_context=source)
        if isinstance(source, ModuleObject):
            if isinstance(target, _op.Context):
                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
                # In the first draft, though, we just access a special payload.
                # Return a factory that will consume *_simulation_input* and *parameters*
                # members of a received object.
                # NOTE(review): the call head of the next statement is missing
                # from this copy of the file.
      'Building mdrun operation from source {}'.format(source))

                def simulation_input_workaround(input, parameters: dict):
                    source = input
                    # TODO: Normalize mechanism for obtaining SimulationInput references.
                    # Operation handles carry their data on an 'output' attribute.
                    if hasattr(source, 'output'):
                        source = input.output
                    assert hasattr(source, '_simulation_input')
                    assert hasattr(source, 'parameters')
                    # NOTE(review): the next three statements are truncated in
                    # this copy of the file (call heads and trailing arguments
                    # are missing).
          'modify_input receiving input {}: {}'.format(,
                    source_collection = _input.bind(_simulation_input=source._simulation_input,
          'modify_input input bound as source collection {}'.format(source_collection))
                    return source_collection

                return simulation_input_workaround

        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))

# TODO: This operation is intended to be able to compose complete simulation input. E.g:
#  def modify_input(input: SimulationInput, parameters=None, topology=None, conformation=None, simulation_state=None):
def modify_input(input, parameters: dict, label: str = None, context=None):
    """Modify simulation input with data flow operations.

    Given simulation input *input*, override components of simulation input with
    additional arguments, such as *parameters*.

    Arguments:
        input: Simulation input, such as a handle from read_tpr or modify_input.
        parameters: Simulation parameters to override.
        label: Optional label for the new node.
        context: Must be None. This factory is only for the Python UI right now.

    Returns:
        Handle to the new modify_input node in the current Context.

    Raises:
        gmxapi.exceptions.NotImplementedError: if *context* is not None.
    """
    handle_context = context
    if handle_context is not None:
        raise gmxapi.exceptions.NotImplementedError(
            'context must be None. This factory is only for the Python UI right now.')

    target_context = _op.current_context()
    assert isinstance(target_context, _op.Context)
    # Get a director that will create a node in the standard context.
    node_director = _op._get_operation_director(RegisteredOperation, context=target_context)
    assert isinstance(node_director, StandardDirector)
    # TODO: refine this protocol
    assert handle_context is None
    # Examine the input.
    if isinstance(input, ModuleObject):
        # The input is from read_tpr or modify_input.
        source_context = input
    else:
        # Allow automatic dispatching
        source_context = None

    # TODO: Fix this protocol to do dispatching in the correct place.
    # The source Context here is None (the handle Context). The resources themselves
    # may be from different Contexts, so we should dispatch at the add_resource
    # builder method, not here in the director client.
    resource_factory = node_director.resource_factory(source=source_context, target=target_context)
    resources = resource_factory(input=input, parameters=parameters)
    handle = node_director(resources=resources, label=label)
    return handle