Source code for gmxapi.simulation.mdrun

#
# This file is part of the GROMACS molecular simulation package.
#
# Copyright (c) 2019, by the GROMACS development team, led by
# Mark Abraham, David van der Spoel, Berk Hess, and Erik Lindahl,
# and including many others, as listed in the AUTHORS file in the
# top-level source directory and at http://www.gromacs.org.
#
# GROMACS is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; either version 2.1
# of the License, or (at your option) any later version.
#
# GROMACS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with GROMACS; if not, see
# http://www.gnu.org/licenses, or write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA.
#
# If you want to redistribute modifications to GROMACS, please
# consider that scientific software is very special. Version
# control is crucial - bugs must be traceable. We will be happy to
# consider code for inclusion in the official distribution, but
# derived work must not be called official GROMACS. Details are found
# in the README & COPYING files - if they are missing, get the
# official version at http://www.gromacs.org.
#
# To help us fund GROMACS development, we humbly ask that you cite
# the research papers on the package. Check out http://www.gromacs.org.

"""mdrun operation module

The mdrun operation (in its first draft) conforms to the user-level API, but does
not use the Python Context resource manager. It uses either the legacy 0.0.7
Context or its own Context, also implemented in this module.
"""

__all__ = ['mdrun']

import inspect
import os
import typing
from contextlib import contextmanager

import gmxapi
import gmxapi.abc
import gmxapi.operation as _op
from gmxapi import exceptions

# The following imports are not marked as public API.
from .abc import ModuleObject

from . import fileio
from . import workflow

# Initialize module-level logger
from gmxapi import logger as root_logger

logger = root_logger.getChild('mdrun')
logger.info('Importing {}'.format(__name__))


# Output in the gmxapi.operation Context.
# TODO: Consider using a single base class for the DataProxy, but have distinct
#  data descriptor behavior (or different descriptor implementations in different
#  subclasses) so that static code inspection can more easily determine the
#  attributes of the data proxies.
_output_descriptors = (
    _op.OutputDataDescriptor('_work_dir', str),
    _op.OutputDataDescriptor('trajectory', str),
    _op.OutputDataDescriptor('parameters', dict)
)
_publishing_descriptors = {desc._name: gmxapi.operation.Publisher(desc._name, desc._dtype) for desc in
                           _output_descriptors}
_output = _op.OutputCollectionDescription(**{descriptor._name: descriptor._dtype for descriptor in
                                             _output_descriptors})


class OutputDataProxy(_op.DataProxyBase,
                      descriptors=_output_descriptors):
    """Implement the 'output' attribute of MDRun operations."""


class PublishingDataProxy(_op.DataProxyBase,
                          descriptors=_publishing_descriptors):
    """Manage output resource updates for MDRun operation."""


_output_factory = _op.OutputFactory(output_proxy=OutputDataProxy,
                                    output_description=_output,
                                    publishing_data_proxy=PublishingDataProxy)


# Input in the gmxapi.operation Context for the dispatching runner.
# The default empty dictionary for parameters just means that there are no overrides
# to the parameters already provided in _simulation_input.
_input = _op.InputCollectionDescription(
    [('_simulation_input', inspect.Parameter('_simulation_input',
                                             inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                             annotation=str)),
     ('parameters', inspect.Parameter('parameters',
                                      inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      annotation=dict,
                                      default=dict()))
     ])


def _standard_node_resource_factory(*args, **kwargs):
    """Translate Python UI input to the gmxapi.operation node builder inputs."""
    source_collection = _input.bind(*args, **kwargs)
    logger.info('mdrun input bound as source collection {}'.format(source_collection))
    return source_collection


@contextmanager
def scoped_communicator(original_comm, requested_size: int = None):
    from gmxapi.simulation.context import _acquire_communicator, _get_ensemble_communicator

    if requested_size is None:
        communicator = _acquire_communicator(communicator=original_comm)

    else:
        if original_comm is None or not hasattr(original_comm, 'Get_size'):
            raise exceptions.UsageError('A valid communicator must be provided when requesting a specific size.')
        original_comm_size = original_comm.Get_size()
        if original_comm_size < requested_size:
            raise exceptions.FeatureNotAvailableError(
                'Cannot produce a subcommunicator of size {} from a communicator of size {}.'.format(
                    requested_size,
                    original_comm_size
                ))
        assert original_comm_size >= requested_size
        communicator = _get_ensemble_communicator(original_comm, requested_size)

    try:
        yield communicator
    finally:
        communicator.Free()


