Source code for gmxapi.simulation.read_tpr

#
# This file is part of the GROMACS molecular simulation package.
#
# Copyright (c) 2019, by the GROMACS development team, led by
# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
# and including many others, as listed in the AUTHORS file in the
# top-level source directory and at http://www.gromacs.org.
#
# GROMACS is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# GROMACS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with GROMACS; if not, see
# http://www.gnu.org/licenses, or write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
#
# If you want to redistribute modifications to GROMACS, please
# consider that scientific software is very special. Version
# control is crucial - bugs must be traceable. We will be happy to
# consider code for inclusion in the official distribution, but
# derived work must not be called official GROMACS. Details are found
# in the README & COPYING files - if they are missing, get the
# official version at http://www.gromacs.org.
#
# To help us fund GROMACS development, we humbly ask that you cite
# the research papers on the package. Check out http://www.gromacs.org.

"""read_tpr operation module

Provides implementation classes and user interface for gmxapi.read_tpr.
"""

import inspect
import typing

import gmxapi
import gmxapi.abc
import gmxapi.exceptions
import gmxapi.operation as _op
import gmxapi.typing
from gmxapi.operation import InputCollectionDescription
from gmxapi.simulation.abc import ModuleObject

from . import fileio

# Initialize module-level logger
from gmxapi import logger as root_logger

logger = root_logger.getChild('read_tpr')
logger.info('Importing {}'.format(__name__))


#
# Interface classes and internal details
#

# TODO: The output of read_tpr and modify_input should be the same and part of the
#  simulation module specification. Such output is either a special type of output proxy
#  or Future.
_output_descriptors = (
    _op.OutputDataDescriptor('_simulation_input', str),
    _op.OutputDataDescriptor('parameters', dict)
)
_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
                           _output_descriptors}
_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
                                             _output_descriptors})



