Source code for momba.model.automata

# -*- coding:utf-8 -*-
#
# Copyright (C) 2019-2021, Saarland University
# Copyright (C) 2019-2021, Maximilian Köhl <koehl@cs.uni-saarland.de>

from __future__ import annotations

import dataclasses as d
import typing as t

import collections

from mxu.maps import FrozenMap

from . import actions, errors, expressions, observations, types

if t.TYPE_CHECKING:
    from . import context


Annotation = t.Mapping[str, t.Union[int, str, float]]


[docs] @d.dataclass(frozen=True, eq=False) class Instance: """ Represents an automaton instance. Attributes ---------- automaton: The instaniated automaton. arguments: The arguments passed to the parameters of the automaton. input_enabled: The set of action types for which the instance is input enabled. comment: An optional comment describing the instance. """ automaton: Automaton arguments: t.Tuple[expressions.Expression, ...] = () input_enable: t.FrozenSet[actions.ActionType] = frozenset() comment: t.Optional[str] = None def __post_init__(self) -> None: if len(self.arguments) != len(self.automaton.parameters): raise errors.ModelingError("invalid number of arguments for automaton")
ProgressInvariant = t.Optional[expressions.Expression] TransientValues = t.AbstractSet["Assignment"]
[docs] @d.dataclass(frozen=True, eq=False) class Location: """ Represents a location of an automaton. Attributes: name: The optional name of the location. progress_invariant: An optional expression `progress_invariant` specifies the invariant subject to which the time may progress in the location. As per the JANI specification, the invariant is only allowed for models using real-valued clocks (see :class:`~momba.model.ModelType`). transient_values: A set of assignments for transient variables. These assignments define the values of the transient variables when in the location. """ name: t.Optional[str] = None progress_invariant: ProgressInvariant = None transient_values: TransientValues = frozenset()
[docs] def validate(self, automaton: Automaton) -> None: """ Validates the location for the given automaton. Raises :class:`~errors.ModelingError` if the location is invalid to add to the automaton according to the JANI specification. For instance, if the location has a `progress_invariant` but the model type does not support clocks. """ scope = automaton.scope if self.progress_invariant is not None: if not scope.ctx.model_type.uses_clocks: raise errors.ModelingError( f"location invariant is not allowed for model type {scope.ctx.model_type}" ) if scope.get_type(self.progress_invariant) != types.BOOL: raise errors.InvalidTypeError( f"type of invariant in location {self} is not `types.BOOL`" ) if self.transient_values: if not are_compatible(self.transient_values): raise errors.IncompatibleAssignmentsError( f"incompatible assignments for transient values in location {self}" ) for assignment in self.transient_values: if assignment.index != 0: raise errors.ModelingError( f"index of assignment {assignment} to transient variable is non-zero" ) assignment.validate(scope)
[docs] @d.dataclass(frozen=True) class Assignment: """ Represents an assignment. Attributes ---------- target: The target of the assignment. value: The value to assign. index: The index of the assignment. """ target: expressions.Expression value: expressions.Expression index: int = 0
[docs] def validate(self, scope: context.Scope) -> None: """ Validates the assignment in the given scope. Raises :class:`~errors.ModelingError` if the target is not assignable from the value type. """ target_type = self.target.infer_target_type(scope) value_type = scope.get_type(self.value) if not target_type.is_assignable_from(value_type): raise errors.InvalidTypeError( f"cannot assign {value_type} to {target_type}" )
[docs] def are_compatible(assignments: t.Iterable[Assignment]) -> bool: """ Checks whether the given assignments are compatible according to the JANI specification. """ groups: t.DefaultDict[int, t.Set[expressions.Expression]] = collections.defaultdict( set ) for assignment in assignments: target = assignment.target if target in groups[assignment.index]: return False groups[assignment.index].add(target) return True
[docs] @d.dataclass(frozen=True) class Destination: """ Represents a destination of an edge. Attributes ---------- location: The target location of the destination. probability: An optional expression for the probability of the destination. assignments: A set of assignments to be executed when going to the destination. """ location: Location probability: t.Optional[expressions.Expression] = None assignments: t.Tuple[Assignment, ...] = () def _validate(self, automaton: Automaton, scope: context.Scope) -> None: self.location.validate(automaton) if self.location not in automaton.locations: raise errors.ModelingError( f"source location of edge {self} is not a location of the automaton {automaton}" ) if not are_compatible(self.assignments): raise errors.IncompatibleAssignmentsError( f"assignments on edge {self} are not compatible" ) if self.probability is not None: if not scope.get_type(self.probability).is_numeric: raise errors.InvalidTypeError("probability value must be numeric") for assignment in self.assignments: assignment.validate(scope)
[docs] @d.dataclass(frozen=True) class Edge: """ Represents an edge of an automaton. Attributes ---------- location: The source location of the edge. destinations: The destinations of the edge. action_pattern: The optional action pattern of the edge. guard: The optional guard of the edge. rate: The optional rate of the edge. annotation: An optional annotation of the edge. """ location: Location destinations: t.Tuple[Destination, ...] action_pattern: t.Optional[actions.ActionPattern] = None guard: t.Optional[expressions.Expression] = None rate: t.Optional[expressions.Expression] = None annotation: t.Optional[Annotation] = None observation: t.FrozenSet[observations.Observation] = frozenset()
[docs] def create_edge_scope(self, parent: context.Scope) -> context.Scope: """ Creates an *edge scope* with the given parent scope. .. warning:: Used for *value passing* an experimental Momba feature. Value passing is not part of the official JANI specification. """ scope = parent.create_child_scope() if self.action_pattern is not None: self.action_pattern.declare_in(scope) return scope
[docs] def validate(self, automaton: Automaton) -> None: """ Validates the edge for the given automaton. Raises :class:`~errors.ModelingError` if the edge is invalid to add to the automaton according to the JANI specification. For instance, if source location is not a location of the automaton. """ scope = automaton.scope self.location.validate(automaton) if self.location not in automaton.locations: raise errors.ModelingError( f"source location of edge {self} is not a location of the automaton {automaton}" ) if self.rate is not None and not scope.get_type(self.rate).is_numeric: raise errors.InvalidTypeError(f"type of rate on edge {self} is not numeric") edge_scope = self.create_edge_scope(scope) if self.guard is not None and edge_scope.get_type(self.guard) != types.BOOL: raise errors.InvalidTypeError( f"type of guard on edge {self} is not `types.BOOL`" ) for destination in self.destinations: destination._validate(automaton, edge_scope)
[docs] class Automaton: """ Represents an automaton. Attributes ---------- ctx: The :class:`Context` associated with the automaton. scope: The local :class:`Scope` of the automaton. name: The optional name of the automaton. """ ctx: context.Context scope: context.Scope name: t.Optional[str] _parameters: t.List[str] _locations: t.Set[Location] _initial_locations: t.Set[Location] _initial_restriction: t.Optional[expressions.Expression] _edges: t.List[Edge] _incoming_edges: t.DefaultDict[Location, t.Set[Edge]] _outgoing_edges: t.DefaultDict[Location, t.Set[Edge]] def __init__(self, ctx: context.Context, *, name: t.Optional[str] = None) -> None: self.ctx = ctx self.scope = self.ctx.global_scope.create_child_scope() self.name = name self._parameters = [] self._locations = set() self._initial_locations = set() self._initial_restriction = None self._edges = [] self._incoming_edges = collections.defaultdict(set) self._outgoing_edges = collections.defaultdict(set) def __repr__(self) -> str: return f"<Automaton name={self.name!r} at 0x{id(self):x}>" @property def locations(self) -> t.AbstractSet[Location]: """ The set of locations of the automaton. """ return self._locations @property def initial_locations(self) -> t.AbstractSet[Location]: """ The set of initial locations of the automaton. """ return self._initial_locations @property def edges(self) -> t.Sequence[Edge]: """ The set of edges of the automaton. """ return self._edges @property def initial_restriction(self) -> t.Optional[expressions.Expression]: """ An optional restriction to be satisfied by initial states. This property can be set *only once* per automaton. The expression has to be boolean. Raises :class:`~errors.ModelingError` when set twice or to a non-boolean expression. """ return self._initial_restriction @initial_restriction.setter def initial_restriction(self, initial_restriction: expressions.Expression) -> None: if self._initial_restriction is not None: raise errors.ModelingError( "restriction of initial environment has already been set" ) if self.scope.get_type(initial_restriction) != types.BOOL: raise errors.ModelingError( "restriction of initial valuations must have type `types.BOOL`" ) self._initial_restriction = initial_restriction @property def parameters(self) -> t.Sequence[str]: """ The sequence of parameters of the automaton. """ return self._parameters
[docs] def add_location(self, location: Location, *, initial: bool = False) -> None: """ Adds a location to the automaton. The flag `initial` specifies whether the location is an initial location. Raises :class:`~errors.ModelingError` when the location cannot be added to the automaton. See the method :meth:`Location.validate` for details. """ location.validate(self) self._locations.add(location) if initial: self._initial_locations.add(location)
[docs] def create_location( self, name: t.Optional[str] = None, *, progress_invariant: t.Optional[expressions.Expression] = None, transient_values: t.AbstractSet[Assignment] = frozenset(), initial: bool = False, ) -> Location: """ Creates a location with the given name and adds it to the automaton. The optional expression `progress_invariant` specifies the invariant subject to which the time may progress in the location. As per the JANI specification, the invariant is only allowed for models using real-valued clocks (see :class:`~momba.model.ModelType`). The parameter `transient_values` should be a set of assignments for transient variables. These assignments define the values of the transient variables when in the location. The flag `initial` specifies whether the location is an initial location. Raises the same exceptions as :meth:`add_location`. """ location = Location(name, progress_invariant, transient_values) self.add_location(location, initial=initial) return location
[docs] def add_edge(self, edge: Edge) -> None: """ Adds an edge to the automaton. Raises :class:`~errors.ModelingError` when the edge cannot be added to the automaton. See the method :meth:`Edge.validate` for details. """ edge.validate(self) self._edges.append(edge) self._locations.add(edge.location) self._outgoing_edges[edge.location].add(edge) for destination in edge.destinations: self._locations.add(destination.location) self._incoming_edges[destination.location].add(edge)
[docs] def create_edge( self, source: Location, destinations: t.Iterable[Destination], *, action_pattern: t.Optional[actions.ActionPattern] = None, guard: t.Optional[expressions.Expression] = None, rate: t.Optional[expressions.Expression] = None, annotation: t.Optional[Annotation] = None, observations: t.AbstractSet[observations.Observation] = frozenset(), ) -> None: """ Creates an edge and adds it to the automaton. The parameters are passed to :class:`Edge`. Raises the same exceptions as :meth:`add_edge`. """ edge = Edge( source, tuple(destinations), action_pattern, guard, rate, FrozenMap(annotation), frozenset(observations), ) self.add_edge(edge)
[docs] def get_incoming_edges(self, location: Location) -> t.AbstractSet[Edge]: """ Returns the set of outgoing edges from the given location. """ return self._incoming_edges[location]
[docs] def get_outgoing_edges(self, location: Location) -> t.AbstractSet[Edge]: """ Returns the set of incoming edges to the given location. """ return self._outgoing_edges[location]
[docs] def declare_variable( self, name: str, typ: types.Type, *, is_transient: t.Optional[bool] = None, initial_value: t.Optional[expressions.Expression] = None, ) -> None: """ Declares a variable in the local scope of the automaton. Passes the parameters to :meth:`Scope.declare_variable`. """ self.scope.declare_variable( name, typ, is_transient=is_transient, initial_value=initial_value )
[docs] def declare_parameter( self, name: str, typ: types.Type, *, default_value: t.Optional[expressions.Expression] = None, ) -> None: """ Declarse a parameter of the automaton. Passes the parameters to :meth:`Scope.declare_constant` where `value` is `default_value`. """ self.scope.declare_constant(name, typ, value=default_value) self._parameters.append(name)
[docs] def create_instance( self, *, arguments: t.Sequence[expressions.ValueOrExpression] = (), input_enable: t.AbstractSet[actions.ActionType] = frozenset(), comment: t.Optional[str] = None, ) -> Instance: """ Creates an instance of the automaton for composition. Passes the parameters to :class:`Instance`. """ return Instance( self, arguments=tuple(map(expressions.ensure_expr, arguments)), input_enable=frozenset(input_enable), )
Assignments = t.Union[ t.Iterable["Assignment"], t.Mapping[str, "expressions.Expression"] ]
[docs] def create_destination( location: Location, *, probability: t.Optional[expressions.Expression] = None, assignments: Assignments = (), ) -> Destination: """ Creates a destination with the given target location. This is a convenience function for creating destinations where assignments can be provied as a mapping. We recommend using it instead of creating :class:`Destination` objects directly. """ if isinstance(assignments, t.Mapping): return Destination( location, probability, assignments=tuple( Assignment(expressions.Name(name), value) for name, value in assignments.items() ), ) else: return Destination(location, probability, tuple(assignments))