class LegacyImplementationSubscription(object):
    """Input type representing a subscription to 0.0.7 implementation in gmxapi.operation.

    This input resource is a subscription to work that is dispatched to a sub-context.
    The resource can be created from the standard data of the simulation module.
    """

    def __init__(self, resource_manager: _op.ResourceManager):
        from .context import Context as LegacyContext
        import gmxapi._gmxapi as _gmxapi
        self._gmxapi = _gmxapi

        assert isinstance(resource_manager, _op.ResourceManager)
        # We use several details of the gmxapi.operation.Context.ResourceManager.
        # These dependencies can evolve into the subscription protocol between Contexts.

        # Configure and run a gmxapi 0.0.7 session.
        # 0. Determine ensemble width.
        # 1. Choose, create/check working directories.
        # 2. Create source TPR.
        # 3. Create workspec.
        # 3.5 Add plugin potentials, if any.
        # 4. Run.
        # 5. Collect outputs from context (including plugin outputs) and be ready to publish them.

        # Determine ensemble width
        ensemble_width = resource_manager.ensemble_width

        # Choose working directories
        # TODO: operation working directory naming scheme should be centrally well-defined.
        # Note that workflow.WorkSpec.uid is currently dependent on the input file parameter,
        # so we cannot create the input file path in the working directory based on WorkSpec.uid.
        workdir_list = ['{node}_{member}'.format(node=resource_manager.operation_id,
                                                 member=member)
                        for member in range(ensemble_width)]
        parameters_dict_list = [{}] * ensemble_width

        # This is a reasonable place to start using MPI ensemble implementation details.
        # We will want better abstraction in the future, but it is best if related filesystem
        # accesses occur from the same processes, consistently. Note that we already
        # handle some optimization and dependency reduction when the ensemble size is 1.
        # TODO: multithread and multiprocess alternatives to MPI ensemble management.

        # TODO: Allow user to provide communicator instead of implicitly getting COMM_WORLD
        with scoped_communicator(None) as context_comm:
            context_rank = context_comm.Get_rank()
            with scoped_communicator(context_comm, ensemble_width) as ensemble_comm:
                # Note that in the current implementation, extra ranks have nothing to do,
                # but they may have a dummy communicator, so be sure to skip those members
                # of the context_comm.
                if context_rank < ensemble_width:
                    assert ensemble_comm.Get_size() == ensemble_width
                    ensemble_rank = ensemble_comm.Get_rank()
                    # TODO: This should be a richer object that includes at least host information
                    #  and preferably the full gmxapi Future interface.
                    self.workdir = os.path.abspath(workdir_list[ensemble_rank])

                    with resource_manager.local_input(member=ensemble_rank) as input_pack:
                        source_file = input_pack.kwargs['_simulation_input']
                        parameters = input_pack.kwargs['parameters']
                        # If there are any other key word arguments to process from the gmxapi.mdrun
                        # factory call, do it here.

                    # TODO: We should really name this file with a useful input-dependent tag.
                    tprfile = os.path.join(self.workdir, 'topol.tpr')

                    expected_working_files = [tprfile]

                    if os.path.exists(self.workdir):
                        if os.path.isdir(self.workdir):
                            # Confirm that this is a restarted simulation.
                            # It is unspecified by the API, but at least through gmxapi 0.1,
                            # all simulations are initialized with a checkpoint file named state.cpt
                            # (see src/api/cpp/context.cpp)
                            checkpoint_file = os.path.join(self.workdir, 'state.cpp')
                            expected_working_files.append(checkpoint_file)

                            for file in expected_working_files:
                                if not os.path.exists(file):
                                    raise exceptions.ApiError(
                                        'Cannot determine working directory state: {}'.format(self.workdir))
                        else:
                            raise exceptions.ApiError(
                                'Chosen working directory path exists but is not a directory: {}'.format(self.workdir))
                    else:
                        # Build the working directory and input files.
                        os.mkdir(self.workdir)
                        sim_input = fileio.read_tpr(source_file)
                        for key, value in parameters.items():
                            try:
                                sim_input.parameters.set(key=key, value=value)
                            except _gmxapi.Exception as e:
                                raise exceptions.ApiError(
                                    'Bug encountered. Unknown error when trying to set simulation '
                                    'parameter {} to {}'.format(key, value)
                                ) from e

                        fileio.write_tpr_file(output=tprfile, input=sim_input)
                    logger.info('Created {} on rank {}'.format(tprfile, context_rank))

                    # Gather the actual outputs from the ensemble members.
                    if hasattr(ensemble_comm, 'allgather'):
                        # We should not assume that abspath expands the same on different MPI ranks.
                        workdir_list = ensemble_comm.allgather(self.workdir)
                        tpr_filenames = ensemble_comm.allgather(tprfile)
                        parameters = fileio.read_tpr(tprfile).parameters.extract()
                        parameters_dict_list = ensemble_comm.allgather(parameters)
                    else:
                        workdir_list = [os.path.abspath(workdir) for workdir in workdir_list]
                        # TODO: If we use better input file names, they need to be updated in multiple places.
                        tpr_filenames = [os.path.join(workdir, 'topol.tpr') for workdir in workdir_list]
                        parameters_dict_list = [fileio.read_tpr(tprfile).parameters.extract() for tprfile in tpr_filenames]

                    logger.debug('Context rank {} acknowledges working directories {}'.format(context_rank,
                                                                                             workdir_list))
                    logger.debug('Operation {}:{} will use {}'.format(resource_manager.operation_id,
                                                                      ensemble_rank,
                                                                      self.workdir
                                                                      ))
                    # TODO: We have not exposed the ability to pass any run time parameters to mdrun.
                    work = workflow.from_tpr(tpr_filenames)
                    self.workspec = work.workspec
                    context = LegacyContext(work=self.workspec, workdir_list=workdir_list, communicator=ensemble_comm)
                    self.simulation_module_context = context
                    # Go ahead and execute immediately. No need for lazy initialization in this basic case.
                    with self.simulation_module_context as session:
                        session.run()
                        # TODO: There may be some additional results that we need to extract...
                    # end: if context_rank < ensemble_width

                # end scoped_communicator: ensemble_comm

            if context_comm.Get_size() > 1:
                context_comm.bcast(workdir_list, root=0)
            # end scoped_communicator: context_comm

        self.workdir = workdir_list
        self.parameters = parameters_dict_list