class OutputDataProxy(ModuleObject,
                      _op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of ReadTpr operations."""
    def __init__(self, *args, **kwargs):
        _op.DataProxyBase.__init__(self, *args, **kwargs)


class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors
                          ):
    """Manage output resource updates for ReadTpr operation."""


_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)


class SessionResources(object):
    """Input and output run-time resources for a ReadTpr operation."""
    def __init__(self, tpr_filename, publisher: PublishingDataProxy):
        self.tpr_object = fileio.TprFile(filename=tpr_filename, mode='r')
        self.output = publisher


#
# Helpers
#


_input = _op.InputCollectionDescription(
    [('filename', inspect.Parameter('filename',
                                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                    annotation=str))])


# TODO: Clarify. The actual input and output arguments passed are customized for this operation.
def _session_resource_factory(input: _op.InputPack, output: 'PublishingDataProxy') -> SessionResources:
    """Translate resources from the gmxapi.operation Context to the ReadTpr implementation."""
    filename = input.kwargs['filename']
    return SessionResources(tpr_filename=filename, publisher=output)


def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs."""
    return _input.bind(*args, **kwargs)


def _run(resources: SessionResources):
    """Operation implementation in the gmxapi.operation module context."""
    # TODO: Implement for other source/target contexts. We don't always need to
    #  produce all outputs.
    with resources.tpr_object as fh:
        params = fh._tprFileHandle.params().extract()
        resources.output.parameters = params
        resources.output._simulation_input = fh.filename


#
# Implementation
#

# Note: we borrow the implementation from operation.ResourceManager for now,
# but in the future we want the implementations to either be decoupled or
# for implementations in a given context to be coupled to details that are clearly
# and explicitly related to that context. Right now, operation.ResourceManager
# is tied to the implementation of Contexts in gmxapi.operation, but that is not
# sufficiently clear and explicit.
class ResourceManager(gmxapi.operation.ResourceManager):
    """Manage resources for the ReadTpr operation in the gmxapi.operation contexts.

    Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
    Futures managed by this resource manager may contain additional attributes.
    """
    def future(self, name: str, description: _op.ResultDescription):
        tpr_future = super().future(name=name, description=description)
        return tpr_future

    def data(self) -> OutputDataProxy:
        return OutputDataProxy(self)

    def update_output(self):
        logger.debug('Updating output for {}.'.format(self.operation_id))
        super().update_output()
        for descriptor in _output_descriptors:
            name = descriptor._name
            if not self.is_done(name):
                raise gmxapi.exceptions.ApiError('Expected output {} not updated.'.format(name))

# TODO: Consider making Generic in source and target context type variables,
#  or leave unspecified and use generic function or pair of single_dispatch functions.
#  Need to know the right home for input_description, if needed.
class ResourceFactory(gmxapi.abc.ResourceFactory):
    """ReadTpr resource factory.

    Generic class for creating resources passed to read_tpr implementation details.
    Dispatching may occur based on the source and target Context of factory action.
    """
    def __init__(self, target_context, source_context):
        """Initialize an instance to support read_tpr action.

        Arguments:
            source_context: The source of the resources in the calling scope.
            target_context: The Context in which the factory product will be consumed.

        """
        self.source_context = source_context  # Determine input form of *create* method.
        self.target_context = target_context  # Context in which resources are consumed.

    # TODO: clean up the dispatching. What to do when input comes from multiple sources?
    # Use a typing overload or a single-dispatch functor for clearer input/result typing.
    def __call__(self, *args, **kwargs):
        """Create the resource product.

        Dispatch to appropriate factory functionality based on the *source_context*
        and *target_context*.
        """
        # context is used for dispatching and annotates the Context of the other arguments.
        # context == None implies inputs are from Python UI.
        if self.source_context is None:
            if isinstance(self.target_context, _op.Context):
                return _standard_node_resource_factory(*args, **kwargs)
        if isinstance(self.source_context, _op.Context):
            # TODO: Check whether the consumer is a Context.NodeBuilder or an operation runner.
            # We don't yet use this dispatcher for building nodes, so assume we are launching a session.
            assert 'input' in kwargs
            assert 'output' in kwargs
            return _session_resource_factory(input=kwargs['input'], output=kwargs['output'])
        raise gmxapi.exceptions.NotImplementedError(
            'No translation from {} context to {}'.format(self.source_context, self.target_context))

    @typing.overload
    def input_description(self, context: _op.Context) -> _op.InputDescription:
        ...

    def input_description(self, context: gmxapi.abc.Context):
        """Get an input description usable in the indicated Context.

        Arguments:
            context: Context for which to dispatch the generation of input description.

        Overrides gmxapi.abc.ResourceFactory for collaboration between this
        resource factory and a target Context.
        """
        # TODO: Clarify exposure and scope as gmxapi.abc.ResourceFactory is refined.
        # The expected use case is that a Context implementation may consult the
        # input_description when preparing input to a resource factory, or even
        # when determining in which Context a resource should be created or an
        # operation dispatched. It seems sensible that the ResourceFactory should
        # be able to describe its possible inputs. Instance functions are the
        # most common and simple Python detail, so it makes sense for this to be
        # an instance method instead of classmethod or staticmethod, until shown
        # otherwise.
        # Also note that ResourceFactory is likely to be a Generic class or a
        # container for composed functionality. Customizing method signatures
        # based on instance data is more brittle than behavior determined at the
        # class level. The behavior of this function is determined at the class
        # level to be a dispatcher, which may utilize instance data, but as an
        # implementation detail that is not the business of the caller.
        # This should probably be implemented in terms of the standard Python
        # functools.single_dispatch generic function, but it is cleaner if the
        # method itself is not generic beyond the level of typing overloads.
        if isinstance(context, _op.Context):
            return StandardInputDescription()
        raise gmxapi.exceptions.NotImplementedError('No input description available for {} context'.format(context))


class StandardInputDescription(_op.InputDescription):
    """Provide the ReadTpr input description in gmxapi.operation Contexts."""

    # TODO: Improve fingerprinting.
    # If _make_uid can't make sufficiently unique IDs, use additional "salt":
    # _next_uid = 0

    @staticmethod
    def _make_uid(input) -> str:
        # TODO: Use input fingerprint for more useful identification.
        salt = hash(input)
        # If can't make sufficiently unique IDs, use additional "salt"
        # from a class data member. E.g.
        #     new_uid = 'read_tpr_{}_{}'.format(_next_uid, salt)
        #     _next_uid += 1
        new_uid = 'read_tpr_{}'.format(salt)
        return new_uid

    def signature(self) -> InputCollectionDescription:
        return _input

    def make_uid(self, input: _op.DataEdge) -> str:
        assert isinstance(input, _op.DataEdge)
        return self._make_uid(input)


class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible ReadTpr implementation."""

    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
    @classmethod
    def name(self) -> str:
        """Canonical name for the operation."""
        return 'read_tpr'

    @classmethod
    def namespace(self) -> str:
        """read_tpr is importable from the gmxapi module."""
        return 'gmxapi'

    @classmethod
    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
        if isinstance(context, _op.Context):
            return StandardDirector(context)
        raise gmxapi.exceptions.NotImplementedError(
            'No dispatcher for context {} of type {}'.format(context, type(context)))


class StandardOperationHandle(_op.AbstractOperation, ModuleObject):
    """Handle used in Python UI or gmxapi.operation Contexts."""
    def __init__(self, resource_manager: ResourceManager):
        self.__resource_manager = resource_manager

    def run(self):
        self.__resource_manager.update_output()

    @property
    def output(self) -> OutputDataProxy:
        return self.__resource_manager.data()


class StandardDirector(gmxapi.abc.OperationDirector):
    """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """
    def __init__(self, context: _op.Context):
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str) -> StandardOperationHandle:
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        builder.set_resource_factory(_session_resource_factory)
        builder.set_input_description(StandardInputDescription())
        builder.set_handle(StandardOperationHandle)

        def runner_director(resources):
            def runner():
                _run(resources)
            return runner

        builder.set_runner_director(runner_director)
        builder.set_output_factory(_output_factory)

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory. In the second draft, this operation
        # is dispatched to a SimulationModuleContext, which can be subscribed directly
        # to sources that are either literal filenames or gmxapi.simulation sources,
        # while standard Futures can be resolved in the standard context.
        #
        assert isinstance(resources, _op.DataSourceCollection)
        assert 'filename' in resources
        builder.add_input('filename', resources['filename'])

        handle = builder.build()
        assert isinstance(handle, StandardOperationHandle)
        return handle

    def handle_type(self, context: gmxapi.abc.Context):
        return StandardOperationHandle

    def resource_factory(self,
                         source: typing.Union[gmxapi.abc.Context, None],
                         target: gmxapi.abc.Context = None) -> ResourceFactory:
        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        #
        if target is None:
            target = self.context
        if source is None:
            if isinstance(target, _op.Context):
                return ResourceFactory(target_context=target, source_context=source)
        if isinstance(source, _op.Context):
            return ResourceFactory(target_context=target, source_context=source)
        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))


[docs]def read_tpr(filename, label: str = None, context=None): """ Arguments: filename: input file name label: optional human-readable label with which to tag the new node context: Context in which to return a handle to the new node. Use default (None) for Python scripting interface Returns: Reference (handle) to the new operation instance (node). """ handle_context = context if handle_context is not None: raise gmxapi.exceptions.NotImplementedError( 'context must be None. This factory is only for the Python UI right now.') # 1. Handle node creation in the scripting interface. # When *context* input is None, dispatch to the current Context. Confirm that # it is a standard context from the gmxapi.operation module, translate the # input into that context, and create the node. # 2. Dispatch to SimulationModuleContext. # Operation is not fully implemented in gmxapi.operation context. When creating # a node in a gmxapi.operation context, dispatch and subscribe to a SimulationModuleContext, # in which # 3. Handle consumption in SimulationModuleContext. # When a consuming operation is native to the SimulationModuleContext, # detect that a richer interface is available and use it. Possible implementation: # Chain of Responsibility: subscribe() is serviced by the Context "closest" # to the source. subscriber is the most native compatible Context for the consuming operation # but decays to the context in which the handle is being created. subscribe() # can accept a callback function and an indication of the form of the message # to send (a consuming Context or ResourceFactory). # TODO: Other types of input, such as File placeholders. target_context = _op.current_context() assert isinstance(target_context, _op.Context) # Get a director that will create a node in the standard context. node_director = _op._get_operation_director(RegisteredOperation, context=target_context) assert isinstance(node_director, StandardDirector) # TODO: refine this protocol assert handle_context is None resource_factory = node_director.resource_factory(source=handle_context, target=target_context) resources = resource_factory(filename=filename) handle = node_director(resources=resources, label=label) # Note: One effect of the assertions above is to help the type checker infer # the return type of the handle. It is hard to convince the type checker that # the return value of the node builder is up-cast. We might be able to resolve # this by making both get_operation_director and ReadTprImplementation # generics of the handle type, or using the handle type as the key for # get_operation_director. return handle