Source code for nisystemlink.clients.tag._tag_subscription

# -*- coding: utf-8 -*-

"""Implementation of TagSubscription."""

import abc
import contextlib
import datetime
import weakref
from types import TracebackType
from typing import Iterable, List, Optional, Type

import events
from nisystemlink.clients import core, tag as tbase
from nisystemlink.clients.tag._core._manual_reset_timer import ManualResetTimer


[docs]class TagSubscription(events.Events, abc.ABC): """Represents a subscription for changes to one or more tags' values. Call :meth:`close()` to stop receiving events. Note that :class:`TagSubscription` objects support using the ``with`` statement (or the ``async with`` statement), to :meth:`close()` the subsription automatically on exit. Attributes: tag_changed: An event that is triggered when one of the subscription's tag changes. The callback will receive a :class:`TagData` parameter and an :class:`Optional` [:class:`TagValueReader`] parameter. Example:: def my_callback(tag: TagData, reader: Optional[TagValueReader]): print("{} changed".format(tag.path)) if reader is None: print(" - unknown data type") else: value = reader.read() assert value is not None print(" - new value: {}".format(value.value)) subscription.tag_changed += my_callback """ __events__ = ["tag_changed"] # Under certain circumstances, mypy complains about the event not having a type hint # unless we specify it explicitly. (But we also need to delete the attribute so that # Events.__getattr__ can do its magic.) tag_changed = None # type: events._EventSlot del tag_changed _HEARTBEAT_INTERVAL_MILLISECONDS = 30000.0 """Send a heartbeat every 30 seconds based on a server-side expiration of 60 seconds.""" def __init__( self, paths: Iterable[str], heartbeat_timer: Optional[ManualResetTimer] ) -> None: """Initialize the instance. Derived types must call :meth:`_initialize()` or :meth:`_initialize_async()` after construction. Args: paths: The tag path queries to include in the subscription. heartbeat_timer: A timer for sending a heartbeat to keep the subscription alive for testing purposes, or None to use a default timer. Raises: ValueError: if ``paths`` is None. """ if paths is None: raise ValueError("paths cannot be None") super().__init__() self._paths = list(paths) if heartbeat_timer is not None: self._heartbeat_timer = heartbeat_timer else: self._heartbeat_timer = ManualResetTimer( datetime.timedelta(milliseconds=self._HEARTBEAT_INTERVAL_MILLISECONDS) ) self._exit_stack = contextlib.ExitStack() self._exit_stack.enter_context(self._heartbeat_timer) callback_ref = weakref.WeakMethod(self._heartbeat_timer_elapsed) # type: ignore def callback() -> None: actual_callback = callback_ref() # type: ignore if actual_callback: actual_callback() self._heartbeat_timer_handler = callback self._heartbeat_timer.elapsed += self._heartbeat_timer_handler self._closed = False def __del__(self) -> None: self._exit_stack.close() def _initialize(self) -> None: """Create and initialize the subscription. Derived types must call this method or :meth:`_initialize_async()` after construction to create and keep the subscription alive. Raises: ApiException: if the API call fails. """ self._create_subscription_on_server(self._paths) self._heartbeat_timer.start() async def _initialize_async(self) -> None: """Asynchronously create and initializes the subscription. Derived types must call this method or :meth:`_initialize()` after construction to create and keep the subscription alive. Raises: ApiException: if the API call fails. """ await self._create_subscription_on_server_async(self._paths) self._heartbeat_timer.start() @abc.abstractmethod def _create_subscription_on_server(self, paths: List[str]) -> None: """Create the subscription on the server. Implementations should retrieve and throw away the first set of updates before returning. Args: paths: The tag path queries to include in the subscription. Raises: ApiException: if the API call fails. """ ... @abc.abstractmethod async def _create_subscription_on_server_async(self, paths: List[str]) -> None: """Asynchronously create the subscription on the server. Implementations should retrieve and throw away the first set of updates before returning. Args: paths: The tag path queries to include in the subscription. Returns: A task representing the asynchronous operation. Raises: ApiException: if the API call fails. """ ... @abc.abstractmethod def _send_heartbeat(self) -> None: """Send a heartbeat for the subscription to keep it active. Raises: ApiException: if the API call fails. """ ... @abc.abstractmethod def _close_internal(self) -> None: """Clean up server resources associated with the subscription.""" ... @abc.abstractmethod async def _close_internal_async(self) -> None: """Asynchronously clean up server resources associated with the subscription. Returns: A task representing the asynchronous operation. """ ...
[docs] def close(self) -> None: """Close server resources associated with the subscription. Further tag writes will not trigger new events. """ if self._closed: return self._close_internal() self._heartbeat_timer.elapsed -= self._heartbeat_timer_handler self._closed = True
[docs] async def close_async(self) -> None: """Asynchronously close server resources associated with the subscription. Further tag writes will not trigger new events. Returns: A task representing the asynchronous operation. """ if self._closed: return await self._close_internal_async() self._heartbeat_timer.elapsed -= self._heartbeat_timer_handler self._closed = True
def __enter__(self) -> "TagSubscription": return self async def __aenter__(self) -> "TagSubscription": return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> bool: """Close server resources associated with the subscription. Further tag writes will not trigger new events. """ suppress = False try: self.close() finally: suppress = self._exit_stack.__exit__(exc_type, exc_val, exc_tb) return suppress async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> bool: """Asynchronously close server resources associated with the subscription. Further tag writes will not trigger new events. """ suppress = False try: await self.close_async() finally: suppress = self._exit_stack.__exit__(exc_type, exc_val, exc_tb) return suppress def _on_tag_changed( self, tag: tbase.TagData, value: Optional[tbase.TagValueReader] ) -> None: """Raise the :attr:`tag_changed` event. Args: tag: The tag that was changed. value: The new value and any associated information, or None if the tag has an unknown data type. """ self.tag_changed(tag, value) def _heartbeat_timer_elapsed(self) -> None: try: self._send_heartbeat() except core.ApiException: try: self._create_subscription_on_server(self._paths) except core.ApiException: # Ignore, we'll try again later pass self._heartbeat_timer.start()