class SubscriptionSessionResources(object):
    """Input and output run-time resources for a MDRun subscription.

    A generalization of this class is the probably the main hook for customizing the resources
    provided to the operation at run time.

    .. todo:: Better factoring of SessionResources, ResourceFactory, Director.resource_factory.
    """

    def __init__(self, input: LegacyImplementationSubscription, output: PublishingDataProxy):
        assert isinstance(input, LegacyImplementationSubscription)
        assert isinstance(output, PublishingDataProxy)
        self.output = output
        member_id = self.output._client_identifier
        # Before enabling the following, be sure we understand what is happening.
        # if member_id is None:
        #     member_id = 0
        self.workdir = input.workdir[member_id]
        self.parameters = input.parameters[member_id]


class SubscriptionPublishingRunner(object):
    """Handle execution in the gmxapi.operation context as a subscription to the gmxapi.simulation.context."""
    input_resource_factory = LegacyImplementationSubscription

    def __init__(self, resources: SubscriptionSessionResources):
        assert isinstance(resources, SubscriptionSessionResources)
        # Note that the resources contain a reference to a simulation ensemble that has already run.
        self.resources = resources

    def run(self):
        """Operation implementation in the gmxapi.operation module context."""
        publisher = self.resources.output
        publisher._work_dir = self.resources.workdir
        publisher.parameters = self.resources.parameters
        # TODO: Make the return value a trajectory handle rather than a file path.
        # TODO: Decide how to handle append vs. noappend output.
        # TODO: More rigorous handling of the trajectory file(s)
        # We have no way to query the name of the trajectory produced, and we
        # have avoided exposing the ability to specify it, so we have to assume
        # GROMACS default behavior.
        publisher.trajectory = os.path.join(self.resources.workdir, 'traj.trr')


_next_uid = 0


def _make_uid(input) -> str:
    # TODO: Use input fingerprint for more useful identification.
    salt = hash(input)
    global _next_uid
    new_uid = 'mdrun_{}_{}'.format(_next_uid, salt)
    _next_uid += 1
    return new_uid


#
# Implementation
#


