Connectors with STOMP

Ingesting data into, or exporting data from, an IOTICSpace and its Digital Twins requires a software application built on the IOTICS API. In IOTICS this application is called a Connector.

Introduction to Connectors

A Connector is a software application that interacts with Digital Twins in an IOTICSpace. It acts as the bridge between an IOTICSpace and the data source.

We distinguish between three different types:

1. Publisher Connector: imports data into an IOTICSpace
2. Follower Connector: exports data from an IOTICSpace
3. Synthesiser Connector: transforms existing data within an IOTICSpace

Publisher Connector

A Publisher Connector is a software application that works as a bridge between a data source and an IOTICSpace. It enables external data to be imported into an IOTICSpace and published by one or more Digital Twins, and therefore shared with and used by other Digital Twins in the ecosystem.

A Publisher Connector:

  • Typically handles one single source of data, and is made up of one Twin Model with one or more related Twins from Model. If multiple sources have to be imported and synchronised, several Twin Models may have to be created, each with their specific metadata;
  • Periodically imports and publishes new data through the generic Digital Twins. The data can be published either in batches or in single samples, and at fixed or variable intervals. Some Twins may also need to share data with a different frequency than others;
  • Runs continuously. Once created, it is good practice to allow all the Twins in the IOTICSpace to maintain their state of consistency - be it actively sharing new data, staying idle or becoming obsolete and being deleted automatically.

Representation of a Publisher Connector
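
The note above that some Twins may share data at a different frequency than others can be sketched as a small scheduler. This is only an illustration under stated assumptions: the function name publish_schedule, the twin keys and the intervals are hypothetical and are not part of the IOTICS API. It computes when each Twin is next due to share, without making any network call.

```python
import heapq
from typing import Dict, Iterator, Tuple


def publish_schedule(
    intervals: Dict[str, float], horizon: float
) -> Iterator[Tuple[float, str]]:
    """Yield (offset_in_seconds, twin_key) pairs in chronological order.

    `intervals` maps a (hypothetical) twin key to its publish interval in
    seconds. Only offsets up to and including `horizon` are produced.
    """
    if any(interval <= 0 for interval in intervals.values()):
        raise ValueError("every interval must be greater than zero")

    # Every Twin is due for its first share after one full interval
    queue = [(interval, key) for key, interval in intervals.items()]
    heapq.heapify(queue)

    while queue and queue[0][0] <= horizon:
        due_at, key = heapq.heappop(queue)
        yield due_at, key
        # Re-schedule the same Twin one interval later
        heapq.heappush(queue, (due_at + intervals[key], key))


schedule = list(
    publish_schedule({"temp_sensor_1": 10, "temp_sensor_2": 15}, horizon=30)
)
for due_at, key in schedule:
    print(f"t+{due_at}s: share from {key}")
```

In a real Connector, the loop would sleep until each offset and then call the share request for that Twin. A priority queue keeps the next due Twin at the front, so the cost per share stays low even with many Twins.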

How to create a Publisher Connector with the IOTICS API

Implementing a Publisher Connector leads to the creation of a set of Twin Publishers that, simultaneously or in turn, publish data. A prerequisite is the development of one or more Twin Models that will serve as a template for the Twin Publishers.

The following example shows a Publisher Connector that simulates 10 temperature sensors. Each one gets data from the environment and shares sensor readings every 10 seconds. We first create a Twin Model with a Feed and then create the 10 Sensor Twins, which periodically download new data from a Server and share it.

Example code of a Publisher Connector:

1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
  2.1 Build list of Properties;
  2.2 Build list of Feeds;
  2.3 Get a new Twin DID;
  2.4 Use the Upsert Twin API;
3. Create Twins from Model;
  3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  3.2 Build list of Properties;
  3.3 Use the Describe Feed API to get the Twin Model Feed's metadata;
  3.4 Build list of Feeds;
  For every Sensor:
  3.5 Get a new Twin DID;
  3.6 Use the Upsert Twin API;
Every 10 seconds:
4. Download new data from the server;
5. Share data into the Space;
import base64
import json
from datetime import datetime, timezone
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class PublisherConnector:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_info = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "publisher_connector",
            "Content-Type": "application/json",
        }
        self._refresh_token()
        self._twins_info = {}

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)
            print("Retried once. Still getting error", ex)
            # Re-raise: "response" may be unbound or may hold an error body
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_feed(self, twin_id: str, feed_id: str):
        feed_description = self._make_api_call(
            method="GET",
            url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}",
        )

        return feed_description

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully with Metadata, Feeds and Values")
        print("---")

    def _share_data(self, twin_label: str, twin_id: str, feed_id: str, value):
        data_to_share = {"reading": value}
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {
                "data": encoded_data,
                "mime": "application/json",
                "occurredAt": datetime.now(tz=timezone.utc).isoformat(
                    timespec="seconds"
                ),
            }
        }
        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}/shares",
            json=data_to_share_payload,
        )

        print(f"Shared {data_to_share} from Twin {twin_label}")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Publisher Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that shares temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]

        # Build a list of Feeds for the Twin Model
        feed_list = [
            # Add a single Feed called "currentTemp"
            {
                "id": "currentTemp",
                "storeLast": True,  # We want to store the last value sent
                "properties": [
                    # Add a Label for the Feed
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": "Temperature",
                            "lang": "en",
                        },
                    }
                ],
                # Add a single Value called "reading"
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "reading",
                        "unit": "http://purl.obolibrary.org/obo/UO_0000027",
                    }
                ],
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="TwinPublisherModel",
            properties=property_list,
            feeds=feed_list,
        )

        return model_twin_did

    def create_twins_from_model(self, twin_model_id: str):
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Don't add the Label property of the Twin Model.
            # It'll be added later based on the number of the sensor
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                continue
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        # Build a list of Feeds for the Twins from Model
        feed_list = []

        # Scan the Twin Model's feeds list to get the Twin Model's Feed IDs.
        # Then use the Feed Describe API to get the Feeds' metadata
        for feed in twin_model["result"]["feeds"]:
            feed_id = feed["feedId"]["value"]

            # Use the Feed Describe API
            feed_description = self._describe_feed(
                twin_id=twin_model_id, feed_id=feed_id
            )
            feed_properties = feed_description["result"]["properties"]
            feed_values = feed_description["result"]["values"]
            store_last = feed_description["result"]["storeLast"]

            # Append to the list all the Feed's metadata
            feed_list.append(
                {
                    "id": feed_id,
                    "properties": feed_properties,
                    "values": feed_values,
                    "storeLast": store_last,
                }
            )

        # Finally create the 10 Publisher Twins
        for sensor_number in range(1, 11):
            twin_key_name = f"temp_sensor_{sensor_number}"
            twin_label = f"Temperature Sensor Publisher {sensor_number}"

            # Add the Label property according to the number of the Sensor
            property_list_with_label = property_list.copy()
            property_list_with_label.append(
                {
                    "key": "http://www.w3.org/2000/01/rdf-schema#label",
                    "langLiteralValue": {"value": twin_label, "lang": "en"},
                }
            )

            print(f"Creating Twin from Model - {twin_label}")

            twin_from_model_did = self._create_new_twin(
                twin_key_name=twin_key_name,
                properties=property_list_with_label,
                feeds=feed_list,
                location={"lat": 51.5, "lon": -0.1},
            )

            # Add the Twin's info in the mapping dictionary
            twin_info = {
                twin_key_name: {
                    "twin_did": twin_from_model_did,
                    "twin_label": twin_label,
                }
            }
            self._twins_info.update(twin_info)

    def publish_data(self, data: List[dict]):
        for temp_data in data:
            temperature = temp_data["temp"]
            sensor_id = temp_data["sensor_id"]

            self._share_data(
                twin_label=self._twins_info[sensor_id]["twin_label"],
                twin_id=self._twins_info[sensor_id]["twin_did"],
                feed_id="currentTemp",
                value=temperature,
            )

    def _get_twin_label(self, property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None


def get_sensor_data():
    response = request(method="GET", url="http://flaskapi.dev.iotics.com/sensor_temp")
    sensor_data = response.json()

    return sensor_data


def main():
    publisher_connector = PublisherConnector()
    publisher_connector.setup()
    model_twin_did = publisher_connector.create_twin_model()
    publisher_connector.create_twins_from_model(twin_model_id=model_twin_did)

    while True:
        data = get_sensor_data()
        publisher_connector.publish_data(data)
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()
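
The payload that _share_data builds can be checked in isolation, without an IOTICSpace. The sketch below reproduces the same encoding (JSON, then base64) and its reverse. The helper names encode_sample and decode_sample_data are hypothetical and exist only for this illustration; the payload fields are the ones used in the listing above.

```python
import base64
import json
from datetime import datetime, timezone


def encode_sample(values: dict, occurred_at: datetime) -> dict:
    """Build the request body for a share, as _share_data does."""
    encoded = base64.b64encode(json.dumps(values).encode()).decode()
    return {
        "sample": {
            "data": encoded,
            "mime": "application/json",
            "occurredAt": occurred_at.isoformat(timespec="seconds"),
        }
    }


def decode_sample_data(encoded: str) -> dict:
    """Reverse the encoding: base64 text -> JSON -> dict."""
    return json.loads(base64.b64decode(encoded))


payload = encode_sample(
    {"reading": 21.5}, datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
)
decoded = decode_sample_data(payload["sample"]["data"])
print(payload["sample"]["occurredAt"], decoded)
```

A round trip like this is a quick way to confirm that the key used in the sample ("reading") matches the label of the Value declared on the Feed.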

Follower Connector

The Follower Connector mirrors the Publisher Connector: it is an application that works as a bridge between an IOTICSpace and a data consumer and/or a data store.

Note: A Follower Connector can run in the same IOTICSpace as the Twin Publishers or in a different one. Use remote search and remote follow if the IOTICSpaces are not the same.

Representation of a Follower Connector in the same Space as the Twin Publishers

How to create a Follower Connector with the IOTICS API

As with the Publisher Connector, a set of Twin Followers is needed. Each Twin Follower asks a Twin Publisher's Feed for its last shared value. One or more Twin Models serve as templates for the Twin Followers, so that they can be classified according to their specific nature.

The following example shows how to set up a Follower Connector. It automatically creates 10 Follower Twins that follow the Sensor Twins' feed of the Publisher Connector, as demonstrated above, and asks for their last shared value every 10 seconds.

Example code of a Follower Connector:

Instructions:

  1. Run publisher_connector.py in a terminal;
  2. Run the follower_connector.py code below in another terminal, in the same IOTICSpace as the Publisher Connector;
  3. Look at the data received on your terminal.

1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
  2.1 Build list of Properties;
  2.2 Get a new Twin DID;
  2.3 Use the Upsert Twin API;
3. Create Twins from Model;
  3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  3.2 Build list of Properties;
  For each Twin:
  3.3 Get a new Twin DID;
  3.4 Use the Upsert Twin API;
4. Search for Twin Publishers;
Every 10 seconds:
5. Get the Twin Publishers' last shared data.
import base64
import json
from datetime import datetime, timezone, timedelta
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class FollowerConnector:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_info = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "follower_connector",
            "Content-Type": "application/json",
        }
        self._refresh_token()
        self._twins_info = {}

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)
            print("Retried once. Still getting error", ex)
            # Re-raise: "response" may be unbound or may hold an error body
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully with Metadata, Feeds and Values")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Follower Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that follows temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="TwinFollowerModel", properties=property_list
        )

        return model_twin_did

    def create_twins_from_model(self, twin_model_id: str):
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Don't add the Label property of the Twin Model.
            # It'll be added later based on the number of the sensor
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                continue
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        # Finally create the 10 Sensor Twin Followers
        for sensor_number in range(1, 11):
            twin_key_name = f"temp_sensor_follower_{sensor_number}"
            twin_label = f"Temperature Sensor Follower {sensor_number}"

            # Add the Label property according to the number of the Sensor
            property_list_with_label = property_list.copy()
            property_list_with_label.append(
                {
                    "key": "http://www.w3.org/2000/01/rdf-schema#label",
                    "langLiteralValue": {"value": twin_label, "lang": "en"},
                }
            )

            print(f"Creating Twin from Model - {twin_label}")

            twin_from_model_did = self._create_new_twin(
                twin_key_name=twin_key_name,
                properties=property_list_with_label,
                location={"lat": 51.5, "lon": -0.1},
            )

            # Add the Twin's info in the mapping dictionary
            twin_info = {sensor_number: twin_from_model_did}
            self._twins_info.update(twin_info)

    def search_for_twin_publishers(self):
        # Initialise an empty list.
        # It will contain the list of Twins retrieved by the search
        twins_list = []

        # Add a new temporary field in the headers.
        # Client request timeout is used to stop the request processing once the timeout is reached
        headers = self._headers.copy()
        headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=10)
                ).isoformat(),
            }
        )

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=headers,
            stream=True,
            verify=False,
            params={"scope": "LOCAL"},
            json={
                "responseType": "FULL",
                "filter": {
                    "properties": [
                        {
                            "key": "https://data.iotics.com/app#createdFrom",
                            "uriValue": {
                                "value": "https://data.iotics.com/app#ByModel"
                            },
                        },
                    ],
                    "text": "shares",
                },
            },
        ) as resp:
            # Raises HTTPError, if one occurred
            resp.raise_for_status()
            # Iterates over the response data, one line at a time
            for chunk in resp.iter_lines():
                # Skip keep-alive lines, which are empty
                if not chunk:
                    continue
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                # Append the twins found to the list of twins
                twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def follow_data(self, twins_list: List[dict]):
        for twin in twins_list:
            # Get the sensor number from the last word of the Twin's label
            twin_publisher_label = self._get_twin_label(twin["properties"])
            # Skip Twins without a label: there is no sensor number to match
            if twin_publisher_label is None:
                continue
            sensor_number = twin_publisher_label.split(" ")[-1]
            # Check if the current Twin is the Twin Model.
            # In that case we don't want to follow it.
            if not sensor_number.isdigit():
                continue

            twin_publisher_id = twin["id"]["value"]
            twin_follower_id = self._twins_info.get(int(sensor_number))

            # We are supposed to know in advance the Feed ID we want our Twin Follower to follow
            twin_publisher_feed_id = "currentTemp"

            # Get the latest shared value
            encoded_data = self._make_api_call(
                method="GET",
                url=f"{HOST}/qapi/twins/{twin_follower_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
            )

            time = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]

            feed_data = json.loads(base64.b64decode(data).decode("utf-8"))

            print(
                f"Last shared data of Twin {twin_publisher_label} occurred at utc time {time}: {feed_data}"
            )

    def _get_twin_label(self, property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None


def main():
    follower_connector = FollowerConnector()
    follower_connector.setup()
    model_twin_did = follower_connector.create_twin_model()
    follower_connector.create_twins_from_model(twin_model_id=model_twin_did)
    twins_list = follower_connector.search_for_twin_publishers()

    while True:
        follower_connector.follow_data(twins_list)
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()
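
In follow_data, each Twin Publisher is paired with a Twin Follower through the number at the end of its label. The sketch below isolates that pairing so it can be tested without an IOTICSpace. The helper names, the sample DIDs and the sample search results are hypothetical; the shape of each Twin (a "properties" list and an "id" with a "value") is assumed to be the one used in the listing above.

```python
from typing import Dict, List, Optional

LABEL_KEY = "http://www.w3.org/2000/01/rdf-schema#label"


def sensor_number_from_properties(properties: List[dict]) -> Optional[int]:
    """Return the trailing number of a Twin's label, or None if there is none."""
    for prop in properties:
        if prop.get("key") == LABEL_KEY:
            label = prop.get("langLiteralValue", {}).get("value", "")
            words = label.split()
            last_word = words[-1] if words else ""
            return int(last_word) if last_word.isdecimal() else None
    return None


def pair_followers(twins_found: List[dict], followers: Dict[int, str]) -> Dict[str, str]:
    """Map each Publisher Twin DID to the Follower Twin DID with the same number."""
    pairs = {}
    for twin in twins_found:
        number = sensor_number_from_properties(twin.get("properties", []))
        if number is None or number not in followers:
            continue  # Twin Model, unlabelled Twin, or no matching Follower
        pairs[twin["id"]["value"]] = followers[number]
    return pairs


def _labelled(did: str, label: str) -> dict:
    """Build a minimal, made-up search result entry for the demonstration."""
    return {
        "id": {"value": did},
        "properties": [{"key": LABEL_KEY, "langLiteralValue": {"value": label}}],
    }


twins_found = [
    _labelled("did:iotics:model", "Temperature Sensor Publisher Model"),
    _labelled("did:iotics:pub2", "Temperature Sensor Publisher 2"),
    {"id": {"value": "did:iotics:nolabel"}, "properties": []},
]
pairs = pair_followers(twins_found, {1: "did:iotics:fol1", 2: "did:iotics:fol2"})
print(pairs)
```

Matching on labels is fragile, since a label can be changed at any time. It works here only because both Connectors generate their labels with the same numbering.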

How to create a Follower Connector with the IOTICS API + STOMP

The following example shows how a Follower Connector can be built with a STOMP Client. The code generates a Twin Model and a generic Twin Follower so that the latter can register its interest in the 10 Publisher Twins' feeds. The STOMP Client allows the Twin Follower to receive the data as soon as the Publisher Twins share the data sample without needing to know the share frequency.
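
The callback passed to the STOMP Client is where received samples are decoded. The sketch below shows what such a callback might look like. It rests on assumptions that should be checked against the real messages: the callback is assumed to receive the message headers and body, and the body is assumed to be JSON with the same "feedData" structure that the REST follower reads in follow_data. The function name on_feed_message and the simulated message are hypothetical.

```python
import base64
import json


def on_feed_message(headers: dict, body: str) -> dict:
    """Decode one received message and return the shared values.

    Assumption: `body` is a JSON document containing
    {"feedData": {"data": <base64 JSON>, "occurredAt": <timestamp>}}.
    """
    message = json.loads(body)
    feed_data = message["feedData"]
    values = json.loads(base64.b64decode(feed_data["data"]))
    print(f"Received {values} (occurred at {feed_data.get('occurredAt')})")
    return values


# Simulate an incoming message instead of connecting to a real endpoint
simulated_body = json.dumps(
    {
        "feedData": {
            "data": base64.b64encode(json.dumps({"reading": 19.0}).encode()).decode(),
            "occurredAt": "2024-01-01T12:00:00+00:00",
        }
    }
)
received = on_feed_message(headers={}, body=simulated_body)
```

Because the callback runs whenever a sample arrives, the Follower does not need the 10-second polling loop used by the REST version.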

Example code of a Follower Connector with STOMP:

Instructions:

  1. Download this library on your local machine;
  2. Install the above library in your Python venv: pip install iotic.web.stomp-1.0.6.tar.gz;
  3. Run the publisher_connector.py in a terminal;
  4. Run the follower_connector_stomp.py code below in another terminal and in the same IOTICSpace as the Publisher Connector.
  5. Look at the data received on your terminal.
1. Get a fresh token to be used in the requests' headers and in the STOMP Client;
2. Instantiate a STOMP Client;
3. Create a Twin Model;
  3.1 Build list of Properties;
  3.2 Get a new Twin DID;
  3.3 Use the Upsert Twin API;
4. Create a Twin from Model;
  4.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  4.2 Build list of Properties;
  4.3 Get a new Twin DID;
  4.4 Use the Upsert Twin API;
5. Search for Twin Publishers;

For each Twin Publisher found:
6. Create a new STOMP subscription;
7. Print the data received;
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Callable, Tuple, List

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        token: str,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._token = token
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback
        self._subscriptions_list = None

    def setup(self):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=False)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=self._token)
        self._subscriptions_list = []

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )
        self._subscriptions_list.append((destination, subscription_id, headers))

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print('Received an error "%s"' % body)

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class FollowerConnectorStomp:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_publisher_info = None
        self._stomp_client = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "follower_connector_stomp",
            "Content-Type": "application/json",
        }
        token = self._refresh_token()
        self._twins_publisher_info = {}
        stomp_endpoint = request(method="GET", url=f"{HOST}/index.json").json()["stomp"]
        self._stomp_client = StompClient(
            endpoint=stomp_endpoint,
            callback=self._follow_callback,
            token=token,
        )
        self._stomp_client.setup()

    def _refresh_token(self, duration: int = 600):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            # Re-raise so the caller never works with a missing or failed response
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully with Metadata, Feeds and Values")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Follower Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that follows temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="TwinFollowerModel", properties=property_list
        )

        return model_twin_did

    def create_twin_from_model(self, twin_model_id: str):
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Don't add the Label property of the Twin Model.
            # A dedicated label for the Twin Follower is added later
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                continue
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        # Finally create the Sensor Twin Follower
        twin_key_name = "temp_sensor_follower"
        twin_label = "Temperature Sensor Follower"

        # Add the Label property of the Twin Follower
        property_list_with_label = property_list.copy()
        property_list_with_label.append(
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": twin_label, "lang": "en"},
            }
        )

        print(f"Creating Twin from Model - {twin_label}")

        twin_follower_did = self._create_new_twin(
            twin_key_name=twin_key_name,
            properties=property_list_with_label,
            location={"lat": 51.5, "lon": -0.1},
        )

        return twin_follower_did

    def search_for_twins_publisher(self):
        # Initialise an empty list.
        # It will contain the list of Twins retrieved by the search
        twins_list = []

        # Add a new temporary field in the headers.
        # Client request timeout is used to stop the request processing once the timeout is reached
        headers = self._headers.copy()
        headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=10)
                ).isoformat(),
            }
        )

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=headers,
            stream=True,
            verify=False,
            params={"scope": "LOCAL"},
            json={
                "responseType": "FULL",
                "filter": {
                    "properties": [
                        {
                            "key": "https://data.iotics.com/app#createdFrom",
                            "uriValue": {
                                "value": "https://data.iotics.com/app#ByModel"
                            },
                        },
                    ],
                    "text": "shares",
                },
            },
        ) as resp:
            # Raises HTTPError, if one occurred
            resp.raise_for_status()
            # Iterates over the response data, one line at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                twins_found = []
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                finally:
                    if twins_found:
                        # Append the twins found to the list of twins
                        twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} Twin Publisher(s)")

        return twins_list

    def _follow_callback(self, headers, body):
        payload = json.loads(body)

        twin_publisher_did = payload["interest"]["followedFeed"]["feed"]["twinId"][
            "value"
        ]
        twin_publisher_label = self._twins_publisher_info.get(twin_publisher_did)
        shared_data = payload["feedData"]["data"]
        time = payload["feedData"]["occurredAt"]

        decoded_data = json.loads(base64.b64decode(shared_data).decode("ascii"))
        print(f"Received {decoded_data} from {twin_publisher_label} at {time}")

    def _subscribe_to_feed(
        self,
        follower_twin_id,
        followed_twin_id,
        followed_feed_name,
    ):
        follow_local_feed_url = f"/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_name}"

        self._stomp_client.subscribe(
            destination=follow_local_feed_url,
            subscription_id=followed_twin_id,
            headers=self._headers,
        )

        return follower_twin_id

    def follow_feed(self, twins_found_list: List[dict], twin_follower_did: str):
        for twin in twins_found_list:
            twin_publisher_did = twin["id"]["value"]
            twin_publisher_label = self._get_twin_label(twin["properties"])

            self._subscribe_to_feed(
                follower_twin_id=twin_follower_did,
                followed_twin_id=twin_publisher_did,
                followed_feed_name="currentTemp",
            )

            twin_info = {twin_publisher_did: twin_publisher_label}
            self._twins_publisher_info.update(twin_info)

    @staticmethod
    def _get_twin_label(property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None


def main():
    follower_connector = FollowerConnectorStomp()
    follower_connector.setup()
    model_twin_did = follower_connector.create_twin_model()
    twin_follower_did = follower_connector.create_twin_from_model(
        twin_model_id=model_twin_did
    )
    twins_list = follower_connector.search_for_twins_publisher()
    follower_connector.follow_feed(
        twins_found_list=twins_list, twin_follower_did=twin_follower_did
    )

    while True:
        sleep(10)


if __name__ == "__main__":
    main()

How to export data from an IOTICSpace

Data can be exported outside IOTICS in order to persistently store a Digital Twin's published data.

The code below shows how a Follower Connector, based on follower_connector.py, can store the sensor data it receives in either a CSV file or an SQLite database.


Instructions:

  1. Install the SQLAlchemy Python library, which the SQLite engine relies on: pip install SQLAlchemy;
  2. Create a new directory called export_data and, inside it, two more directories: csv and sqlite;
  3. Add the sqlite_engine.py code below to the sqlite directory and the csv_engine.py code to the csv directory, as follows:
├── publisher_connector.py
├── follower_connector_export.py
├── export_data
│   ├── csv
│   │   └── csv_engine.py
│   └── sqlite
│       └── sqlite_engine.py
  4. In the main() function of follower_connector_export.py, set the export_data_class argument to either CSV() or SQLite();
  5. Run the publisher_connector.py in a terminal and the follower_connector_export.py code below in another terminal;
  6. Look at the data being added to the temperatures.csv file or the temperatures.db database, according to the choice above.
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

from export_data.csv.csv_engine import CSV
from export_data.sqlite.sqlite_engine import SQLite

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class FollowerConnectorExport:
    def __init__(self, export_data_class):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_info = None
        self._export_data = export_data_class

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "follower_connector",
            "Content-Type": "application/json",
        }
        self._refresh_token()
        self._twins_info = {}

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            # Re-raise so the caller never works with a missing or failed response
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully with Metadata, Feeds and Values")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Follower Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that follows temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="TwinFollowerModel", properties=property_list
        )

        return model_twin_did

    def create_twins_from_model(self, twin_model_id: str):
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Don't add the Label property of the Twin Model.
            # It'll be added later based on the number of the sensor
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                continue
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        # Finally create the 10 Sensor Twin Followers
        for sensor_number in range(1, 11):
            twin_key_name = f"temp_sensor_follower_{sensor_number}"
            twin_label = f"Temperature Sensor Follower {sensor_number}"

            # Add the Label property according to the number of the Sensor
            property_list_with_label = property_list.copy()
            property_list_with_label.append(
                {
                    "key": "http://www.w3.org/2000/01/rdf-schema#label",
                    "langLiteralValue": {"value": twin_label, "lang": "en"},
                }
            )

            print(f"Creating Twin from Model - {twin_label}")

            twin_from_model_did = self._create_new_twin(
                twin_key_name=twin_key_name,
                properties=property_list_with_label,
                location={"lat": 51.5, "lon": -0.1},
            )

            # Add the Twin's info in the mapping dictionary
            twin_info = {sensor_number: twin_from_model_did}
            self._twins_info.update(twin_info)

    def search_for_twin_publishers(self):
        # Initialise an empty list.
        # It will contain the list of Twins retrieved by the search
        twins_list = []

        # Add a new temporary field in the headers.
        # Client request timeout is used to stop the request processing once the timeout is reached
        headers = self._headers.copy()
        headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=10)
                ).isoformat(),
            }
        )

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=headers,
            stream=True,
            verify=False,
            params={"scope": "LOCAL"},
            json={
                "responseType": "FULL",
                "filter": {
                    "properties": [
                        {
                            "key": "https://data.iotics.com/app#createdFrom",
                            "uriValue": {
                                "value": "https://data.iotics.com/app#ByModel"
                            },
                        },
                    ],
                    "text": "shares",
                },
            },
        ) as resp:
            # Raises HTTPError, if one occurred
            resp.raise_for_status()
            # Iterates over the response data, one line at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                twins_found = []
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                finally:
                    if twins_found:
                        # Append the twins found to the list of twins
                        twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def follow_data(self, twins_list: List[dict]):
        for twin in twins_list:
            # Get the sensor number from the Twin's label last string
            twin_publisher_label = self._get_twin_label(twin["properties"])
            sensor_number = twin_publisher_label.split(" ")[-1]
            # Check if the current Twin is the Twin Model.
            # In such case we don't want to follow it.
            if not sensor_number.isdigit():
                continue

            twin_publisher_id = twin["id"]["value"]
            twin_follower_id = self._twins_info.get(int(sensor_number))

            # We are supposed to know in advance the Feed ID we want our Twin Follower to follow
            twin_publisher_feed_id = "currentTemp"

            # Get the latest shared value
            encoded_data = self._make_api_call(
                method="GET",
                url=f"{HOST}/qapi/twins/{twin_follower_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
            )

            # Use a dedicated name so the imported datetime class is not shadowed
            occurred_at = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]

            feed_data = json.loads(base64.b64decode(data).decode("ascii"))

            print(
                f"Last shared data of Twin {twin_publisher_label} occurred at utc time {occurred_at}: {feed_data}"
            )

            # Store data externally
            self._export_data.export(
                datetime=occurred_at,
                received_from=twin_publisher_label,
                data=feed_data.get("reading"),
            )

    def _get_twin_label(self, property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None


def main():
    # Pass CSV() instead of SQLite() to store the data in a CSV file
    follower_connector = FollowerConnectorExport(export_data_class=SQLite())
    follower_connector.setup()
    model_twin_did = follower_connector.create_twin_model()
    follower_connector.create_twins_from_model(twin_model_id=model_twin_did)
    twins_list = follower_connector.search_for_twin_publishers()

    while True:
        follower_connector.follow_data(twins_list)
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()


# sqlite_engine.py
from sqlalchemy import Column, Float, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

DB_NAME = "sqlite:///export_data/sqlite/temperatures.db"
Base = declarative_base()


class SensorReading(Base):
    __tablename__ = "SensorReading"

    id = Column(Integer, primary_key=True)
    timestamp = Column(String(100))
    sensor_number = Column(Integer)
    sensor_reading = Column(Float)


class SQLite:
    def __init__(self, echo=False):
        engine = create_engine(DB_NAME, echo=echo)
        Base.metadata.create_all(bind=engine)
        Session = sessionmaker(engine)
        self._session = Session()

    def _store(self, item: SensorReading):
        with self._session as session:
            session.add(item)
            session.commit()

        print("Item stored correctly")

    def export(self, datetime: str, received_from: str, data: dict):
        sensor_reading_obj = SensorReading(
            timestamp=datetime, sensor_number=received_from, sensor_reading=data
        )

        self._store(sensor_reading_obj)


# csv_engine.py
import csv
from collections import namedtuple

CSV_FILE = "./export_data/csv/temperatures.csv"
CSV_FIELDS = ["timestamp", "sensor_number", "sensor_reading"]
SensorReading = namedtuple("SensorReading", CSV_FIELDS)


class CSV:
    def __init__(self, n_items=10):
        self._items_list = []
        self._n_items = n_items

        with open(CSV_FILE, "w", newline="") as csv_file:
            writer = csv.writer(csv_file)
            # Write header
            writer.writerow(CSV_FIELDS)

    def _store(self):
        with open(CSV_FILE, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            for item in self._items_list:
                writer.writerow(item)

        print("Item stored correctly")
        self._items_list.clear()

    def export(self, datetime: str, received_from: str, data: dict):
        new_item = SensorReading(
            timestamp=datetime, sensor_number=received_from, sensor_reading=data
        )
        self._items_list.append(new_item)

        if len(self._items_list) >= self._n_items:
            self._store()

Synthesiser Connector

A Synthesiser Connector, or simply Synthesiser, is an application that runs continuously and enables Follower Twins to receive data, transform it and publish the result back into the Space.

In other words, a Synthesiser creates a set of Twins that:
1. continuously follow Publisher Twins' feeds;
2. "synthesise" or interoperate the data;
3. publish the new data back as a feed.

Representation of a Synthesiser Connector

How to create a Synthesiser Connector with the IOTICS API

A Synthesiser application creates a set of Follower Twins, each with one or more Feeds, so that they can both receive data and share the transformed data.

The following snippet provides an example of a Synthesiser that:

  1. Creates 10 Synthesiser Twins with a single Feed;
  2. Follows the Publisher Connector Twins' feed with temperature readings;
  3. Computes the average temperature for each sensor Twin;
  4. Shares the average data as a feed.
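
Step 3 can be done without storing every reading. The sketch below shows an incremental mean, where the new average is ((n - 1) * previous_mean + new_sample) / n for the n-th sample. The function name `update_mean` and the sample readings are illustrative assumptions, not part of the IOTICS API.

```python
def update_mean(previous_mean: float, sample_count: int, new_sample: float) -> float:
    """Return the mean of `sample_count` samples.

    `previous_mean` is the mean of the first `sample_count - 1` samples
    (hypothetical helper, for illustration only).
    """
    return ((sample_count - 1) * previous_mean + new_sample) / sample_count


# Made-up temperature readings from a single sensor
readings = [20.0, 22.0, 27.0]

mean = 0.0
for count, reading in enumerate(readings, start=1):
    mean = update_mean(mean, count, reading)

print(mean)
```

Only the previous mean and the number of samples need to be kept per Twin, which is why the Synthesiser further down stores just those two values for each sensor.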

Instructions:

  1. Run the publisher_connector.py in a terminal;
  2. Run the synthesiser_connector.py code below in another terminal.
1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
  2.1 Build list of Properties;
  2.2 Build list of Feeds;
  2.3 Get a new Twin DID;
  2.4 Use the Upsert Twin API;
3. Create Twins from Model;
  3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  3.2 Build list of Properties;
  3.3 Use the Describe Feed API to get the Twin Model's Feed's metadata;
  3.4 Build list of Feeds;
  For each Twin to follow:
  3.5 Get a new Twin DID;
  3.6 Use the Upsert Twin API;
Every 10 seconds:
4. Get the Twin Publishers' last shared data;
5. Compute the average temperature;
6. Share data;
import base64
import json
from datetime import datetime, timezone, timedelta
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class SynthesiserConnector:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_info = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "synthesiser_connector",
            "Content-Type": "application/json",
        }
        self._refresh_token()
        self._twins_info = {}

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            # Re-raise: "response" may be undefined or hold an error body
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_feed(self, twin_id: str, feed_id: str):
        feed_description = self._make_api_call(
            method="GET",
            url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}",
        )

        return feed_description

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully with Metadata, Feeds and Values")
        print("---")

    def _share_data(self, twin_label: str, twin_id: str, feed_id: str, value):
        data_to_share = {"average": value}
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {
                "data": encoded_data,
                "mime": "application/json",
                "occurredAt": datetime.now(tz=timezone.utc).isoformat(
                    timespec="seconds"
                ),
            }
        }
        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}/shares",
            json=data_to_share_payload,
        )

        print(f"Shared {data_to_share} from Twin {twin_label}")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Synthesiser Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that follows temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]

        # Build a list of Feeds for the Twin Model
        feed_list = [
            # Add a single Feed called "averageTemp"
            {
                "id": "averageTemp",
                "storeLast": True,  # We want to store the last value sent
                "properties": [
                    # Add a Label for the Feed
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": "Average Temperature",
                            "lang": "en",
                        },
                    }
                ],
                # Add a single Value called "average"
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "average",
                        "unit": "http://purl.obolibrary.org/obo/UO_0000027",
                    }
                ],
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="TwinSynthesiserModel",
            properties=property_list,
            feeds=feed_list,
        )

        return model_twin_did

    def create_twins_from_model(self, twin_model_id: str):
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Don't add the Label property of the Twin Model.
            # It'll be added later based on the number of the sensor
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                continue
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        # Build a list of Feeds for the generic Twins
        feed_list = []

        # Scan the Twin Model's feeds list to get the Twin Model's Feed IDs.
        # Then use the Feed Describe API to get the Feeds' metadata
        for feed in twin_model["result"]["feeds"]:
            feed_id = feed["feedId"]["value"]

            # Use the Feed Describe API
            feed_description = self._describe_feed(
                twin_id=twin_model_id, feed_id=feed_id
            )
            feed_properties = feed_description["result"]["properties"]
            feed_values = feed_description["result"]["values"]
            store_last = feed_description["result"]["storeLast"]

            # Append to the list all the Feed's metadata
            feed_list.append(
                {
                    "id": feed_id,
                    "properties": feed_properties,
                    "values": feed_values,
                    "storeLast": store_last,
                }
            )

        # Finally create the 10 Twin Synthesisers
        for sensor_number in range(1, 11):
            twin_key_name = f"avg_temp_sensor_{sensor_number}"
            twin_label = f"Temperature Sensor Synthesiser {sensor_number}"

            # Add the Label property according to the number of the Sensor
            property_list_with_label = property_list.copy()
            property_list_with_label.append(
                {
                    "key": "http://www.w3.org/2000/01/rdf-schema#label",
                    "langLiteralValue": {"value": twin_label, "lang": "en"},
                }
            )

            print(f"Creating Twin from Model - {twin_label}")

            twin_from_model_did = self._create_new_twin(
                twin_key_name=twin_key_name,
                properties=property_list_with_label,
                feeds=feed_list,
                location={"lat": 51.5, "lon": -0.1},
            )

            # Add the Twin's info in the mapping dictionary
            twin_info = {
                sensor_number: {
                    "twin_label": twin_label,
                    "twin_id": twin_from_model_did,
                    "feed_id": feed_id,
                    "previous_mean": 0,
                    "sample_count": 1,
                }
            }
            self._twins_info.update(twin_info)

    def search_for_twin_publishers(self):
        # Initialise an empty list.
        # It will contain the list of Twins retrieved by the search
        twins_list = []

        # Add a new temporary field in the headers.
        # Client request timeout is used to stop the request processing once the timeout is reached
        headers = self._headers.copy()
        headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=10)
                ).isoformat(),
            }
        )

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=headers,
            stream=True,
            verify=False,
            params={"scope": "LOCAL"},
            json={
                "responseType": "FULL",
                "filter": {
                    "properties": [
                        {
                            "key": "https://data.iotics.com/app#createdFrom",
                            "uriValue": {
                                "value": "https://data.iotics.com/app#ByModel"
                            },
                        },
                    ],
                    "text": "shares",
                },
            },
        ) as resp:
            # Raises HTTPError, if one occurred
            resp.raise_for_status()
            # Iterates over the response data, one line at a time
            for chunk in resp.iter_lines():
                # Skip the empty keep-alive lines a streamed response may contain
                if not chunk:
                    continue
                response = json.loads(chunk)
                twins_found = []
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                finally:
                    if twins_found:
                        # Append the twins found to the list of twins
                        twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def synthesise_data(self, twins_list: List[dict]):
        for twin in twins_list:
            # Get the sensor number from the last word of the Twin's label
            twin_publisher_label = self._get_twin_label(twin["properties"])
            # Skip Twins that have no label
            if twin_publisher_label is None:
                continue
            sensor_number = twin_publisher_label.split(" ")[-1]
            # Check if the current Twin is the Twin Model.
            # In such case we don't want to follow it.
            if not sensor_number.isdigit():
                continue

            twin_publisher_id = twin["id"]["value"]
            twin_info = self._twins_info.get(int(sensor_number))
            # Skip Twin Publishers that have no matching Twin Synthesiser
            if twin_info is None:
                continue
            synthesiser_twin_id = twin_info["twin_id"]
            synthesiser_twin_previous_mean = twin_info["previous_mean"]
            synthesiser_twin_sample_count = twin_info["sample_count"]

            # We are supposed to know in advance the Feed ID we want our Twin Synthesiser to follow
            twin_publisher_feed_id = "currentTemp"

            # Get the latest shared value
            encoded_data = self._make_api_call(
                method="GET",
                url=f"{HOST}/qapi/twins/{synthesiser_twin_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
            )

            time = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]

            feed_data = json.loads(base64.b64decode(data).decode("ascii"))

            print(
                f"Last shared data of Twin {twin_publisher_label} occurred at utc time {time}: {feed_data}"
            )

            new_sample = feed_data["reading"]
            new_mean = (
                (synthesiser_twin_sample_count - 1) * synthesiser_twin_previous_mean
                + new_sample
            ) / synthesiser_twin_sample_count

            # Publish the new data
            self.publish_data(twin_info=twin_info, data=new_mean)

            # Update the average
            self._twins_info[int(sensor_number)]["previous_mean"] = new_mean
            self._twins_info[int(sensor_number)]["sample_count"] += 1

    def publish_data(self, twin_info: dict, data: float):
        synthesiser_twin_id = twin_info["twin_id"]
        synthesiser_feed_id = twin_info["feed_id"]
        synthesiser_twin_label = twin_info["twin_label"]

        self._share_data(
            twin_label=synthesiser_twin_label,
            twin_id=synthesiser_twin_id,
            feed_id=synthesiser_feed_id,
            value=data,
        )

    def _get_twin_label(self, property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None


def main():
    synthesiser_connector = SynthesiserConnector()
    synthesiser_connector.setup()
    model_twin_did = synthesiser_connector.create_twin_model()
    synthesiser_connector.create_twins_from_model(twin_model_id=model_twin_did)
    twins_list = synthesiser_connector.search_for_twin_publishers()

    while True:
        synthesiser_connector.synthesise_data(twins_list)
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()
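
The Synthesiser above exchanges feed data as base64-encoded JSON: `_share_data` encodes the values before sharing them, and `synthesise_data` decodes the last shared sample. The standalone sketch below shows that round trip without calling the IOTICS API. The helper names `encode_sample` and `decode_sample` are assumptions introduced for illustration; the payload layout mirrors the one built in `_share_data`.

```python
import base64
import json
from datetime import datetime, timezone


def encode_sample(values: dict) -> dict:
    """Build a share payload: JSON-serialise the values, then base64-encode them.

    Hypothetical helper; the layout follows the payload used in _share_data.
    """
    encoded = base64.b64encode(json.dumps(values).encode()).decode()
    return {
        "sample": {
            "data": encoded,
            "mime": "application/json",
            "occurredAt": datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
        }
    }


def decode_sample(encoded_data: str) -> dict:
    """Reverse the encoding: base64-decode, then parse the JSON document."""
    return json.loads(base64.b64decode(encoded_data))


payload = encode_sample({"average": 21.5})
decoded = decode_sample(payload["sample"]["data"])
print(decoded)
```

Keeping the two directions in small functions like these makes it easier to check that what a Twin Publisher shares is exactly what a Follower or Synthesiser reads back.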