Getting Started

Tag API

Overview

The TagManager class is the primary entry point of the Tag API.

When constructing a TagManager, you can pass an HttpConfiguration (such as one retrieved from the HttpConfigurationManager), or let TagManager use the default connection. The default connection depends on your SystemLink Client settings.

With a TagManager object, you can:

  • Query, create, modify, and delete tags from the server

    • Use open() to get a tag’s TagData from the server when you know the tag’s path.

    • Use query() to get a collection of TagData objects based on the tags’ paths, keywords, and/or properties.

    • Use refresh() to update a list of TagData objects with fresh metadata from the server.

    • Use update() to modify the server metadata for a list of tags, using either TagData objects to overwrite the server’s tag data or TagDataUpdate objects to selectively update specific fields.

    • Use delete() to delete one or more tags from the server.

  • Read and write tag values

    • Use read() to get a tag’s current value. Via method parameters, you can also request the timestamp indicating when that value was last written and/or the aggregate data stored for the tag (if the tag’s collect_aggregates attribute is enabled on the server).

    • Use create_writer() to get a BufferedTagWriter that buffers a set of writes and sends them automatically when the given buffer_size is reached, when max_buffer_time has elapsed, or when send_buffered_writes() is called.

  • Get a TagSelection that can perform several of the above operations on multiple tags at once

If you have a TagSelection, you can use it to create a TagSubscription that triggers a tag_changed event whenever the value of one of its tags changes.
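
To make the buffering rules for create_writer() more concrete, here is a minimal pure-Python model of a writer that sends its pending writes when buffer_size is reached, when max_buffer_time has elapsed, or when send_buffered_writes() is called. It is only a sketch and not the library's BufferedTagWriter: the SketchBufferedWriter class, the send callback, and the injectable clock are hypothetical, and the elapsed-time check runs only inside write() rather than on a background timer.

```python
from datetime import timedelta
from time import monotonic
from typing import Any, Callable, List, Optional, Tuple

Write = Tuple[str, Any]  # (tag path, value)


class SketchBufferedWriter:
    """Hypothetical stand-in that mimics the buffering rules described above."""

    def __init__(
        self,
        send: Callable[[List[Write]], None],
        buffer_size: Optional[int] = None,
        max_buffer_time: Optional[timedelta] = None,
        clock: Callable[[], float] = monotonic,
    ) -> None:
        if buffer_size is None and max_buffer_time is None:
            raise ValueError("Specify buffer_size, max_buffer_time, or both")
        self._send = send
        self._buffer_size = buffer_size
        self._max_seconds = (
            None if max_buffer_time is None else max_buffer_time.total_seconds()
        )
        self._clock = clock
        self._pending: List[Write] = []
        self._first_write_at: Optional[float] = None

    def write(self, path: str, value: Any) -> None:
        now = self._clock()
        if self._first_write_at is None:
            self._first_write_at = now  # start timing at the oldest pending write
        self._pending.append((path, value))
        full = self._buffer_size is not None and len(self._pending) >= self._buffer_size
        stale = (
            self._max_seconds is not None
            and now - self._first_write_at >= self._max_seconds
        )
        if full or stale:
            self.send_buffered_writes()

    def send_buffered_writes(self) -> None:
        if self._pending:
            self._send(self._pending)
        self._pending = []
        self._first_write_at = None

    def __enter__(self) -> "SketchBufferedWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.send_buffered_writes()  # leaving the "with" block sends what is left


# Usage: collect each sent batch in a list instead of contacting a server
sent: List[List[Write]] = []
with SketchBufferedWriter(sent.append, buffer_size=2) as writer:
    writer.write("MyTags.Example Tag", 1.0)
    writer.write("MyTags.Example Tag", 2.0)  # buffer is full: first batch is sent
    writer.write("MyTags.Example Tag", 3.0)  # stays pending until the block exits
print(sent)
```

In this model the first two writes go out as one batch and the third is sent when the "with" block exits, which mirrors the note in the first example below about exiting the "with" block.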

Examples

Read and write individual tags

from datetime import timedelta

from nisystemlink.clients.tag import DataType, TagManager

mgr = TagManager()
tag = mgr.open("MyTags.Example Tag", DataType.DOUBLE, create=True)

with mgr.create_writer(buffer_size=10, max_buffer_time=timedelta(seconds=3)) as writer:
    writer.write(tag.path, tag.data_type, 3.5)
# Note: Exiting the "with" block automatically calls writer.send_buffered_writes()

read_result = mgr.read(tag.path)
assert read_result is not None
assert read_result.value == 3.5

Subscribe to tag changes

from contextlib import ExitStack
from time import sleep

from nisystemlink.clients.tag import DataType, TagData, TagManager, TagValueReader

SIMULATE_EXTERNAL_TAG_CHANGES = True


def on_tag_changed(tag: TagData, reader: TagValueReader) -> None:
    """Callback for tag_changed events."""
    path = tag.path
    data_type = tag.data_type.name

    if reader is not None:
        read_result = reader.read()
        # A read_result of None means that the tag has no value, but it *must*
        # have a value, because we got a tag_changed event!
        assert read_result is not None
        value = read_result.value
    else:
        value = "???"  # tag has unknown data type

    print(f'Tag changed: "{path}" = {value} ({data_type})')


mgr = TagManager()
if SIMULATE_EXTERNAL_TAG_CHANGES:
    mgr.open("MyTags.Example Tag", DataType.DOUBLE, create=True)
    writer = mgr.create_writer(buffer_size=1)

with ExitStack() as stack:
    # Notes:
    # 1. The tags are assumed to already exist before this example is run, but
    #    setting SIMULATE_EXTERNAL_TAG_CHANGES to True will ensure there is one.
    # 2. Any tags that get added later will NOT automatically appear in the
    #    selection just because the path matches the wildcard used below; you
    #    must call one of the selection's refresh methods to update the tag list
    #    from the server. But even if you do that:
    # 3. The subscription will only contain the tags that were in the selection
    #    when the subscription was created. If you want the subscription to add
    #    new tags that were added to the selection, you must recreate it.
    paths = ["MyTags.*"]
    selection = stack.enter_context(mgr.open_selection(paths))
    if not selection.metadata:
        print(f"Found no tags that match {paths}")
    else:
        print("Matching tags:")
        for path in selection.metadata.keys():
            print(f" - {path}")
        print()

        subscription = stack.enter_context(selection.create_subscription())
        subscription.tag_changed += on_tag_changed

        # Wait forever, until a KeyboardInterrupt (Ctrl+C)
        print("Watching for tag changes; hit Ctrl+C to stop")
        try:
            i = 0
            while True:
                sleep(1)
                if SIMULATE_EXTERNAL_TAG_CHANGES:
                    writer.write("MyTags.Example Tag", DataType.DOUBLE, i)
                    i += 1
        except KeyboardInterrupt:
            pass
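
The example above registers its callback with "subscription.tag_changed += on_tag_changed". As a rough illustration of how an event object can support that syntax, the sketch below implements "+=" and "-=" in plain Python. SketchEvent and its fire() method are hypothetical and are not the class the library uses; they only show the general pattern.

```python
from typing import Callable, List, Tuple


class SketchEvent:
    """Hypothetical event that supports `+=` / `-=` handler registration."""

    def __init__(self) -> None:
        self._handlers: List[Callable[..., None]] = []

    def __iadd__(self, handler: Callable[..., None]) -> "SketchEvent":
        self._handlers.append(handler)
        return self  # returning self keeps the name bound to the same event

    def __isub__(self, handler: Callable[..., None]) -> "SketchEvent":
        self._handlers.remove(handler)
        return self

    def fire(self, *args: object) -> None:
        # Iterate over a copy so handlers may unregister themselves while running
        for handler in list(self._handlers):
            handler(*args)


seen: List[Tuple[str, float]] = []


def on_changed(path: str, value: float) -> None:
    seen.append((path, value))


tag_changed = SketchEvent()
tag_changed += on_changed
tag_changed.fire("MyTags.Example Tag", 1.0)  # handler runs
tag_changed -= on_changed
tag_changed.fire("MyTags.Example Tag", 2.0)  # no handlers left, nothing recorded
print(seen)
```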

DataFrame API

Overview

The DataFrameClient class is the primary entry point of the DataFrame API.

When constructing a DataFrameClient, you can pass an HttpConfiguration (such as one retrieved from the HttpConfigurationManager), or let DataFrameClient use the default connection. The default connection depends on your environment.

With a DataFrameClient object, you can:

  • Create and delete data tables.

  • Modify table metadata and query for tables by their metadata.

  • Append rows of data to a table, query for rows of data from a table, and decimate table data.

  • Export table data in a comma-separated values (CSV) format.

Examples

Create and write data to a table

import random
from datetime import datetime

from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import (
    AppendTableDataRequest,
    Column,
    ColumnType,
    CreateTableRequest,
    DataFrame,
    DataType,
)

client = DataFrameClient()

# Create table
table_id = client.create_table(
    CreateTableRequest(
        name="Example Table",
        columns=[
            Column(name="index", dataType=DataType.Int32, columnType=ColumnType.Index),
            Column(name="Float_Column", dataType=DataType.Float32),
            Column(name="Timestamp_Column", dataType=DataType.Timestamp),
        ],
    )
)

# Generate example data
frame = DataFrame(
    data=[[i, random.random(), datetime.now().isoformat()] for i in range(100)]
)

# Write example data to table
client.append_table_data(
    table_id, data=AppendTableDataRequest(frame=frame, endOfData=True)
)

Query and read data from a table

from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import (
    DecimationMethod,
    DecimationOptions,
    QueryDecimatedDataRequest,
)

client = DataFrameClient()

# List a table
response = client.list_tables(take=1)
table = response.tables[0]

# Get table metadata by table id
client.get_table_metadata(table.id)

# Query decimated table data
request = QueryDecimatedDataRequest(
    decimation=DecimationOptions(
        x_column="index",
        y_columns=["col1"],
        intervals=1,
        method=DecimationMethod.MaxMin,
    )
)
client.query_decimated_data(table.id, request)
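
For intuition about what a max/min style of decimation does, the sketch below splits a series into equal-width intervals along x and keeps the lowest and highest y point in each. It is a generic illustration only: max_min_decimate is a hypothetical helper, and the service's actual algorithm, interval boundaries, and output ordering may differ.

```python
from typing import List, Sequence, Tuple

Point = Tuple[float, float]  # (x, y)


def max_min_decimate(points: Sequence[Point], intervals: int) -> List[Point]:
    """Keep the min-y and max-y point of each equal-width x interval."""
    if intervals < 1:
        raise ValueError("intervals must be at least 1")
    if not points:
        return []

    ordered = sorted(points)
    x_min, x_max = ordered[0][0], ordered[-1][0]
    # Fall back to a width of 1.0 when every point shares the same x value
    width = (x_max - x_min) / intervals or 1.0

    buckets: List[List[Point]] = [[] for _ in range(intervals)]
    for point in ordered:
        # Clamp so the point at x_max lands in the last interval
        index = min(int((point[0] - x_min) / width), intervals - 1)
        buckets[index].append(point)

    result: List[Point] = []
    for bucket in buckets:
        if not bucket:
            continue
        low = min(bucket, key=lambda p: p[1])
        high = max(bucket, key=lambda p: p[1])
        # Emit both extremes in x order; the set avoids duplicating a lone point
        result.extend(sorted({low, high}))
    return result


series = [
    (0.0, 1.0), (1.0, 5.0), (2.0, -2.0), (3.0, 0.0),
    (4.0, 3.0), (5.0, 2.0), (6.0, 9.0), (7.0, 4.0),
]
decimated = max_min_decimate(series, intervals=2)
print(decimated)
```

Under these assumptions, intervals=1 (as in the request above) would keep just the overall minimum and maximum point of the series.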

Export data from a table

from shutil import copyfileobj

from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import (
    ColumnFilter,
    ColumnOrderBy,
    ExportFormat,
    ExportTableDataRequest,
    FilterOperation,
)

client = DataFrameClient()

# List a table
response = client.list_tables(take=1)
table = response.tables[0]

# Export table data with query options
request = ExportTableDataRequest(
    columns=["col1"],
    order_by=[ColumnOrderBy(column="col2", descending=True)],
    filters=[
        ColumnFilter(column="col1", operation=FilterOperation.NotEquals, value="0")
    ],
    response_format=ExportFormat.CSV,
)

data = client.export_table_data(id=table.id, query=request)

# Write the export data to a file
with open(f"{table.name}.csv", "wb") as f:
    copyfileobj(data, f)

# Alternatively, load the export data into a pandas dataframe
# import pandas as pd
# df = pd.read_csv(data)
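
If pandas is not available, the exported CSV can also be parsed with the standard library. The sketch below assumes the export is a binary file-like object with a read() method that yields UTF-8 CSV with a header row. An io.BytesIO with made-up contents stands in for the real export so the snippet runs without a server, and read_csv_rows is a hypothetical helper. Note that it reads the whole export into memory, so it suits small tables only.

```python
import csv
import io
from typing import BinaryIO, Dict, List


def read_csv_rows(stream: BinaryIO) -> List[Dict[str, str]]:
    """Parse a binary CSV stream into one dict per row, keyed by column name."""
    # Assumption: the stream is UTF-8 encoded and small enough to read at once
    text = stream.read().decode("utf-8")
    return list(csv.DictReader(io.StringIO(text, newline="")))


# Stand-in for the object returned by the export call (contents are made up)
fake_export = io.BytesIO(b"col1,col2\r\n1,a\r\n2,b\r\n")
rows = read_csv_rows(fake_export)
print(rows)
```

All values come back as strings, so convert columns to numbers or timestamps as needed before using them.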