class ResourceManager(gmxapi.operation.ResourceManager):
    """Manage resources for the MDRun operation in the gmxapi.operation contexts.

    Extends gmxapi.operation.ResourceManager to tolerate non-standard data payloads.
    Futures managed by this resource manager may contain additional attributes.
    """

    def future(self, name: str, description: _op.ResultDescription):
        tpr_future = super().future(name=name, description=description)
        return tpr_future

    def data(self) -> OutputDataProxy:
        return OutputDataProxy(self)

    def update_output(self):
        """Override gmxapi.operation.ResourceManager.update_output because we handle paralellism as 0.0.7."""
        # For the moment, this is copy-pasted from gmxapi.operation.ResourceManager,
        # but the only part we need to override is the ensemble handling at `for i in range(self.ensemble_width)`
        # TODO: Reimplement as the resource factory and director for the operation target context.
        if not self.done():
            self.__operation_entrance_counter += 1
            if self.__operation_entrance_counter > 1:
                raise exceptions.ProtocolError('Bug detected: resource manager tried to execute operation twice.')
            if not self.done():
                # TODO: rewrite with the pattern that this block is directing and then resolving an operation in the
                #  operation's library/implementation context.

                ###
                # Note: this is the resource translation from gmxapi.operation context
                # to the dispatching runner director. It uses details of the gmxapi.operation.Context
                # and of the operation.

                # TODO: Dispatch/discover this resource factory from a canonical place.
                assert hasattr(self._runner_director, 'input_resource_factory')
                # Create on all ranks.
                input = self._runner_director.input_resource_factory(self)
                # End of action of the InputResourceDirector[Context, MdRunSubscription].
                ###

                # We are giving the director a resource that contains the subscription
                # to the dispatched work.

                publishing_resources = self.publishing_resources()
                for member in range(self.ensemble_width):
                    with publishing_resources(ensemble_member=member) as output:
                        resources = self._resource_factory(input=input, output=output)
                        runner = self._runner_director(resources)
                        runner.run()


class StandardInputDescription(_op.InputDescription):
    """Provide the ReadTpr input description in gmxapi.operation Contexts."""

    def signature(self) -> _op.InputCollectionDescription:
        return _input

    def make_uid(self, input: _op.DataEdge) -> str:
        return _make_uid(input)


class RegisteredOperation(_op.OperationImplementation, metaclass=_op.OperationMeta):
    """Provide the gmxapi compatible ReadTpr implementation."""

    # This is a class method to allow the class object to be used in gmxapi.operation._make_registry_key
    @classmethod
    def name(self) -> str:
        """Canonical name for the operation."""
        return 'mdrun'

    @classmethod
    def namespace(self) -> str:
        """read_tpr is importable from the gmxapi module."""
        return 'gmxapi'

    @classmethod
    def director(cls, context: gmxapi.abc.Context) -> _op.OperationDirector:
        if isinstance(context, _op.Context):
            return StandardDirector(context)


class StandardOperationHandle(_op.AbstractOperation):
    """Handle used in Python UI or gmxapi.operation Contexts."""

    def __init__(self, resource_manager: ResourceManager):
        self.__resource_manager = resource_manager

    def run(self):
        self.__resource_manager.update_output()

    @property
    def output(self) -> OutputDataProxy:
        return self.__resource_manager.data()


