# -*- coding: utf-8 -*-
"""Implementation of TagManager."""
import asyncio
import datetime
from typing import (
Any,
Awaitable,
Dict,
Iterable,
List,
Optional,
Sequence,
Tuple,
Union,
)
from nisystemlink.clients import core, tag as tbase
from nisystemlink.clients.core._internal._http_client import HttpClient, HttpResponse
from nisystemlink.clients.core._internal._timestamp_utilities import TimestampUtilities
from nisystemlink.clients.tag._core._manual_reset_timer import ManualResetTimer
from nisystemlink.clients.tag._core._serialized_tag_with_aggregates import (
SerializedTagWithAggregates,
)
from nisystemlink.clients.tag._core._system_time_stamper import SystemTimeStamper
from nisystemlink.clients.tag._http._http_async_tag_query_result_collection import (
HttpAsyncTagQueryResultCollection,
)
from nisystemlink.clients.tag._http._http_buffered_tag_writer import (
HttpBufferedTagWriter,
)
from nisystemlink.clients.tag._http._http_tag_query_result_collection import (
HttpTagQueryResultCollection,
)
from nisystemlink.clients.tag._http._http_tag_selection import HttpTagSelection
from nisystemlink.clients.tag._http._temporary_tag_selection import (
TemporaryTagSelection,
)
from typing_extensions import final
[docs]@final
class TagManager(tbase.ITagReader):
"""Represents common ways to create, read, and query SystemLink tags for a specific server connection."""
def __init_subclass__(cls) -> None:
raise TypeError("type 'TagManager' is not an acceptable base type")
[docs] def __init__(self, configuration: Optional[core.HttpConfiguration] = None) -> None:
"""Initialize an instance.
Args:
configuration: Defines the web server to connect to and information about
how to connect.
Raises:
ApiException: if the current system cannot communicate with a SystemLink
Server, or if the configuration provided by SystemLink Client cannot be
found.
"""
if configuration is None:
configuration = core.HttpConfigurationManager.get_configuration()
self._http_client = HttpClient(configuration)
self._api = self._http_client.at_uri("/nitag/v2")
[docs] def create_selection(self, tags: List[tbase.TagData]) -> tbase.TagSelection:
"""Create an :class:`TagSelection` that initially contains the given ``tags``
without retrieving any additional data from the server.
Args:
tags: The tags to include in the selection.
Returns:
The created selection.
Raises:
ValueError: if any of the given ``tags`` is None or has an invalid path.
ValueError: if ``tags`` is None.
"""
return HttpTagSelection(self._http_client, tags)
[docs] def open_selection(self, paths: List[str]) -> tbase.TagSelection:
"""Query the server for the metadata for the given tag ``paths`` and return the
results in a :class:`TagSelection`.
Args:
paths: The paths of the tags to include in the selection. May include
glob-style wildcards.
Returns:
The created selection with :attr:`TagSelection.metadata` containing the
metadata.
Raises:
ValueError: if any of the given ``paths`` is None or invalid.
ValueError: if ``paths`` contains duplicate paths.
ValueError: if ``paths`` is None.
ApiException: if the API call fails.
"""
return HttpTagSelection.open(self._http_client, paths)
[docs] def open_selection_async(self, paths: List[str]) -> Awaitable[tbase.TagSelection]:
"""Asynchronously query the server for the metadata for the given tag ``paths``
and return the results in a :class:`TagSelection`.
Args:
paths: The paths of the tags to include in the selection. May include
glob-style wildcards.
Returns:
A task representing the asynchronous operation. On success, contains the
created selection with :attr:`TagSelection.metadata` containing the
metadata.
Raises:
ValueError: if any of the given ``paths`` is None or invalid.
ValueError: if ``paths`` contains duplicate paths.
ValueError: if ``paths`` is None.
ApiException: if the API call fails.
"""
return HttpTagSelection.open_async(self._http_client, paths)
[docs] def open(
self,
path: str,
data_type: Optional[tbase.DataType] = None,
*,
create: Optional[bool] = None
) -> tbase.TagData:
"""Query the server for the metadata of a tag, optionally creating it if it doesn't already exist.
If ``data_type`` is provided, ``create`` defaults to True. If ``data_type`` is
not provided, ``create`` cannot be set to True.
The call fails if the tag already exists as a different data type than specified
or if it doesn't exist and ``create`` is False.
Args:
path: The path of the tag to open.
data_type: The expected data type of the tag.
create: True to create the tag if it doesn't already exist, False to fail if
it doesn't exist.
Returns:
Information about the tag.
Raises:
ValueError: if ``path`` is None or empty.
ValueError: if ``data_type`` is invalid.
ValueError: if ``create`` is True, but ``data_type`` is None.
ApiException: if the API call fails.
"""
if create is None:
create = data_type is not None
elif create is True:
if data_type is None:
raise ValueError("Cannot create if data_type is not specified")
if data_type == tbase.DataType.UNKNOWN:
raise ValueError("Must specify a valid data type")
tag = None # type: Optional[Dict[str, Any]]
try:
tag, _ = self._api.get(
"/tags/{path}", params={"path": tbase.TagPathUtilities.validate(path)}
)
except core.ApiException as ex:
error_name = None if ex.error is None else ex.error.name
if create and (error_name or "").startswith("Tag.NoSuchTag"):
pass # continue on and create the tag
else:
raise
if tag is not None:
if data_type is not None and tag["type"] != data_type.api_name:
raise core.ApiException("Tag exists with a conflicting data type")
return tbase.TagData.from_json_dict(tag)
else:
if data_type is None:
raise ValueError("data_type cannot be None when create is True")
# Tag didn't already exist, so try to create it.
self._api.post("/tags", data={"type": data_type.api_name, "path": path})
return tbase.TagData(path, data_type)
[docs] async def open_async(
self,
path: str,
data_type: Optional[tbase.DataType] = None,
*,
create: Optional[bool] = None
) -> tbase.TagData:
"""Asynchronously query the server for the metadata of a tag, optionally
creating it if it doesn't already exist.
The call fails if the tag already exists as a different data type than specified
or if it doesn't exist and ``create`` is False.
Args:
path: The path of the tag to open.
data_type: The expected data type of the tag.
create: True to create the tag if it doesn't already exist, False to fail if
it doesn't exist.
Returns:
A task representing the asynchronous operation. On success, contains
information about the tag.
Raises:
ValueError: if ``path`` is None or empty.
ValueError: if ``data_type`` is invalid.
ValueError: if ``create`` is True, but ``data_type`` is None.
ApiException: if the API call fails.
"""
if create is None:
create = data_type is not None
elif create is True:
if data_type is None:
raise ValueError("Cannot create if data_type is not specified")
if data_type == tbase.DataType.UNKNOWN:
raise ValueError("Must specify a valid data type")
tag = None # type: Optional[Dict[str, Any]]
try:
tag, _ = await self._api.as_async.get(
"/tags/{path}", params={"path": tbase.TagPathUtilities.validate(path)}
)
except core.ApiException as ex:
error_name = None if ex.error is None else ex.error.name
if create and (error_name or "").startswith("Tag.NoSuchTag"):
pass # continue on and create the tag
else:
raise
if tag is not None:
if data_type is not None and tag["type"] != data_type.api_name:
raise core.ApiException("Tag exists with a conflicting data type")
return tbase.TagData.from_json_dict(tag)
else:
if data_type is None:
raise ValueError("data_type cannot be None when create is True")
# Tag didn't already exist, so try to create it.
await self._api.as_async.post(
"/tags", data={"type": data_type.api_name, "path": path}
)
return tbase.TagData(path, data_type)
[docs] def refresh(self, tags: List[tbase.TagData]) -> None:
"""Populate the given ``tags`` with the latest metadata from the server.
Only the :attr:`TagData.path` needs to be initialized. Tags that don't exist on
the server will have their :attr:`TagData.data_type` set to
:attr:`DataType.UNKNOWN`.
Args:
tags: The tags to refresh.
Raises:
ValueError: if any ``tags`` are None or have invalid paths.
ValueError: if ``tags`` is None.
ApiException: if the API call fails.
"""
paths = self._prepare_refresh(tags)
response, http_response = self._api.get(
"/tags", params={"path": paths, "take": str(len(tags))}
)
self._handle_refresh(tags, response, http_response)
[docs] async def refresh_async(self, tags: List[tbase.TagData]) -> None:
"""Asynchronously populate the given ``tags`` with the latest metadata from the server.
Only the :attr:`TagData.path` needs to be initialized. Tags that don't exist on
the server will have their :attr:`TagData.data_type` set to
:attr:`DataType.UNKNOWN`.
Args:
tags: The tags to refresh.
Returns:
A task representing the asynchronous operation.
Raises:
ValueError: if any ``tags`` are None or have invalid paths.
ValueError: if ``tags`` is None.
ApiException: if the API call fails.
"""
paths = self._prepare_refresh(tags)
response, http_response = await self._api.as_async.get(
"/tags", params={"path": paths, "take": str(len(tags))}
)
self._handle_refresh(tags, response, http_response)
def _prepare_refresh(self, tags: List[tbase.TagData]) -> str:
if tags is None:
raise ValueError("tags cannot be None")
elif any(t is None for t in tags):
raise ValueError("Tags cannot contain None")
return ",".join(t.validate_path() for t in tags)
def _handle_refresh(
self,
tags: List[tbase.TagData],
refresh_result: Dict[str, Any],
http_response: HttpResponse,
) -> None:
if refresh_result is None or refresh_result.get("tags") is None:
raise self.invalid_response(http_response)
read_tags = {t["path"]: t for t in refresh_result["tags"]}
for data in tags:
tag = read_tags.get(data.path)
if tag is not None:
data.clear_retention()
data.data_type = tbase.DataType.from_api_name(
tag.get("type") or "UNKNOWN"
)
data.replace_keywords(tag.get("keywords") or [])
data.replace_properties(tag.get("properties") or {})
data.collect_aggregates = tag.get("collectAggregates") or False
else:
data.data_type = tbase.DataType.UNKNOWN
[docs] def query(
self,
paths: Optional[Sequence[str]] = None,
keywords: Optional[Iterable[str]] = None,
properties: Optional[Dict[str, str]] = None,
*,
skip: int = 0,
take: Optional[int] = None
) -> tbase.TagQueryResultCollection:
"""Query the server for available tags matching the given criteria.
Args:
paths: List of tag paths to include in the result. May include glob-style
wildcards.
keywords: List of keywords that tags must have, or None.
properties: Mapping of properties and their values that tags must have, or
None.
skip: The number of tags to initially skip in the results.
take: The number of tags to include in each page of results.
Returns:
A :class:`TagQueryResultCollection` containing the first page of results.
Enumerating the collection will retrieve additional pages according to the
``take`` parameter.
Raises:
ValueError: if ``skip`` or ``take`` is negative.
ValueError: if ``paths`` is an empty list.
ValueError: if any of ``paths`` are None.
ApiException: if the API call fails.
"""
path_str, keyword_str, prop_str = self._prepare_query(
paths, keywords, properties, skip, take
)
params = {
"path": path_str,
"keywords": keyword_str,
"properties": prop_str,
"skip": str(skip) if skip is not None else None,
"take": str(take) if take is not None else None,
}
for k, v in list(params.items()):
if v is None:
del params[k]
first_page, http_response = self._api.get("/tags", params=params)
return HttpTagQueryResultCollection(
self._http_client,
path_str,
keyword_str,
prop_str,
skip,
take,
first_page,
http_response,
)
[docs] async def query_async(
self,
paths: Optional[Sequence[str]] = None,
keywords: Optional[Iterable[str]] = None,
properties: Optional[Dict[str, str]] = None,
*,
skip: int = 0,
take: Optional[int] = None
) -> tbase.AsyncTagQueryResultCollection:
"""Asynchronously query the server for available tags matching the given criteria.
Args:
paths: List of tag paths to include in the result. May include glob-style
wildcards.
keywords: List of keywords that tags must have, or None.
properties: Mapping of properties and their values that tags must have, or
None.
skip: The number of tags to initially skip in the results.
take: The number of tags to include in each page of results.
Returns:
A task representing the asynchronous operation. On success, contains a
:class:`TagQueryResultCollection` containing the first page of results.
Enumerating the collection will retrieve additional pages according to the
``take`` parameter.
Raises:
ValueError: if ``skip`` is negative.
ValueError: if ``take`` is negative.
ApiException: if the API call fails.
"""
path_str, keyword_str, prop_str = self._prepare_query(
paths, keywords, properties, skip, take
)
params = {
"path": path_str,
"keywords": keyword_str,
"properties": prop_str,
"skip": str(skip) if skip is not None else None,
"take": str(take) if take is not None else None,
}
for k, v in list(params.items()):
if v is None:
del params[k]
first_page, http_response = await self._api.as_async.get("/tags", params=params)
return HttpAsyncTagQueryResultCollection(
self._http_client,
path_str,
keyword_str,
prop_str,
skip,
take,
first_page,
http_response,
)
def _prepare_query(
self,
paths: Optional[Sequence[str]],
keywords: Optional[Iterable[str]],
properties: Optional[Dict[str, str]],
skip: Optional[int],
take: Optional[int] = None,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
if paths is not None and len(paths) == 0:
raise ValueError("paths cannot be empty an empty list")
if paths is not None and any(p is None for p in paths):
raise ValueError("paths cannot contain None")
if skip is not None and skip < 0:
raise ValueError("skip cannot be negative")
if take is not None and take < 0:
raise ValueError("take cannot be negative")
path_str = None
keyword_str = None
prop_str = None
if paths is not None:
path_str = ",".join(tbase.TagPathUtilities.validate_query(p) for p in paths)
if keywords:
keyword_str = ",".join(keywords)
if properties:
prop_str = ",".join("{}={}".format(k, v) for k, v in properties.items())
return path_str, keyword_str, prop_str
[docs] def update(
self, updates: Union[Sequence[tbase.TagData], Sequence[tbase.TagDataUpdate]]
) -> None:
"""Update the metadata of one or more tags on the server, creating tags that don't exist.
If ``updates`` contains :class:`TagData` objects, existing metadata will be
replaced. If ``updates`` contains :class:`TagDataUpdate` objects instead, tags
that already exist will have their existing keywords, properties, and settings
merged with those specified in the corresponding :class:`TagDataUpdate`.
The call fails if any of the tags already exist as a different data type.
Args:
updates: The tags to update (if :class:`TagData` objects are given), or the
tag metadata updates to send (if :class:`TagDataUpdate` objects are
given).
Raises:
ValueError: if ``updates`` is None or empty.
ValueError: if ``updates`` contains any invalid tags.
ValueError: if ``updates`` contains both ``TagData`` objects and
``TagDataUpdate`` objects.
ApiException: if the API call fails.
"""
tag_models, merge = self._prepare_update(updates)
partial_success, _ = self._api.post(
"/update-tags", data={"tags": tag_models, "merge": merge}
)
if partial_success is not None:
err_dict = partial_success.get("error")
if err_dict is None and "code" in partial_success:
# SystemLink Cloud has a bug in which it returns the error directly
# instead of under the "error" key
err_dict = partial_success
err_obj = core.ApiError.parse_obj(err_dict) if err_dict else None
if err_dict is None:
assert False, partial_success
raise core.ApiException(error=err_obj)
[docs] async def update_async(
self, updates: Union[Sequence[tbase.TagData], Sequence[tbase.TagDataUpdate]]
) -> None:
"""Asynchronously update the metadata of one or more tags on the server, creating tags that don't exist.
If ``updates`` contains :class:`TagData` objects, existing metadata will be
replaced. If ``updates`` contains :class:`TagDataUpdate` objects instead, tags
that already exist will have their existing keywords, properties, and settings
merged with those specified in the corresponding :class:`TagDataUpdate`.
Args:
updates: The tags to update (if :class:`TagData` objects are given), or the
tag metadata updates to send (if :class:`TagDataUpdate` objects are
given).
Returns:
A task representing the asynchronous operation.
Raises:
ValueError: if ``updates`` is None or empty.
ValueError: if ``updates`` contains any invalid tags.
ValueError: if ``updates`` contains both ``TagData`` objects and
``TagDataUpdate`` objects.
ApiException: if the API call fails.
"""
tag_models, merge = self._prepare_update(updates)
partial_success, _ = await self._api.as_async.post(
"/update-tags", data={"tags": tag_models, "merge": merge}
)
if partial_success is not None:
err_dict = partial_success.get("error")
if err_dict is None and "code" in partial_success:
# SystemLink Cloud has a bug in which it returns the error directly
# instead of under the "error" key
err_dict = partial_success
err_obj = core.ApiError.parse_obj(err_dict) if err_dict else None
if err_dict is None:
assert False, partial_success
raise core.ApiException(error=err_obj)
def _prepare_update(
self, updates: Union[Sequence[tbase.TagData], Sequence[tbase.TagDataUpdate]]
) -> Tuple[List[Dict[str, Any]], bool]:
if updates is None:
raise ValueError("updates cannot be None")
if not updates:
raise ValueError("updates cannot be empty")
if not all(isinstance(u, type(updates[0])) for u in updates):
raise ValueError(
"updates must contain only TagData objects or TagDataUpdate objects, not both"
)
merge = isinstance(updates[0], tbase.TagDataUpdate)
return [u.to_json_dict() for u in updates], merge
[docs] def delete(self, tags: Iterable[Union[tbase.TagData, str]]) -> None:
"""Delete one or more tags from the server.
Args:
tags: The tags (or tag paths) to delete.
Raises:
ValueError: if ``tags`` is None.
ValueError: if ``tags`` contains any invalid tags.
ApiException: if the API call fails.
"""
if tags is None:
raise ValueError("tags cannot be None")
if any(t is None for t in tags):
raise ValueError("tags cannot contain None")
validated_paths = [
(
tbase.TagPathUtilities.validate(t)
if isinstance(t, str)
else t.validate_path()
)
for t in tags
]
self._perform_delete(validated_paths)
[docs] def delete_async(
self, tags: Iterable[Union[tbase.TagData, str]] = None
) -> Awaitable[None]:
"""Asynchronously delete one or more tags from the server.
Args:
tags: The tags (or tag paths) to delete.
Returns:
A task representing the asynchronous operation.
Raises:
ValueError: if ``tags`` is None.
ValueError: if ``tags`` contains any invalid tags.
ApiException: if the API call fails.
"""
try:
if tags is None:
raise ValueError("tags cannot be None")
if any(t is None for t in tags):
raise ValueError("tags cannot contain None")
validated_paths = [
(
tbase.TagPathUtilities.validate(t)
if isinstance(t, str)
else t.validate_path()
)
for t in tags
]
except Exception as ex:
f = asyncio.get_event_loop().create_future()
f.set_exception(ex)
return f
return self._perform_delete_async(validated_paths)
def _perform_delete(self, paths: List[str]) -> None:
if len(paths) < 4:
# Few enough to make multiple, single deletes rather than creating a selection.
exceptions = []
for path in paths:
try:
self._api.delete(
"/tags/{path}",
params={"path": tbase.TagPathUtilities.validate(path)},
)
except core.ApiException as ex:
exceptions.append(ex)
if exceptions:
raise exceptions[0] from None
else:
with TemporaryTagSelection.create(self._http_client, paths) as selection:
self._api.delete("/selections/{id}/tags", params={"id": selection.id})
async def _perform_delete_async(self, paths: List[str]) -> None:
if len(paths) < 4:
# Few enough to make multiple, single deletes rather than creating a selection.
await asyncio.gather(
*[
self._api.as_async.delete(
"/tags/{path}",
params={"path": tbase.TagPathUtilities.validate(p)},
)
for p in paths
]
)
else:
async with await TemporaryTagSelection.create_async(
self._http_client, paths
) as selection:
await self._api.as_async.delete(
"/selections/{id}/tags", params={"id": selection.id}
)
[docs] def create_writer(
self,
*,
buffer_size: Optional[int] = None,
max_buffer_time: Optional[datetime.timedelta] = None
) -> tbase.BufferedTagWriter:
"""Create a tag writer that buffers tag values until
:meth:`~BufferedTagWriter.send_buffered_writes()` is called on the returned
object, ``buffer_size`` writes have been buffered, or ``max_buffer_time`` time
has past since buffering a value, at which point the writes will be sent
automatically.
Args:
buffer_size: The maximum number of tag writes to buffer before automatically
sending them to the server.
max_buffer_time: The amount of time before writes are sent.
Returns:
The created writer. Close the writer to free resources.
Raises:
ValueError: if ``buffer_size`` and ``max_buffer_time`` are both None.
ValueError: if ``buffer_size`` is less than one.
"""
if buffer_size is None and max_buffer_time is None:
raise ValueError("must provide either buffer_size or max_buffer_time")
if buffer_size is not None:
if buffer_size < 1:
raise ValueError("buffer_size cannot be 0 or negative")
else:
buffer_size = 0
if max_buffer_time is not None:
if max_buffer_time.total_seconds() < 0.001:
raise ValueError("max_buffer_time must be at least 1 millisecond")
timer = ManualResetTimer(max_buffer_time)
else:
timer = ManualResetTimer.null_timer
return HttpBufferedTagWriter(
self._http_client, SystemTimeStamper(), buffer_size, timer
)
def _read(
self, path: str, include_timestamp: bool, include_aggregates: bool
) -> Optional[SerializedTagWithAggregates]:
"""Retrieve the current value of the tag with the given ``path`` from the server.
Optionally retrieves the aggregate values as well. The tag must exist. Clients
do not typically call this method directly. Use a :class:`.TagValueReader`
instead.
Args:
path: The path of the tag to read.
include_timestamp: True to include the timestamp associated with the value
in the result.
include_aggregates: True to include the tag's aggregate values in the result
if the tag is set to :attr:`TagData.collect_aggregates`.
Returns:
The value serialized as a string, along with timestamp and/or aggregates, if
requested, or None if the tag exists but doesn't have a value.
Raises:
ValueError: if ``path`` is empty or invalid.
ValueError: if ``path`` is None.
ApiException: if the API call fails.
"""
path = tbase.TagPathUtilities.validate(path)
if include_aggregates:
response, http_response = self._api.get(
"/tags/{path}/values", params={"path": path}
)
return self._handle_read(
path, response, http_response, include_timestamp, True
)
elif include_timestamp:
response2, http_response = self._api.get(
"/tags/{path}/values/current", params={"path": path}
)
return self._handle_read(
path, response2, http_response, include_timestamp, False
)
else:
response3, http_response = self._api.get(
"/tags/{path}/values/current/value", params={"path": path}
)
return self._handle_read(path, response3, http_response)
async def _read_async(
self, path: str, include_timestamp: bool, include_aggregates: bool
) -> Optional[SerializedTagWithAggregates]:
"""Asynchronously retrieve the current value of the tag with the given ``path`` from the server.
Optionally retrieves the aggregate values as well. The tag must exist. Clients
do not typically call this method directly. Use a :class:`.TagValueReader`
instead.
Args:
path: The path of the tag to read.
include_timestamp: True to include the timestamp associated with the value
in the result.
include_aggregates: True to include the tag's aggregate values in the result
if the tag is set to :attr:`TagData.collect_aggregates`.
Returns:
The value serialized as a string, or None if the tag exists but doesn't have
a value.
Raises:
ValueError: if ``path`` is empty or invalid.
ValueError: if ``path`` is None.
ApiException: if the API call fails.
"""
path = tbase.TagPathUtilities.validate(path)
if include_aggregates:
response, http_response = await self._api.as_async.get(
"/tags/{path}/values", params={"path": path}
)
return self._handle_read(
path, response, http_response, include_timestamp, True
)
elif include_timestamp:
response2, http_response = await self._api.as_async.get(
"/tags/{path}/values/current", params={"path": path}
)
return self._handle_read(
path, response2, http_response, include_timestamp, False
)
else:
response3, http_response = await self._api.as_async.get(
"/tags/{path}/values/current/value", params={"path": path}
)
return self._handle_read(path, response3, http_response)
def _handle_read(
self,
path: str,
response: Dict[str, Any],
http_response: HttpResponse,
include_timestamp: Optional[bool] = False,
include_aggregates: Optional[bool] = False,
) -> Optional[SerializedTagWithAggregates]:
if response is None:
return None
agg = response.get("aggregates") or {} # type: Dict[str, Any]
current = response.get("current", response) # type: Dict[str, Any]
if current is None:
return None
if "type" in response:
val = response
else:
val = current["value"]
if val is None:
return None
assert "value" in val
timestamp = None # type: Optional[datetime.datetime]
if include_timestamp:
if current.get("timestamp") is None:
raise self.invalid_response(http_response)
timestamp = TimestampUtilities.str_to_datetime(current["timestamp"])
return SerializedTagWithAggregates(
path,
tbase.DataType.from_api_name(val["type"]),
val["value"],
timestamp,
agg.get("count"),
agg.get("min"),
agg.get("max"),
float(agg["avg"]) if agg.get("avg") is not None else None,
)
@classmethod
def invalid_response(cls, response: HttpResponse) -> core.ApiException:
request = response.request
return core.ApiException(
"Invalid response from {} {}".format(request.method, request.url)
)