Connectors

Ingesting data into, or exporting data from, an IOTICSpace and its Digital Twins requires a software application built on the IOTICS API. In IOTICS, we call such an application a Connector.

This page covers:

  • Introduction to Connectors
  • Publisher Connector
  • Follower Connector
  • How to export data from an IOTICSpace


Introduction to Connectors

A Connector is a software application that interacts with Digital Twins in an IOTICSpace. It acts as the bridge between an IOTICSpace and the data source.

We distinguish three types of Connector:

1. Publisher Connector: imports data into an IOTICSpace
2. Follower Connector: exports data from an IOTICSpace
3. Synthesiser Connector: transforms existing data within an IOTICSpace

Publisher Connector

A Publisher Connector is a software application that works as a bridge between a data source and an IOTICSpace. It enables external data to be imported into an IOTICSpace and published by one or more Digital Twins, and therefore shared with and used by other Digital Twins in the ecosystem.

A Publisher Connector:

  • Typically handles a single data source and consists of one Twin Model with one or more related Twins from Model. If multiple sources have to be imported and synchronised, several Twin Models may be needed, each with its own specific metadata;
  • Periodically imports and publishes new data through the Twins from Model. Data can be published either in batches or as single samples, and at fixed or variable intervals; some Twins may also need to share data at a different frequency from others;
  • Runs continuously. Once created, it is good practice to keep it running so that all the Twins in the IOTICSpace maintain a consistent state - be it actively sharing new data, staying idle, or becoming obsolete and being deleted automatically.

Representation of a Publisher Connector
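Each periodic share described above ultimately reduces to a single API call whose body wraps a base64-encoded JSON sample, as the example code further down does. A minimal sketch of building that payload (the sample values are illustrative):

```python
import base64
import json


def build_share_payload(sample: dict) -> dict:
    """Wrap a data sample the way the feed-share endpoint expects it:
    base64-encoded JSON plus its MIME type."""
    encoded = base64.b64encode(json.dumps(sample).encode()).decode()
    return {"sample": {"data": encoded, "mime": "application/json"}}


# Illustrative sample: one temperature reading
payload = build_share_payload({"reading": 21.5})
```

A Follower reverses the transformation with base64.b64decode followed by json.loads, exactly as the follow_callback in the Follower Connector example does.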

How to create a Publisher Connector with the IOTICS API

Implementing a Publisher Connector leads to the creation of a set of Twin Publishers that publish data, either simultaneously or in turn. A prerequisite is the development of one or more Twin Models that will serve as templates for the Twin Publishers.

The following example shows a Publisher Connector that simulates 10 temperature sensors. Each one gets data from the environment and shares sensor readings every 10 seconds. We first create a Twin Model with a Feed and then create the 10 Sensor Twins, which periodically download new data from a Server and share it.

Click to see the example code of a Publisher Connector

1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
  2.1 Build list of Properties;
  2.2 Build list of Feeds;
  2.3 Get a new Twin DID;
  2.4 Use the Upsert Twin API;
3. Create Twins from Model;
  3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  3.2 Build list of Properties;
  3.3 Use the Describe Feed API to get the Twin Model Feed's metadata;
  3.4 Build list of Feeds;
  For every Sensor:
  3.5 Get a new Twin DID;
  3.6 Use the Upsert Twin API;
Every 10 seconds:
4. Download new data from the server;
5. Share data into the Space;
import base64
import json
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class PublisherConnector:
    def __init__(self):
        self._high_level_identity_api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_info = None
        self._host_id = None

    def setup(self):
        self._high_level_identity_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_identity_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "publisher_connector",
            "Content-Type": "application/json",
        }
        self._refresh_token()
        self._twins_info = {}
        self._host_id = self._get_host_id()

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._high_level_identity_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

    def _get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def _create_twin_identity(self, twin_key_name: str):
        twin_registered_id = (
            self._high_level_identity_api.create_twin_with_control_delegation(
                twin_seed=AGENT_SEED,
                twin_key_name=twin_key_name,
                agent_registered_identity=self._agent_registered_id,
                delegation_name="#ControlDeleg",
            )
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            # Give up after one retry and surface the error
            print("Retried once. Still getting error", ex)
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[str] = None,
        feeds: List[str] = None,
        location: dict = None,
    ):
        twin_did = self._create_twin_identity(twin_key_name=twin_key_name)

        self._upsert_twin(
            twin_did=twin_did,
            host_id=self._host_id,
            properties=properties,
            feeds=feeds,
            location=location,
        )

        return twin_did

    def _upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_did} created successfully")
        print("---")

    def _share_data(
        self, twin_did: str, host_id: str, feed_id: str, data_to_share: dict
    ):
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {"data": encoded_data, "mime": "application/json"}
        }

        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/hosts/{host_id}/twins/{twin_did}/feeds/{feed_id}/shares",
            json=data_to_share_payload,
        )

        print(f"Shared {data_to_share} from Twin {twin_did}")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Publisher Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that shares temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
        ]

        # Build a list of Feeds for the Twin Model
        feed_list = [
            # Add a single Feed called "currentTemp"
            {
                "id": "currentTemp",
                "storeLast": True,  # We want to store the last value sent
                "properties": [
                    # Add a Label for the Feed
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": "Temperature",
                            "lang": "en",
                        },
                    }
                ],
                # Add a single Value called "reading"
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "reading",
                        "unit": "http://qudt.org/vocab/unit/DEG_C",
                    }
                ],
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="TwinPublisherModel",
            properties=property_list,
            feeds=feed_list,
        )

        return model_twin_did

    def create_twins_from_model(self, twin_model_did: str):
        # Build a list of properties for the Twin from Model
        property_list = [
            # Add the Twins' from Model "special" property
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_did},
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that shares temperature data",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
        ]

        # Build a list of Feeds for the Twins from Model (identical to the Twin Model's)
        feed_list = [
            # Add a single Feed called "currentTemp"
            {
                "id": "currentTemp",
                "storeLast": True,  # We want to store the last value sent
                "properties": [
                    # Add a Label for the Feed
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": "Temperature",
                            "lang": "en",
                        },
                    }
                ],
                # Add a single Value called "reading"
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "reading",
                        "unit": "http://qudt.org/vocab/unit/DEG_C",
                    }
                ],
            },
        ]

        # Finally create the 10 Publisher Twins
        for sensor_number in range(1, 11):
            twin_key_name = f"temp_sensor_{sensor_number}"
            twin_label = f"Temperature Sensor Publisher {sensor_number}"

            # Add the Label property according to the number of the Sensor
            property_list_with_label = property_list.copy()
            property_list_with_label.append(
                {
                    "key": "http://www.w3.org/2000/01/rdf-schema#label",
                    "langLiteralValue": {"value": twin_label, "lang": "en"},
                }
            )

            print(f"Creating Twin from Model - {twin_label}")

            twin_from_model_did = self._create_new_twin(
                twin_key_name=twin_key_name,
                properties=property_list_with_label,
                feeds=feed_list,
                location={"lat": 51.5, "lon": -0.1},
            )

            # Add the Twin's info in the mapping dictionary
            twin_info = {
                twin_key_name: {
                    "twin_did": twin_from_model_did,
                    "twin_label": twin_label,
                }
            }
            self._twins_info.update(twin_info)

    def publish_data(self, data: List[dict]):
        for temp_data in data:
            temperature = temp_data["temp"]
            sensor_id = temp_data["sensor_id"]

            self._share_data(
                twin_did=self._twins_info[sensor_id]["twin_did"],
                host_id=self._host_id,
                feed_id="currentTemp",
                data_to_share={"reading": temperature},
            )


def get_sensor_data():
    response = request(method="GET", url="http://flaskapi.dev.iotics.com/sensor_temp")
    sensor_data = response.json()

    return sensor_data


def main():
    publisher_connector = PublisherConnector()
    publisher_connector.setup()
    model_twin_did = publisher_connector.create_twin_model()
    publisher_connector.create_twins_from_model(twin_model_did=model_twin_did)

    while True:
        data = get_sensor_data()
        publisher_connector.publish_data(data)
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()

Follower Connector

The mirrored version of the Publisher Connector is the Follower Connector: an application that works as a bridge between an IOTICSpace and a data consumer and/or data store.

🚧

A Follower Connector can run in the same IOTICSpace as the Twin Publishers or in a different one

Use remote search and remote follow if the IOTICSpaces are not the same.


Representation of a Follower Connector in the same Space as the Twin Publishers

How to create a Follower Connector with the IOTICS API + STOMP

Similar to the Publisher Connector, a set of Twin Followers is needed to ask the Twin Publishers' Feeds for their last shared value. One or more Twin Models will serve as templates for the Twin Followers so they can be classified according to their specific nature.

The following example shows how to set up a Follower Connector with a STOMP Client. The code creates a Twin Model and a Twin Follower from Model so that the latter can register its interest in the 10 Publisher Twins' Feeds. The STOMP Client allows the Twin Follower to receive data as soon as a Publisher Twin shares a new sample.

Click to see the code

Instructions:

  1. Download this library on your local machine;
  2. Install the above library in your Python venv: pip install iotic.web.stomp-1.0.6.tar.gz;
  3. Run the publisher_connector.py in a terminal;
  4. Run the follower_connector_stomp.py code below in another terminal and in the same IOTICSpace as the Publisher Connector.
  5. Look at the data received on your terminal.
1. Get a fresh token to be used in the requests' headers and in the Stomp Client;
2. Instantiate a Stomp Client;
3. Create a Twin Model;
  3.1 Build list of Properties;
  3.2 Get a new Twin DID;
  3.3 Use the Upsert Twin API;
4. Create a Twin from Model;
  4.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  4.2 Build list of Properties;
  4.3 Get a new Twin DID;
  4.4 Use the Upsert Twin API;
5. Search for Twin Publishers;

For each Twin Publisher found:
6. Create a new STOMP subscription;
7. Print the data received;
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"
STOMP_ENDPOINT = "stomp_endpoint"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class FollowerConnector:
    def __init__(self):
        self._high_level_identity_api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_publisher_info = None
        self._host_id = None

    def setup(self):
        self._high_level_identity_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_identity_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Content-Type": "application/json",
        }
        token = self._refresh_token()

        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.follow_callback
        )
        self._stomp_client.setup(token=token)
        self._host_id = self._get_host_id()
        self._twins_publisher_info = {}

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._high_level_identity_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        print(f"Token will expire at {datetime.now() + timedelta(seconds=duration)}")

        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def _create_twin_identity(self, twin_key_name: str):
        twin_registered_id = (
            self._high_level_identity_api.create_twin_with_control_delegation(
                twin_seed=AGENT_SEED,
                twin_key_name=twin_key_name,
                agent_registered_identity=self._agent_registered_id,
                delegation_name="#ControlDeleg",
            )
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            # Give up after one retry and surface the error
            print("Retried once. Still getting error", ex)
            raise

        return response.json()

    def create_new_twin(
        self,
        twin_key_name: str,
        properties: List[str] = None,
        feeds: List[str] = None,
        location: dict = None,
    ):
        twin_did = self._create_twin_identity(twin_key_name=twin_key_name)

        self._upsert_twin(
            twin_did=twin_did,
            host_id=self._host_id,
            properties=properties,
            feeds=feeds,
            location=location,
        )

        return twin_did

    def _upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_did} created successfully")
        print("---")

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []

        search_headers = self._headers.copy()
        # Search headers require a new header "Iotics-RequestTimeout".
        # The latter is used to stop the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )

        payload = {"responseType": response_type, "filter": {}}

        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue

                # Append the twins found to the list of twins
                twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def subscribe_to_feed(self, twin_publisher: dict, twin_follower_did: str):
        twin_follower_host_id = self._host_id

        twin_publisher_did = twin_publisher["twinId"]["id"]
        twin_publisher_host_id = twin_publisher["twinId"]["hostId"]
        twin_publisher_feed_id = twin_publisher["feeds"][0]["feedId"]["id"]

        endpoint = f"/qapi/hosts/{twin_follower_host_id}/twins/{twin_follower_did}/interests/hosts/{twin_publisher_host_id}/twins/{twin_publisher_did}/feeds/{twin_publisher_feed_id}"

        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_publisher_did}-{twin_publisher_feed_id}",
            headers=self._headers,
        )

        twin_publisher_label = self._get_twin_label(twin_publisher["properties"])

        twin_info = {twin_publisher_did: twin_publisher_label}
        self._twins_publisher_info.update(twin_info)

    @staticmethod
    def _get_twin_label(property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None

    def follow_callback(self, headers, body):
        encoded_data = json.loads(body)

        twin_publisher_did = encoded_data["interest"]["followedFeedId"]["twinId"]
        twin_publisher_label = self._twins_publisher_info.get(twin_publisher_did)

        try:
            time = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))
            print(f"Received {decoded_feed_data} from {twin_publisher_label} at {time}")


def main():
    follower_connector = FollowerConnector()
    follower_connector.setup()
    twin_did = follower_connector.create_new_twin(
        twin_key_name="TwinFollower",
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Follower", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Follower",
                    "lang": "en",
                },
            },
        ],
    )

    twins_list = follower_connector.search_twins(text="shares")
    for twin_publisher in twins_list:
        follower_connector.subscribe_to_feed(
            twin_publisher=twin_publisher, twin_follower_did=twin_did
        )

    while True:
        sleep(10)


if __name__ == "__main__":
    main()

How to export data from an IOTICSpace

Data can be exported outside IOTICS in order to persistently store a Digital Twin's published data.

The code below provides an example of how to allow the follower_connector.py to store the sensor data received into either a CSV file or an SQLite database.
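As a minimal illustration of what such an export engine involves - the class name CSV matches the connector's import, but the store method, its signature, and the column names are assumptions of this sketch, not the documented implementation - a CSV engine might look like:

```python
import csv
import os


class CSV:
    """Hypothetical CSV export engine: appends one row per received sample."""

    def __init__(self, file_path: str = "temperatures.csv"):
        self._file_path = file_path
        # Write the header row once, when the file does not exist yet
        if not os.path.exists(self._file_path):
            with open(self._file_path, "w", newline="") as f:
                csv.writer(f).writerow(["sensor", "occurred_at", "reading"])

    def store(self, sensor: str, occurred_at: str, reading: float):
        # Append a single sample to the CSV file
        with open(self._file_path, "a", newline="") as f:
            csv.writer(f).writerow([sensor, occurred_at, reading])
```

An SQLite engine would expose the same interface while delegating the writes to SQLAlchemy, which is what allows the connector to swap export_data_class between the two.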

Click to see the code

Instructions:

  1. Install a new Python library to use the SQLite Engine: pip install SQLAlchemy;
  2. Create a new directory called export_data. Within the latter create 2 other directories: csv and sqlite;
  3. Add the sqlite_engine.py code below to the sqlite directory and the csv_engine.py to the csv directory as follows:
├── publisher_connector.py
├── follower_connector_export.py
├── export_data
│   ├── csv
│   │   └── csv_engine.py
│   └── sqlite
│       └── sqlite_engine.py
  4. Run the publisher_connector.py in a terminal and the follower_connector_export.py code below in another terminal;
  5. Change the export_data_class class (line 352) to either CSV or SQLite;
  6. Look at the data being added to the temperatures.csv file or the temperatures.db file, according to the choice above.
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

from export_data.csv.csv_engine import CSV
from export_data.sqlite.sqlite_engine import SQLite

RESOLVER_URL = "resolver_url"
HOST = "host_url"
STOMP_ENDPOINT = "stomp_endpoint"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class FollowerConnectorExport:
    def __init__(self, export_data_class):
        self._export_data_class = export_data_class
        self._high_level_identity_api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_publisher_info = None
        self._stomp_client = None
        self._host_id = None

    def setup(self):
        self._high_level_identity_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_identity_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Content-Type": "application/json",
        }
        token = self._refresh_token()

        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.follow_callback
        )
        self._stomp_client.setup(token=token)
        self._host_id = self._get_host_id()
        self._twins_publisher_info = {}

    def _refresh_token(self, duration: int = 60):
        print("Refreshing token")
        print("---")
        token = self._high_level_identity_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        print(f"Token will expire at {datetime.now() + timedelta(seconds=duration)}")

        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def _create_twin_identity(self, twin_key_name: str):
        twin_registered_id = (
            self._high_level_identity_api.create_twin_with_control_delegation(
                twin_seed=AGENT_SEED,
                twin_key_name=twin_key_name,
                agent_registered_identity=self._agent_registered_id,
                delegation_name="#ControlDeleg",
            )
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            raise

        return response.json()

    def create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        location: dict = None,
    ):
        twin_did = self._create_twin_identity(twin_key_name=twin_key_name)

        self._upsert_twin(
            twin_did=twin_did,
            host_id=self._host_id,
            properties=properties,
            feeds=feeds,
            location=location,
        )

        return twin_did

    def _upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_did} created successfully")
        print("---")

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []

        search_headers = self._headers.copy()
        # The search request requires an additional header, "Iotics-RequestTimeout",
        # which tells the server to stop the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )

        payload = {"responseType": response_type, "filter": {}}

        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                twins_found = []
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue
                finally:
                    if twins_found:
                        # Append the twins found to the list of twins
                        twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def subscribe_to_feed(self, twin_publisher: dict, twin_follower_did: str):
        twin_follower_host_id = self._host_id

        twin_publisher_did = twin_publisher["twinId"]["id"]
        twin_publisher_host_id = twin_publisher["twinId"]["hostId"]
        twin_publisher_feed_id = twin_publisher["feeds"][0]["feedId"]["id"]

        endpoint = f"/qapi/hosts/{twin_follower_host_id}/twins/{twin_follower_did}/interests/hosts/{twin_publisher_host_id}/twins/{twin_publisher_did}/feeds/{twin_publisher_feed_id}"

        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_publisher_did}-{twin_publisher_feed_id}",
            headers=self._headers,
        )

        twin_publisher_label = self._get_twin_label(twin_publisher["properties"])

        twin_info = {twin_publisher_did: twin_publisher_label}
        self._twins_publisher_info.update(twin_info)

    @staticmethod
    def _get_twin_label(property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None

    def follow_callback(self, headers, body):
        encoded_data = json.loads(body)

        twin_publisher_did = encoded_data["interest"]["followedFeedId"]["twinId"]
        twin_publisher_label = self._twins_publisher_info.get(twin_publisher_did)

        try:
            time = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))
            print(f"Received {decoded_feed_data} from {twin_publisher_label} at {time}")
            # Store data externally
            self._export_data_class.export(
                datetime=time,
                received_from=twin_publisher_label,
                data=decoded_feed_data.get(
                    "reading"
                ),  # 'reading' corresponds to the Feed's Value Label
            )


def main():
    # Replace 'SQLite()' with 'CSV()' to store the data into a CSV file instead
    follower_connector_export = FollowerConnectorExport(export_data_class=SQLite())
    follower_connector_export.setup()
    twin_did = follower_connector_export.create_new_twin(
        twin_key_name="TwinFollower",
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Follower", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Follower",
                    "lang": "en",
                },
            },
        ],
    )

    twins_list = follower_connector_export.search_twins(text="shares")
    for twin_publisher in twins_list:
        follower_connector_export.subscribe_to_feed(
            twin_publisher=twin_publisher, twin_follower_did=twin_did
        )

    while True:
        sleep(10)


if __name__ == "__main__":
    main()

# SQLite exporter (separate module): stores received samples in a SQLite database
from sqlalchemy import Column, Float, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

DB_NAME = "sqlite:///export_data/sqlite/temperatures.db"
Base = declarative_base()


class SensorReading(Base):
    __tablename__ = "SensorReading"

    id = Column(Integer, primary_key=True)
    timestamp = Column(String(100))
    sensor_number = Column(Integer)
    sensor_reading = Column(Float)


class SQLite:
    def __init__(self, echo=False):
        engine = create_engine(DB_NAME, echo=echo)
        Base.metadata.create_all(bind=engine)
        Session = sessionmaker(engine)
        self._session = Session()

    def _store(self, item: SensorReading):
        with self._session as session:
            session.add(item)
            session.commit()

        print("Item stored correctly")

    def export(self, datetime: str, received_from: str, data: dict):
        sensor_reading_obj = SensorReading(
            timestamp=datetime, sensor_number=received_from, sensor_reading=data
        )

        self._store(sensor_reading_obj)

# CSV exporter (separate module): buffers received samples and appends them to a CSV file
import csv
from collections import namedtuple

CSV_FILE = "./export_data/csv/temperatures.csv"
CSV_FIELDS = ["timestamp", "sensor_number", "sensor_reading"]
SensorReading = namedtuple("SensorReading", CSV_FIELDS)


class CSV:
    def __init__(self, n_items=10):
        self._items_list = []
        self._n_items = n_items

        with open(CSV_FILE, "w", newline="") as csv_file:
            writer = csv.writer(csv_file)
            # Write header
            writer.writerow(CSV_FIELDS)

    def _store(self):
        with open(CSV_FILE, "a", newline="") as csv_file:
            writer = csv.writer(csv_file)
            for item in self._items_list:
                writer.writerow(item)

        print("Item stored correctly")
        self._items_list.clear()

    def export(self, datetime: str, received_from: str, data: dict):
        new_item = SensorReading(
            timestamp=datetime, sensor_number=received_from, sensor_reading=data
        )
        self._items_list.append(new_item)

        if len(self._items_list) >= self._n_items:
            self._store()

Synthesiser Connector

A Synthesiser Connector, or simply Synthesiser, is an application whose Twins continuously receive data from other Twins, transform it and send the result back into the Space.

In other words, a Synthesiser creates a set of Twins that:

  1. continuously follow Publisher Twins' Feeds;
  2. "synthesise", i.e. transform, the data;
  3. publish the new data back as a Feed.

Representation of a Synthesiser Connector

How to create a Synthesiser Connector with the IOTICS API

A Synthesiser application creates a set of Follower Twins with one or more Feeds, so that each Twin can both receive data and share the newly computed data.

The following snippet provides an example of a Synthesiser that:

  1. Creates a Synthesiser Twin with 10 Feeds, one per temperature sensor;
  2. Follows the Publisher Connector Twins' Feeds with temperature readings;
  3. Computes the running average temperature for each sensor Twin;
  4. Shares the average data as a Feed.
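Step 3 keeps a running average rather than storing every sample: each new reading is folded into the previous mean. The incremental update used by the Synthesiser's `_publish_data` method can be checked in isolation:

```python
def update_mean(previous_mean: float, sample_count: int, new_sample: float) -> float:
    # Incremental mean: fold the new sample into the previous average
    # without keeping the full history of readings
    return ((sample_count - 1) * previous_mean + new_sample) / sample_count


# Feed three readings one at a time and compare with the batch average
readings = [20.0, 22.0, 27.0]
mean = 0.0
for count, reading in enumerate(readings, start=1):
    mean = update_mean(mean, count, reading)
print(mean)  # 23.0
```

This is why the Connector only needs to store `previous_mean` and `sample_count` per Twin, instead of every reading ever received.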

Instructions:

  1. Run the publisher_connector.py in a terminal;
  2. Run the synthesiser_connector.py code below in another terminal.
Code overview:

1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
  2.1 Build list of Properties;
  2.2 Build list of Feeds;
  2.3 Get a new Twin DID;
  2.4 Use the Upsert Twin API;
3. Create Twins from Model;
  3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
  3.2 Build list of Properties;
  3.3 Use the Describe Feed API to get the Twin Model's Feed's metadata;
  3.4 Build list of Feeds;
  For each Twin to follow:
  3.5 Get a new Twin DID;
  3.6 Use the Upsert Twin API;
Every 10 seconds:
4. Get the Twin Publishers' last shared data;
5. Compute the average temperature;
6. Share data;
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"
STOMP_ENDPOINT = "stomp_endpoint"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class SynthesiserConnector:
    def __init__(self):
        self._high_level_identity_api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._twins_info = None
        self._stomp_client = None
        self._host_id = None
        self._twin_follower_did = None

    def setup(self):
        self._high_level_identity_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_identity_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "synthesiser_connector",
            "Content-Type": "application/json",
        }
        token = self._refresh_token()
        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.synthesise_callback
        )
        self._stomp_client.setup(token=token)
        self._twins_info = {}
        self._host_id = self._get_host_id()

    def _refresh_token(self, duration: int = 600):
        print("Refreshing token")
        print("---")
        token = self._high_level_identity_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )
        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def _create_twin_identity(self, twin_key_name: str):
        twin_registered_id = (
            self._high_level_identity_api.create_twin_with_control_delegation(
                twin_seed=AGENT_SEED,
                twin_key_name=twin_key_name,
                agent_registered_identity=self._agent_registered_id,
                delegation_name="#ControlDeleg",
            )
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            raise

        return response.json()

    def create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        location: dict = None,
    ):
        twin_did = self._create_twin_identity(twin_key_name=twin_key_name)

        self._upsert_twin(
            twin_did=twin_did,
            host_id=self._host_id,
            properties=properties,
            feeds=feeds,
            location=location,
        )

        self._twin_follower_did = twin_did

    def _upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_did} created successfully")
        print("---")

    def _share_data(
        self, twin_did: str, host_id: str, feed_id: str, data_to_share: dict
    ):
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {"data": encoded_data, "mime": "application/json"}
        }

        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/hosts/{host_id}/twins/{twin_did}/feeds/{feed_id}/shares",
            json=data_to_share_payload,
        )

        print(f"Shared {data_to_share} from Feed {feed_id}")

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []

        search_headers = self._headers.copy()
        # The search request requires an additional header, "Iotics-RequestTimeout",
        # which tells the server to stop the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )

        payload = {"responseType": response_type, "filter": {}}

        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                twins_found = []
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue
                finally:
                    if twins_found:
                        # Append the twins found to the list of twins
                        twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def subscribe_to_feed(self, twin_publisher: dict):
        twin_follower_host_id = self._host_id

        twin_publisher_did = twin_publisher["twinId"]["id"]
        twin_publisher_host_id = twin_publisher["twinId"]["hostId"]
        twin_publisher_feed_id = twin_publisher["feeds"][0]["feedId"]["id"]

        twin_publisher_label = self._get_twin_label(twin_publisher["properties"])
        sensor_number = (
            twin_publisher_label.split(" ")[-1] if twin_publisher_label else ""
        )
        # Skip the Twin Model: it won't share any data,
        # so there is no point in following it
        if not sensor_number.isdigit():
            return

        endpoint = f"/qapi/hosts/{twin_follower_host_id}/twins/{self._twin_follower_did}/interests/hosts/{twin_publisher_host_id}/twins/{twin_publisher_did}/feeds/{twin_publisher_feed_id}"

        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_publisher_did}",
            headers=self._headers,
        )

        print(f"Subscribing to {twin_publisher_label}")

        # Add the Twin's info to the mapping dictionary
        twin_info = {
            twin_publisher_did: {
                "twin_label": twin_publisher_label,
                "feed_id": f"averageTemp{int(sensor_number)}",  # The Feed ID from which to share data
                "previous_mean": 0,
                "sample_count": 1,
            }
        }
        self._twins_info.update(twin_info)

    def _publish_data(self, twin_publisher_did: str, received_data: dict):
        twin_info = self._twins_info.get(twin_publisher_did)
        previous_mean = twin_info["previous_mean"]
        sample_count = twin_info["sample_count"]
        feed_id = twin_info["feed_id"]

        new_sample = received_data["reading"]
        new_mean = ((sample_count - 1) * previous_mean + new_sample) / sample_count

        # Publish the new mean
        self._share_data(
            twin_did=self._twin_follower_did,
            host_id=self._host_id,
            feed_id=feed_id,
            data_to_share={"average": new_mean},
        )

        # Update the average within the dictionary
        self._twins_info[twin_publisher_did]["previous_mean"] = new_mean
        self._twins_info[twin_publisher_did]["sample_count"] += 1

    def synthesise_callback(self, headers, body):
        encoded_data = json.loads(body)
        twin_publisher_did = encoded_data["interest"]["followedFeedId"]["twinId"]

        try:
            data = encoded_data["feedData"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))
            self._publish_data(
                twin_publisher_did=twin_publisher_did, received_data=decoded_feed_data
            )

    @staticmethod
    def _get_twin_label(property_list: List[dict]):
        for prop in property_list:
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                return prop["langLiteralValue"]["value"]

        return None


def main():
    synthesiser_connector = SynthesiserConnector()
    synthesiser_connector.setup()
    # Build a list of 10 Feeds,
    # each of which will share the average temperature of its related Temperature Sensor Publisher
    feeds_list = []
    for sensor_number in range(10):
        feeds_list.append(
            {
                "id": f"averageTemp{sensor_number+1}",
                "storeLast": True,  # We want to store the last value sent
                "properties": [
                    # Add a Label for the Feed
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": f"Average Temperature Sensor {sensor_number+1}",
                            "lang": "en",
                        },
                    }
                ],
                # Add a single Value called "average"
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "average",
                        "unit": "http://purl.obolibrary.org/obo/UO_0000027",
                    }
                ],
            },
        )

    synthesiser_connector.create_new_twin(
        twin_key_name="TwinSynthesiser",
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Temperature Sensor Synthesiser",
                    "lang": "en",
                },
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A temperature sensor that synthesises temperature data",
                    "lang": "en",
                },
            },
        ],
        feeds=feeds_list,
    )
    twins_list = synthesiser_connector.search_twins(text="shares")
    for twin_publisher in twins_list:
        synthesiser_connector.subscribe_to_feed(twin_publisher=twin_publisher)
        print("---")

    while True:
        sleep(10)


if __name__ == "__main__":
    main()