class StandardDirector(gmxapi.abc.OperationDirector):
    """Direct the instantiation of a read_tpr node in a gmxapi.operation Context.

    .. todo:: Compose this behavior in a more generic class.

    .. todo:: Describe where instances live.
    """

    def __init__(self, context: _op.Context):
        if not isinstance(context, _op.Context):
            raise gmxapi.exceptions.ValueError('StandardDirector requires a gmxapi.operation Context.')
        self.context = context

    def __call__(self, resources: _op.DataSourceCollection, label: str = None) -> StandardOperationHandle:
        builder = self.context.node_builder(operation=RegisteredOperation, label=label)

        builder.set_resource_factory(SubscriptionSessionResources)
        builder.set_input_description(StandardInputDescription())
        builder.set_handle(StandardOperationHandle)
        # The runner in the gmxapi.operation context is the servicer for the legacy context.
        builder.set_runner_director(SubscriptionPublishingRunner)
        builder.set_output_factory(_output_factory)
        builder.set_resource_manager(ResourceManager)

        # Note: we have not yet done any dispatching based on *resources*. We should
        # translate the resources provided into the form that the Context can subscribe to
        # using the dispatching resource_factory. In the second draft, this operation
        # is dispatched to a SimulationModuleContext, which can be subscribed directly
        # to sources that are either literal filenames or gmxapi.simulation sources,
        # while standard Futures can be resolved in the standard context.
        #
        assert isinstance(resources, _op.DataSourceCollection)
        for name, source in resources.items():
            builder.add_input(name, source)

        handle = builder.build()
        assert isinstance(handle, StandardOperationHandle)
        return handle

    def handle_type(self, context: gmxapi.abc.Context):
        return StandardOperationHandle

    # Developer note: the Director instance is a convenient place to get a dispatching
    # factory. The Director may become generic or more universal, but the resource_factory
    # would likely not be typed on the generic parameters of the Director class.
    # Instead, it is likely a generic function with its own TypeVar parameters.
    def resource_factory(self,
                         source: typing.Union[gmxapi.abc.Context, ModuleObject, None],
                         target: gmxapi.abc.Context = None):
        # Distinguish between the UIContext, in which input is in the form
        # of function call arguments, and the StandardContext, implemented in
        # gmxapi.operation. UIContext is probably a virtual context that is
        # asserted by callers in order to get a factory that normalizes UI input
        # for the StandardContext.
        #
        if target is None:
            target = self.context
        if source is None:
            if isinstance(target, _op.Context):
                # Return a factory that can bind to function call arguments to produce a DataSourceCollection.
                return _standard_node_resource_factory
        if isinstance(source, _op.Context):
            return SubscriptionSessionResources
        if isinstance(source, ModuleObject):
            if isinstance(target, _op.Context):
                # We are creating a node in gmxapi.operation.Context from another gmxapi.simulation operation.
                # This means that we want to subscribe to the subcontext instead of the gmxapi.operation.Context.
                # In the first draft, though, we just access a special payload.
                # Return a factory that will consume *_simulation_input* and *parameters*
                # members of a received object.
                logger.info('Building mdrun operation from source {}'.format(source))

                def simulation_input_workaround(input):
                    source = input
                    if hasattr(source, 'output'):
                        source = input.output
                    assert hasattr(source, '_simulation_input')
                    assert hasattr(source, 'parameters')
                    logger.info('mdrun receiving input {}: {}'.format(source._simulation_input.name,
                                                                      source._simulation_input.description))
                    source_collection = _input.bind(_simulation_input=source._simulation_input,
                                                    parameters=source.parameters)
                    logger.info('mdrun input bound as source collection {}'.format(source_collection))
                    return source_collection

                return simulation_input_workaround

        raise gmxapi.exceptions.ValueError('No dispatching from {} context to {}'.format(source, target))


[docs]def mdrun(input, label: str = None, context=None): """MD simulation operation. Arguments: input : valid simulation input Returns: runnable operation to perform the specified simulation The *output* attribute of the returned operation handle contains dynamically determined outputs from the operation. `input` may be a TPR file name or a an object providing the SimulationInput interface. Note: New function names will be appearing to handle tasks that are separate "simulate" is plausibly a dispatcher or base class for various tasks dispatched by mdrun. Specific work factories are likely "minimize," "test_particle_insertion," "legacy_simulation" (do_md), or "simulation" composition (which may be leap-frog, vv, and other algorithms) """ handle_context = context if handle_context is not None: raise gmxapi.exceptions.NotImplementedError( 'context must be None. This factory is only for the Python UI right now.') target_context = _op.current_context() assert isinstance(target_context, _op.Context) # Get a director that will create a node in the standard context. node_director = _op._get_operation_director(RegisteredOperation, context=target_context) assert isinstance(node_director, StandardDirector) # TODO: refine this protocol assert handle_context is None # Examine the input. if isinstance(input, ModuleObject): # The input is from read_tpr or modify_input. source_context = input else: # Allow automatic dispatching source_context = None resource_factory = node_director.resource_factory(source=source_context, target=target_context) resources = resource_factory(input) handle = node_director(resources=resources, label=label) # Note: One effect of the assertions above is to help the type checker infer # the return type of the handle. It is hard to convince the type checker that # the return value of the node builder is up-cast. We might be able to resolve # this by making both get_operation_director and ReadTprImplementation # generics of the handle type, or using the handle type as the key for # get_operation_director. return handle