IOTICS API

ExperienceSupported LanguagesIdentity OptionsProtocol
AdvancedPython, Java, C# and any OpenAPI Tools API Clients compatible languageUse our Identity LibraryREST + STOMP

Introduction

Welcome to the IOTICS API Getting Started guide! Our API uses the WebAPI technology, and to make it easier throughout the guides, we'll refer to it as simply IOTICS API.

We'll walk you through the process of Getting Started with IOTICS by using the IOTICS API. The WebAPI technology allows you to build your integration from the ground up and have a more in-depth understanding of what's beneath the hood of IOTICS.

After following this guide you will have learned:

Tutorial scenario

For this tutorial, we will be exploring a manufacturing company that has noticed that occasionally some of their 10 machines start to malfunction when reaching a certain internal temperature. By using IOTICS you will create a system that allows the company to monitor the temperature and receive an alert if any of these machines reaches the specified threshold, allowing for action to be taken.

By the end of the scenario, you will have created a Twin Model, used that model to create 10 Twins of different assets, connected these Twins to a source of data and used this to create alerts based on data interactions.

Once the monitoring system is in place we can start to expand this IOTICS ecosystem, adding different feeds and interactions from both inside and outside of the example company's boundaries. These are topics we will explore in future tutorials.

600

Prerequisites

Before starting you need:

If you don't currently have an IOTICSpace or access to our GitHub repos then you can sign up through our Contact Us page or by emailing mailto:[email protected].

IOTICSpace

An IOTICSpace is an ecosystem of Digital Twins that enable the creation and processing of data interactions, leading into powerful, useable event analytics. With an IOTICSpace data becomes Findable, Accessible, Interoperable, and Reusable.

For the purposes of this guide all you need to know about IOTICSpaces is that they allow you to create Digital Twins quickly and consistently as well as enabling them to connect and communicate with other Digital Twins. If you would like to read more about the benefits of IOTICSpaces then take a look at our FAIR Data page for more information.

Throughout this guide we will be taking a look in your IOTICSpace to see the output of your work, if you're already setup you can log into your IOTICSpace by going to: https://yourspace.iotics.space/.

We will also be referencing a number of endpoints for your space, if you are ever unsure where to go you can find all the relevant endpoints for your IOTICSpace by going to: https://yourspace.iotics.space/index.json.

🚧

Note:

If you don't currently have an IOTICSpace then you can sign up through our Contact Us page or by emailing IOTICS Support.

Preparation

Environment set up

To make this guide as easy to follow as possible we will be setting up a new Python Virtual Environment containing all the dependencies we'll need for the tutorial.

  1. Clone the IOTICS Host Library to your device

  2. In your command line navigate to the directory you cloned the library into(for example: C:\path\iotics-host-lib) and run:

python3.8 -m venv env
source env/bin/activate
pip install -U pip setuptools wheel
pip install iotics-identity shortuuid
pip install -f deps/ iotics.host.lib.sources
py -m venv env
.\env\Scripts\activate
pip install -U pip setuptools wheel
pip install iotics-identity shortuuid
pip install -f deps/ iotics.host.lib.sources

Identity

Now that we've created the environment we'll be using in the tutorial we need to set up your credentials for securely using it.

At IOTICS we take identification and security seriously. All interactions must be identified through the use of Decentralised IDs (DIDs).

We will only be touching lightly on DIDs for this tutorial, but they are an emerging effort for establishing a standard for self-sovereign digital identities from the W3C Credentials Community Group. You can read more about DIDs on our Decentralized Identity Documents Key Concept page if you'd like to know more.

Creating and authenticating tutorial_web_api.py

The first step is to create the program's main() method and pass our authentication credentials we'll use to interact with the IOTICS API.

For this step we'll need a number of different variables, they will come from either the script you just ran or found in the index.json file of your IOTICSpace(for example: https://yourspace.iotics.space/index.json).

  1. Set your imports and authentication variables:
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import NamedTuple
from uuid import uuid4

import shortuuid
import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "<resolver_url>"
HOST = "<host_url>"

USER_KEY_NAME = "<from script output>"
AGENT_KEY_NAME = "<from script output>"
USER_SEED = bytes.fromhex("<from script output>")
AGENT_SEED = bytes.fromhex("<from script output>")
  1. Add a new Tutorial class to tutorial_web_api:
class Tutorial:
    def __init__(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        self._client_app_id = f"web_api_{uuid4()}"
        self._client_ref = f"d-poc-{shortuuid.random(8)}"
        self._agent_registered_id = None
        self._user_registered_id = None
        self._headers = None
        self._sensors_map = {}

    def setup(self):
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientRef": self._client_ref,
            "Iotics-ClientAppId": self._client_app_id,
            "Content-Type": "application/json",
        }
        self._refresh_token()

    def _refresh_token(self):
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=3600,
        )

        self._headers["Authorization"] = f"Bearer {token}"

        return token
  1. Then finally add the new main() method:
def main():
    tutorial = Tutorial()
    tutorial.setup()

if __name__ == "__main__":
    main()

Setting up the Data

Next we're going to arrange the data we're going to be using throughout the tutorial.

  1. Add the below at the top of tutorial_web_api.py:
class RestApi(NamedTuple):
    method: str
    url: str

SENSOR_DATA = RestApi(
    method="GET",
    url="http://flaskapi.dev.iotics.com/sensor_temp",
)

CREATE_TWIN = RestApi(method="POST", url="{host}/qapi/twins")
UPDATE_TWIN = RestApi(method="PATCH", url="{host}/qapi/twins/{twin_id}")
CREATE_FEED = RestApi(method="POST", url="{host}/qapi/twins/{twin_id}/feeds")
UPDATE_FEED = RestApi(method="PATCH", url="{host}/qapi/twins/{twin_id}/feeds/{feed_id}")
DESCRIBE_FEED = RestApi(method="GET", url="{host}/qapi/twins/{twin_id}/feeds/{feed_id}")
SHARE_DATA_FEED = RestApi(
    method="POST", url="{host}/qapi/twins/{twin_id}/feeds/{feed_id}/shares"
)
  1. Add these new class methods to your Tutorial class:
# Inside Tutorial class
    def get_sensor_data(self):
        sensor_data = self._make_api_call(
            method=SENSOR_DATA.method, url=SENSOR_DATA.url
        )

        return sensor_data

    def _make_api_call(self, method, url, json=None):
        response = request(method=method, url=url, headers=self._headers, json=json)

        response.raise_for_status()

        return response.json()

Setting up Stomp classes

The last step in preparing for the tutorial is to setup our Stomp Classes.

  1. Add the StompClient and StompListener classes to the file:
class StompHandler:
    def __init__(self, endpoint, callback, token):
        self._endpoint = endpoint
        self._token = token
        self._stomp_client = None
        self._callback = callback

    def setup(self):
        self._stomp_client = StompWSConnection12(endpoint=self._endpoint)
        self._stomp_client.set_listener(
            "stomp_listener", StompListener(self._stomp_client, self._callback)
        )

        self._stomp_client.connect(wait=True, passcode=self._token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print('received an error "%s"' % body)

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("disconnected")

Creating your Twins

Now that we've got our credentials and data source in place we can move on to creating our first Digital Twin, the first step is to create Twin Model.

Creating your first Twin Model

For this section we're going to first create a Twin Model that we can then use to quickly create a number of Twins. This allows our Factory owner to quickly model their machinery for monitoring.

First we're going to create a new method called create_model that uses the agent_auth and api_factory you previously created. To create the Model Twin and its Feed, we will be using the Twin API Service and Feed API Service.

  1. Add the below at the top of tutorial_web_api.py:
TWINS_VISIBILITY = "PRIVATE"

TWIN_TYPE_PREDICATE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
LABEL_PREDICATE = "http://www.w3.org/2000/01/rdf-schema#label"
COMMENT_PREDICATE = "http://www.w3.org/2000/01/rdf-schema#comment"
CREATED_FROM_MODEL_PREDICATE = "https://data.iotics.com/app#model"
MODEL_TYPE_PROPERTY = {
    "key": TWIN_TYPE_PREDICATE,
    "uriValue": {"value": "https://data.iotics.com/app#Model"},
}
ALLOW_ALL_HOSTS_PROPERTY = {
    "key": "http://data.iotics.com/public#hostAllowList",
    "uriValue": {"value": "http://data.iotics.com/public#allHosts"},
}

MODEL_LABEL = "Machine model (tutorial)"
FEED_ID = "currentTemp"
  1. Add this new class method to your Tutorial class:
# Inside Tutorial class
    def _create_twin(self, twin_key_name):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        self._make_api_call(
            method=CREATE_TWIN.method,
            url=CREATE_TWIN.url.format(host=HOST),
            json={"twinId": {"value": twin_registered_id.did}},
        )

        return twin_registered_id.did

    def _update_twin_with_metadata(
        self, twin_id, visibility, properties=None, cleared_all=True, location=None
    ):
        payload = {"newVisibility": {"visibility": visibility}}

        if properties:
            payload["properties"] = {"added": properties, "clearedAll": cleared_all}
        if location:
            payload["location"] = location

        self._make_api_call(
            method=UPDATE_TWIN.method,
            url=UPDATE_TWIN.url.format(host=HOST, twin_id=twin_id),
            json=payload,
        )

    def _create_feed(self, twin_id, feed_id):
        self._make_api_call(
            method=CREATE_FEED.method,
            url=CREATE_FEED.url.format(host=HOST, twin_id=twin_id),
            json={"feedId": {"value": feed_id}},
        )

    def _describe_feed(self, twin_id, feed_id):
        feed_description = self._make_api_call(
            method=DESCRIBE_FEED.method,
            url=DESCRIBE_FEED.url.format(host=HOST, twin_id=twin_id, feed_id=feed_id),
        )

        return feed_description
      
    def _update_feed(
        self,
        twin_id,
        feed_id,
        metadata,
        properties=None,
        cleared_all=True,
        store_last=True,
    ):
        payload = {"storeLast": store_last, "values": {"added": metadata}}

        if properties:
            payload["properties"] = {"added": properties, "clearedAll": cleared_all}

        self._make_api_call(
            method=UPDATE_FEED.method,
            url=UPDATE_FEED.url.format(host=HOST, twin_id=twin_id, feed_id=feed_id),
            json=payload,
        )

    def create_model(self):
        # Create Model twin
        model_twin_did = self._create_twin(twin_key_name="#MachineModel")

        # Add Properties
        self._update_twin_with_metadata(
            twin_id=model_twin_did,
            properties=[
                MODEL_TYPE_PROPERTY,
                ALLOW_ALL_HOSTS_PROPERTY,
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {"value": MODEL_LABEL, "lang": "en"},
                },
            ],
            visibility=TWINS_VISIBILITY,
        )

        # Add Feed
        self._create_feed(twin_id=model_twin_did, feed_id=FEED_ID)

        # Add Value
        self._update_feed(
            twin_id=model_twin_did,
            feed_id=FEED_ID,
            properties=[
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {"value": FEED_ID, "lang": "en"},
                }
            ],
            metadata=[
                {
                    "comment": "Temperature in degrees Celsius",
                    "dataType": "decimal",
                    "label": VALUE_LABEL,
                    "unit": "http://purl.obolibrary.org/obo/UO_0000027",
                }
            ],
        )

        print("Model twin created")

        return model_twin_did
  1. Add this method to your main() class:
def main():
    tutorial = Tutorial()
    tutorial.setup()

    model_twin_id = tutorial.create_model()
  1. Now just run the file from your CLI:
python tutorial_web_api.py

You should see something similar to the below returned:

694

👍

Congratulations!

You have created your first model!

Creating your first Digital Twins

Now that we've created a Model we can use it to create the Twins needed to monitor our Factory's assets. We'll be using the same APIs as before, so this method will need the same parameters as the method used to make the model.

We'll be touching on searching in this section, but if you want a more in-depth look see our dedicated Searching guide.

  1. Add the below at the top of tutorial_web_api.py:
# Top of file
SEARCH_TWINS = RestApi(method="POST", url="{host}/qapi/searches")
  1. Add this new method to the Tutorial class:
# Inside Tutorial class
    def _search_twins(self, properties=None, text=None, location=None, scope="LOCAL"):
        request_timeout = datetime.now(tz=timezone.utc) + timedelta(seconds=10)
        self._headers.update({"Iotics-RequestTimeout": request_timeout.isoformat()})

        payload = {"filter": {}, "responseType": "FULL"}

        if properties:
            payload["filter"]["properties"] = properties
        if text:
            payload["filter"]["text"] = text
        if location:
            payload["filter"]["location"] = location

        twins_list = []

        with request(
            method=SEARCH_TWINS.method,
            url=SEARCH_TWINS.url.format(host=HOST),
            headers=self._headers,
            json=payload,
            stream=True,
            verify=False,
            params={"scope": scope},
        ) as resp:
            resp.raise_for_status()

            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                try:
                    twins_list = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                else:
                    break

        self._headers.pop("Iotics-RequestTimeout")
        return twins_list

    def _publish_feed_value(
        self, sensor_data, twin_id, feed_id, print_data=True, twin_label=None
    ):
        data_to_share = {VALUE_LABEL: sensor_data["temp"]}
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {
                "data": encoded_data,
                "mime": "application/json",
                "occurredAt": datetime.now(tz=timezone.utc).isoformat(),
            }
        }

        self._make_api_call(
            method=SHARE_DATA_FEED.method,
            url=SHARE_DATA_FEED.url.format(host=HOST, twin_id=twin_id, feed_id=feed_id),
            json=data_to_share_payload,
        )

        if print_data:
            print(f"Sharing data for {twin_label}: {data_to_share}")
          
    def create_machine_from_model(self):
        # Search for Machine Model
        twins_list = self._search_twins(
            properties=[MODEL_TYPE_PROPERTY], text=MODEL_LABEL
        )
        model_twin = twins_list[0]

        data = self.get_sensor_data()

        # Create new twins based on the model
        for machine_number, sensor_data in enumerate(data):
            machine_name = f"machine_{machine_number}"
            machine_twin_id = self._create_twin(twin_key_name=machine_name)

            # Add Properties
            model_twin_did = model_twin["id"]["value"]
            self._update_twin_with_metadata(
                twin_id=machine_twin_id,
                properties=[
                    ALLOW_ALL_HOSTS_PROPERTY,
                    {
                        "key": TWIN_TYPE_PREDICATE,
                        "uriValue": {
                            "value": "https://data.iotics.com/tutorial#Sensor"
                        },
                    },
                    {
                        "key": CREATED_FROM_MODEL_PREDICATE,
                        "uriValue": {"value": model_twin_did},
                    },
                    {
                        "key": "https://data.iotics.com/tutorial#serialNumber",
                        "stringLiteralValue": {"value": "%06d" % machine_number},
                    },
                    {
                        "key": LABEL_PREDICATE,
                        "langLiteralValue": {
                            "value": f"{machine_name} (tutorial)",
                            "lang": "en",
                        },
                    },
                ],
                location={"location": {"lat": 51.5, "lon": -0.1}},
                visibility=TWINS_VISIBILITY,
            )

            # Add Feeds
            for feed in model_twin["feeds"]:
                feed_id = feed["feed"]["id"]["value"]

                self._create_feed(twin_id=machine_twin_id, feed_id=feed_id)

                # Describe Model's feed
                feed_description = self._describe_feed(
                    twin_id=model_twin_did, feed_id=feed_id
                )

                feed_properties = feed_description["result"]["properties"]
                feed_values = feed_description["result"]["values"]
                store_last = feed_description["result"]["storeLast"]

                # Add Feed's Values
                values_metadata_list = []

                for value in feed_values:
                    value_comment = value["comment"]
                    value_label = value["label"]
                    value_unit = value["unit"]
                    value_datatype = value["dataType"]

                    metadata = {
                        "comment": value_comment,
                        "dataType": value_datatype,
                        "label": value_label,
                        "unit": value_unit,
                    }
                    values_metadata_list.append(metadata)

                self._update_feed(
                    twin_id=machine_twin_id,
                    feed_id=feed_id,
                    properties=feed_properties,
                    metadata=values_metadata_list,
                    store_last=store_last,
                )

                # Share first sample of data
                self._publish_feed_value(
                    sensor_data=sensor_data,
                    twin_id=machine_twin_id,
                    feed_id=feed_id,
                    print_data=False,
                )

            self._sensors_map[machine_name] = machine_twin_id
            print("Machine twin created:", machine_name)
  1. Add the new method to main():
def main():
    tutorial = Tutorial()
    tutorial.setup()

    model_twin_id = tutorial.create_model()
    tutorial.create_machine_from_model()
  1. Now run the script to make these changes in your space:
python tutorial_web_api.py

You should see something similar to the below returned:

686

👍

Congratulations!

You have created and populated your first twins. If you would like to read about the steps above in more detail see our Digital Twins Advanced Integration guide.

Creating your interactions

Data interactions allow you to combine or transform data in various ways to generate new data that is more specific to your needs. In this example, we are going to do a Transformation Interaction on the machine Twins we created earlier, then create new Twins which alert us if the temperature gets too hot (above 30°C for this example).

We do not need to create these output twins ourselves as there is a process running in your IOTICSpace that automatically does this for you. All you need to do is create an Interaction Twin with the appropriate configuration.

We need to populate this twin so the Interaction Engine can find it. To do this we give it the required RDF type. The Interaction Twins serve as models for the output Twins, so we give it the feed that the output twin will need.

  1. Add the below at the top of tutorial_web_api.py:
# Top of file
VALUE_LABEL = "temperature"
OUTPUT_FEED_NAME = "temperature_status"
OUTPUT_VALUE_LABEL = "status"
  1. Add new method to the Tutorial class:
def create_interaction(self, model_twin_did):
        # Create Interaction twin
        twin_did = self._create_twin(twin_key_name="#SensorInteraction")

        print("Interaction Twin created")

        interaction_config = {
            "enabled": True,
            "rules": [
                {
                    "transformation": {
                        "conditions": [
                            {
                                "fieldsIncludedInOutput": [VALUE_LABEL],
                                "jsonLogic": {">": [{"var": VALUE_LABEL}, 30]},
                            }
                        ],
                        "outputFeedId": OUTPUT_FEED_NAME,
                        "outputFieldId": OUTPUT_VALUE_LABEL,
                        "outputTrueValue": "extreme",
                        "outputFalseValue": "normal",
                        "sourceFeedId": FEED_ID,
                        "sourceId": "1",
                    }
                }
            ],
            "sources": [
                {
                    "cleanupRateS": 900,
                    "feeds": [{"fieldIds": [VALUE_LABEL], "id": FEED_ID}],
                    "filter": {
                        "properties": [
                            {
                                "key": "https://data.iotics.com/app#model",
                                "value": {"uriValue": {"value": model_twin_did}},
                            }
                        ],
                        "text": None,
                    },
                    "id": "1",
                    "modelDid": model_twin_did,
                    "refreshRateS": 300,
                }
            ],
        }

        self._update_twin_with_metadata(
            twin_id=twin_did,
            visibility=TWINS_VISIBILITY,
            properties=[
                MODEL_TYPE_PROPERTY,
                ALLOW_ALL_HOSTS_PROPERTY,
                {
                    "key": TWIN_TYPE_PREDICATE,
                    "uriValue": {"value": "https://data.iotics.com/app#Interaction"},
                },
                {
                    "key": "https://data.iotics.com/app#interactionConfig",
                    "stringLiteralValue": {"value": json.dumps(interaction_config)},
                },
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {
                        "value": "Sensor Overheating Alert",
                        "lang": "en",
                    },
                },
            ],
        )

        # Add Feed
        self._create_feed(twin_id=twin_did, feed_id=OUTPUT_FEED_NAME)

        # Add Value
        self._update_feed(
            twin_id=twin_did,
            feed_id=OUTPUT_FEED_NAME,
            properties=[
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {"value": "Temperature status", "lang": "en"},
                }
            ],
            metadata=[
                {
                    "comment": "Temperature status: normal or extreme",
                    "dataType": "string",
                    "label": OUTPUT_VALUE_LABEL,
                }
            ],
        )

        return twin_did
  1. Add to the new method to main():
def main():
    tutorial = Tutorial()
    tutorial.setup()

    model_twin_id = tutorial.create_model()
    tutorial.create_machine_from_model()
    interaction_twin_id = tutorial.create_interaction(model_twin_id)
  1. Run your tutorial file:
python tutorial_web_api.py

You'll see the Interaction Twin created within your space:

687

👍

Congratulations!

You have created your first Interaction Twin!

Sharing data

The last step in our tutorial will be to follow the new output Twins so we can be alerted whenever one of our machines overheats. In practice, this would usually be in a different process from the one reading the data into IOTICSpace, but to minimize how many tabs you have to work with, the tutorial is going to put it together in the same script.

Here we use a new IOTICS API, the Follow API.

We're going to use that follow_api to provide a callback method that gets called whenever data is shared to the specified feed. The callback will need to know which Twin is triggering it, so we save the "subscription ids" we get from following Twins into a dictionary with the Twins' labels, which we make globally available.

For the tutorial, we'll add a simple sample logic for the callback method to print a message to the console whenever a sensor overheats.

You could do any number of things instead, send an email, make a POST request to an API managing the sensors, and even share data with another twin.

  1. Add the below at the top of tutorial_web_api.py:
# Top of file
SUBSCRIPTIONS_MAP = {}
  1. Add these new methods to the Tutorial class:
# Inside Tutorial class
    @staticmethod
    def _follow_callback(headers, body):
        payload = json.loads(body)
        sub_id = headers["destination"].split("/")[3]
        data = payload["feedData"]["data"]

        try:
            sensor = SUBSCRIPTIONS_MAP[sub_id]
        except KeyError:
            print("No subscription found for twinId", sub_id)
        else:
            interaction_data = json.loads(base64.b64decode(data).decode("ascii"))

            if interaction_data[OUTPUT_VALUE_LABEL] == "extreme":
                print(f"{sensor}: SENSOR IS OVERHEATING! OH THE HUMANITY!!")

         
    def share_data(self, data):
        for machine_number, sensor_data in enumerate(data):
            machine_name = f"machine_{machine_number}"
            machine_twin_id = self._sensors_map.get(machine_name)
            if not machine_twin_id:
                continue

            self._publish_feed_value(
                sensor_data=sensor_data,
                twin_id=machine_twin_id,
                feed_id=FEED_ID,
                twin_label=machine_name,
            )
  1. Add a loop to repeatedly get the sensor data in main():
def main():
    tutorial = Tutorial()
    tutorial.setup()

    model_twin_id = tutorial.create_model()
    tutorial.create_machine_from_model()
    interaction_twin_id = tutorial.create_interaction(model_twin_id)

    while True:
        print("\nGetting latest temperatures...")
        data = tutorial.get_sensor_data()
        tutorial.share_data(data)

        sleep(5)
  1. Now just run the tutorial file from your CLI:
python tutorial_web_api.py

You should see something similar to the below returned:

639

👍

Congratulations!

Your can now start to see the output of your Twins!

Receiving alerts

  1. Add the below at the top of tutorial_web_api.py:
# Top of file
INDEX_PAGE = RestApi(
    method="GET",
    url="{host}/index.json",
)
SUBSCRIBE_TO_FEED = RestApi(
    method="GET",
    url="/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_name}",
)
  1. Add these new methods to the Tutorial class:
# Inside Tutorial class
    def _subscribe_to_feed(
        self, follower_twin_id, followed_twin_id, followed_feed_name, callback
    ):
        response = self._make_api_call(
            method=INDEX_PAGE.method, url=INDEX_PAGE.url.format(host=HOST)
        )

        feed_path = f"/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_name}"

        stomp_handler = StompHandler(
            endpoint=response["stomp"], callback=callback, token=self._refresh_token()
        )
        stomp_handler.setup()
        stomp_handler.subscribe(
            destination=feed_path,
            subscription_id=self._headers["Iotics-ClientRef"],
            headers=self._headers,
        )

    def follow_sensors(self, interaction_twin_id):
        output_twins = []

        print("Searching for output twins", end="", flush=True)

        while len(output_twins) < len(self._sensors_map):
            output_twins = self._search_twins(
                properties=[
                    {
                        "key": CREATED_FROM_MODEL_PREDICATE,
                        "uriValue": {"value": interaction_twin_id},
                    }
                ],
            )
            sleep(10)
            print(".", end="", flush=True)

        print("\nFound %s output twins" % len(output_twins))

        for sensor in output_twins:
            sensor_id = sensor["id"]["value"]
            self._subscribe_to_feed(
                sensor_id, sensor_id, OUTPUT_FEED_NAME, self._follow_callback
            )

            sensor_label = None

            # Search for the label property
            for prop in sensor["properties"]:
                if prop["key"] == LABEL_PREDICATE:
                    sensor_label = prop["langLiteralValue"]["value"]
                    break

            if sensor_label:
                SUBSCRIPTIONS_MAP[sensor_id] = sensor_label
  1. Add the follow_sensors method to main(), before the while loop:
def main():
    # ...
    # interaction_twin_id = tutorial.create_interaction(model_twin_id)
    tutorial.follow_sensors(interaction_twin_id)

    while True:
        print("\nGetting latest temperatures...")
        data = tutorial.get_sensor_data()
        tutorial.share_data(data)
    
        sleep(5)
  1. Run the tutorial_web_api.py file for the final time:
python tutorial_web_api.py

You should see something like:

864

👍

Congratulations!

Your data interaction is now running end-to-end, and the tutorial is complete!

Example file

Click to see example tutorial_web_api.py

RESOLVER_URL = "<resolver_url>"
HOST = "<host_url>"

USER_KEY_NAME = "<from script output>"
AGENT_KEY_NAME = "<from script output>"
USER_SEED = bytes.fromhex("<from script output>")
AGENT_SEED = bytes.fromhex("<from script output>")

TWINS_VISIBILITY = "PRIVATE"

TWIN_TYPE_PREDICATE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
LABEL_PREDICATE = "http://www.w3.org/2000/01/rdf-schema#label"
COMMENT_PREDICATE = "http://www.w3.org/2000/01/rdf-schema#comment"
CREATED_FROM_MODEL_PREDICATE = "https://data.iotics.com/app#model"
MODEL_TYPE_PROPERTY = {
    "key": TWIN_TYPE_PREDICATE,
    "uriValue": {"value": "https://data.iotics.com/app#Model"},
}
ALLOW_ALL_HOSTS_PROPERTY = {
    "key": "http://data.iotics.com/public#hostAllowList",
    "uriValue": {"value": "http://data.iotics.com/public#allHosts"},
}

MODEL_LABEL = "Machine model (tutorial)"
FEED_ID = "currentTemp"
VALUE_LABEL = "temperature"
OUTPUT_FEED_NAME = "temperature_status"
OUTPUT_VALUE_LABEL = "status"

SUBSCRIPTIONS_MAP = {}


class RestApi(NamedTuple):
    method: str
    url: str


SENSOR_DATA = RestApi(
    method="GET",
    url="http://flaskapi.dev.iotics.com/sensor_temp",
)
INDEX_PAGE = RestApi(
    method="GET",
    url="{host}/index.json",
)

CREATE_TWIN = RestApi(method="POST", url="{host}/qapi/twins")
UPDATE_TWIN = RestApi(method="PATCH", url="{host}/qapi/twins/{twin_id}")
CREATE_FEED = RestApi(method="POST", url="{host}/qapi/twins/{twin_id}/feeds")
UPDATE_FEED = RestApi(method="PATCH", url="{host}/qapi/twins/{twin_id}/feeds/{feed_id}")
DESCRIBE_FEED = RestApi(method="GET", url="{host}/qapi/twins/{twin_id}/feeds/{feed_id}")
SHARE_DATA_FEED = RestApi(
    method="POST", url="{host}/qapi/twins/{twin_id}/feeds/{feed_id}/shares"
)
SUBSCRIBE_TO_FEED = RestApi(
    method="GET",
    url="/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_name}",
)
SEARCH_TWINS = RestApi(method="POST", url="{host}/qapi/searches")


class Tutorial:
    def __init__(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        self._client_app_id = f"web_api_{uuid4()}"
        self._client_ref = f"d-poc-{shortuuid.random(8)}"
        self._agent_registered_id = None
        self._user_registered_id = None
        self._headers = None
        self._sensors_map = {}

    def setup(self):
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientRef": self._client_ref,
            "Iotics-ClientAppId": self._client_app_id,
            "Content-Type": "application/json",
        }
        self._refresh_token()

    def _refresh_token(self):
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=3600,
        )

        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _make_api_call(self, method, url, json=None):
        response = request(method=method, url=url, headers=self._headers, json=json)

        response.raise_for_status()

        return response.json()

    def _create_twin(self, twin_key_name):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        self._make_api_call(
            method=CREATE_TWIN.method,
            url=CREATE_TWIN.url.format(host=HOST),
            json={"twinId": {"value": twin_registered_id.did}},
        )

        return twin_registered_id.did

    def _update_twin_with_metadata(
        self, twin_id, visibility, properties=None, cleared_all=True, location=None
    ):
        payload = {"newVisibility": {"visibility": visibility}}

        if properties:
            payload["properties"] = {"added": properties, "clearedAll": cleared_all}
        if location:
            payload["location"] = location

        self._make_api_call(
            method=UPDATE_TWIN.method,
            url=UPDATE_TWIN.url.format(host=HOST, twin_id=twin_id),
            json=payload,
        )

    def _create_feed(self, twin_id, feed_id):
        self._make_api_call(
            method=CREATE_FEED.method,
            url=CREATE_FEED.url.format(host=HOST, twin_id=twin_id),
            json={"feedId": {"value": feed_id}},
        )

    def _describe_feed(self, twin_id, feed_id):
        feed_description = self._make_api_call(
            method=DESCRIBE_FEED.method,
            url=DESCRIBE_FEED.url.format(host=HOST, twin_id=twin_id, feed_id=feed_id),
        )

        return feed_description

    def _update_feed(
        self,
        twin_id,
        feed_id,
        metadata,
        properties=None,
        cleared_all=True,
        store_last=True,
    ):
        payload = {"storeLast": store_last, "values": {"added": metadata}}

        if properties:
            payload["properties"] = {"added": properties, "clearedAll": cleared_all}

        self._make_api_call(
            method=UPDATE_FEED.method,
            url=UPDATE_FEED.url.format(host=HOST, twin_id=twin_id, feed_id=feed_id),
            json=payload,
        )

    def _search_twins(self, properties=None, text=None, location=None, scope="LOCAL"):
        request_timeout = datetime.now(tz=timezone.utc) + timedelta(seconds=10)
        self._headers.update({"Iotics-RequestTimeout": request_timeout.isoformat()})

        payload = {"filter": {}, "responseType": "FULL"}

        if properties:
            payload["filter"]["properties"] = properties
        if text:
            payload["filter"]["text"] = text
        if location:
            payload["filter"]["location"] = location

        twins_list = []

        with request(
            method=SEARCH_TWINS.method,
            url=SEARCH_TWINS.url.format(host=HOST),
            headers=self._headers,
            json=payload,
            stream=True,
            verify=False,
            params={"scope": scope},
        ) as resp:
            resp.raise_for_status()

            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                try:
                    twins_list = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                else:
                    break

        self._headers.pop("Iotics-RequestTimeout")
        return twins_list

    def _publish_feed_value(
        self, sensor_data, twin_id, feed_id, print_data=True, twin_label=None
    ):
        data_to_share = {VALUE_LABEL: sensor_data["temp"]}
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {
                "data": encoded_data,
                "mime": "application/json",
                "occurredAt": datetime.now(tz=timezone.utc).isoformat(),
            }
        }

        self._make_api_call(
            method=SHARE_DATA_FEED.method,
            url=SHARE_DATA_FEED.url.format(host=HOST, twin_id=twin_id, feed_id=feed_id),
            json=data_to_share_payload,
        )

        if print_data:
            print(f"Sharing data for {twin_label}: {data_to_share}")

    def _subscribe_to_feed(
        self, follower_twin_id, followed_twin_id, followed_feed_name, callback
    ):
        response = self._make_api_call(
            method=INDEX_PAGE.method, url=INDEX_PAGE.url.format(host=HOST)
        )

        feed_path = f"/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_name}"

        stomp_handler = StompHandler(
            endpoint=response["stomp"], callback=callback, token=self._refresh_token()
        )
        stomp_handler.setup()
        stomp_handler.subscribe(
            destination=feed_path,
            subscription_id=self._headers["Iotics-ClientRef"],
            headers=self._headers,
        )

    @staticmethod
    def _follow_callback(headers, body):
        payload = json.loads(body)
        sub_id = headers["destination"].split("/")[3]
        data = payload["feedData"]["data"]

        try:
            sensor = SUBSCRIPTIONS_MAP[sub_id]
        except KeyError:
            print("No subscription found for twinId", sub_id)
        else:
            interaction_data = json.loads(base64.b64decode(data).decode("ascii"))

            if interaction_data[OUTPUT_VALUE_LABEL] == "extreme":
                print(f"{sensor}: SENSOR IS OVERHEATING! OH THE HUMANITY!!")

    def create_model(self):
        # Create Model twin
        model_twin_did = self._create_twin(twin_key_name="#MachineModel")

        # Add Properties
        self._update_twin_with_metadata(
            twin_id=model_twin_did,
            properties=[
                MODEL_TYPE_PROPERTY,
                ALLOW_ALL_HOSTS_PROPERTY,
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {"value": MODEL_LABEL, "lang": "en"},
                },
            ],
            visibility=TWINS_VISIBILITY,
        )

        # Add Feed
        self._create_feed(twin_id=model_twin_did, feed_id=FEED_ID)

        # Add Value
        self._update_feed(
            twin_id=model_twin_did,
            feed_id=FEED_ID,
            properties=[
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {"value": FEED_ID, "lang": "en"},
                }
            ],
            metadata=[
                {
                    "comment": "Temperature in degrees Celsius",
                    "dataType": "decimal",
                    "label": VALUE_LABEL,
                    "unit": "http://purl.obolibrary.org/obo/UO_0000027",
                }
            ],
        )

        print("Model twin created")

        return model_twin_did

    def create_machine_from_model(self):
        # Search for Machine Model
        twins_list = self._search_twins(
            properties=[MODEL_TYPE_PROPERTY], text=MODEL_LABEL
        )
        model_twin = twins_list[0]

        data = self.get_sensor_data()

        # Create new twins based on the model
        for machine_number, sensor_data in enumerate(data):
            machine_name = f"machine_{machine_number}"
            machine_twin_id = self._create_twin(twin_key_name=machine_name)

            # Add Properties
            model_twin_did = model_twin["id"]["value"]
            self._update_twin_with_metadata(
                twin_id=machine_twin_id,
                properties=[
                    ALLOW_ALL_HOSTS_PROPERTY,
                    {
                        "key": TWIN_TYPE_PREDICATE,
                        "uriValue": {
                            "value": "https://data.iotics.com/tutorial#Sensor"
                        },
                    },
                    {
                        "key": CREATED_FROM_MODEL_PREDICATE,
                        "uriValue": {"value": model_twin_did},
                    },
                    {
                        "key": "https://data.iotics.com/tutorial#serialNumber",
                        "stringLiteralValue": {"value": "%06d" % machine_number},
                    },
                    {
                        "key": LABEL_PREDICATE,
                        "langLiteralValue": {
                            "value": f"{machine_name} (tutorial)",
                            "lang": "en",
                        },
                    },
                ],
                location={"location": {"lat": 51.5, "lon": -0.1}},
                visibility=TWINS_VISIBILITY,
            )

            # Add Feeds
            for feed in model_twin["feeds"]:
                feed_id = feed["feed"]["id"]["value"]

                self._create_feed(twin_id=machine_twin_id, feed_id=feed_id)

                # Describe Model's feed
                feed_description = self._describe_feed(
                    twin_id=model_twin_did, feed_id=feed_id
                )

                feed_properties = feed_description["result"]["properties"]
                feed_values = feed_description["result"]["values"]
                store_last = feed_description["result"]["storeLast"]

                # Add Feed's Values
                values_metadata_list = []

                for value in feed_values:
                    value_comment = value["comment"]
                    value_label = value["label"]
                    value_unit = value["unit"]
                    value_datatype = value["dataType"]

                    metadata = {
                        "comment": value_comment,
                        "dataType": value_datatype,
                        "label": value_label,
                        "unit": value_unit,
                    }
                    values_metadata_list.append(metadata)

                self._update_feed(
                    twin_id=machine_twin_id,
                    feed_id=feed_id,
                    properties=feed_properties,
                    metadata=values_metadata_list,
                    store_last=store_last,
                )

                # Share first sample of data
                self._publish_feed_value(
                    sensor_data=sensor_data,
                    twin_id=machine_twin_id,
                    feed_id=feed_id,
                    print_data=False,
                )

            self._sensors_map[machine_name] = machine_twin_id
            print("Machine twin created:", machine_name)

    def create_interaction(self, model_twin_did):
        # Create Interaction twin
        twin_did = self._create_twin(twin_key_name="#SensorInteraction")

        print("Interaction Twin created")

        interaction_config = {
            "enabled": True,
            "rules": [
                {
                    "transformation": {
                        "conditions": [
                            {
                                "fieldsIncludedInOutput": [VALUE_LABEL],
                                "jsonLogic": {">": [{"var": VALUE_LABEL}, 30]},
                            }
                        ],
                        "outputFeedId": OUTPUT_FEED_NAME,
                        "outputFieldId": OUTPUT_VALUE_LABEL,
                        "outputTrueValue": "extreme",
                        "outputFalseValue": "normal",
                        "sourceFeedId": FEED_ID,
                        "sourceId": "1",
                    }
                }
            ],
            "sources": [
                {
                    "cleanupRateS": 900,
                    "feeds": [{"fieldIds": [VALUE_LABEL], "id": FEED_ID}],
                    "filter": {
                        "properties": [
                            {
                                "key": "https://data.iotics.com/app#model",
                                "value": {"uriValue": {"value": model_twin_did}},
                            }
                        ],
                        "text": None,
                    },
                    "id": "1",
                    "modelDid": model_twin_did,
                    "refreshRateS": 300,
                }
            ],
        }

        # Add Properties
        self._update_twin_with_metadata(
            twin_id=twin_did,
            visibility=TWINS_VISIBILITY,
            properties=[
                MODEL_TYPE_PROPERTY,
                ALLOW_ALL_HOSTS_PROPERTY,
                {
                    "key": TWIN_TYPE_PREDICATE,
                    "uriValue": {"value": "https://data.iotics.com/app#Interaction"},
                },
                {
                    "key": "https://data.iotics.com/app#interactionConfig",
                    "stringLiteralValue": {"value": json.dumps(interaction_config)},
                },
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {
                        "value": "Sensor Overheating Alert",
                        "lang": "en",
                    },
                },
            ],
        )

        # Add Feed
        self._create_feed(twin_id=twin_did, feed_id=OUTPUT_FEED_NAME)

        # Add Feed's Value
        self._update_feed(
            twin_id=twin_did,
            feed_id=OUTPUT_FEED_NAME,
            properties=[
                {
                    "key": LABEL_PREDICATE,
                    "langLiteralValue": {"value": "Temperature status", "lang": "en"},
                }
            ],
            metadata=[
                {
                    "comment": "Temperature status: normal or extreme",
                    "dataType": "string",
                    "label": OUTPUT_VALUE_LABEL,
                }
            ],
        )

        return twin_did

    def share_data(self, data):
        for machine_number, sensor_data in enumerate(data):
            machine_name = f"machine_{machine_number}"
            machine_twin_id = self._sensors_map.get(machine_name)
            if not machine_twin_id:
                continue

            self._publish_feed_value(
                sensor_data=sensor_data,
                twin_id=machine_twin_id,
                feed_id=FEED_ID,
                twin_label=machine_name,
            )

    def follow_sensors(self, interaction_twin_id):
        output_twins = []

        print("Searching for output twins", end="", flush=True)

        while len(output_twins) < len(self._sensors_map):
            output_twins = self._search_twins(
                properties=[
                    {
                        "key": CREATED_FROM_MODEL_PREDICATE,
                        "uriValue": {"value": interaction_twin_id},
                    }
                ],
            )
            sleep(10)
            print(".", end="", flush=True)

        print("\nFound %s output twins" % len(output_twins))

        for sensor in output_twins:
            sensor_id = sensor["id"]["value"]
            self._subscribe_to_feed(
                sensor_id, sensor_id, OUTPUT_FEED_NAME, self._follow_callback
            )

            sensor_label = None

            # Search for the label property
            for prop in sensor["properties"]:
                if prop["key"] == LABEL_PREDICATE:
                    sensor_label = prop["langLiteralValue"]["value"]
                    break

            if sensor_label:
                SUBSCRIPTIONS_MAP[sensor_id] = sensor_label

    def get_sensor_data(self):
        sensor_data = self._make_api_call(
            method=SENSOR_DATA.method, url=SENSOR_DATA.url
        )

        return sensor_data


class StompHandler:
    def __init__(self, endpoint, callback, token):
        self._endpoint = endpoint
        self._token = token
        self._stomp_client = None
        self._callback = callback

    def setup(self):
        self._stomp_client = StompWSConnection12(endpoint=self._endpoint)
        self._stomp_client.set_listener(
            "stomp_listener", StompListener(self._stomp_client, self._callback)
        )

        self._stomp_client.connect(wait=True, passcode=self._token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print('received an error "%s"' % body)

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("disconnected")


def main():
    tutorial = Tutorial()
    tutorial.setup()

    model_twin_id = tutorial.create_model()
    tutorial.create_machine_from_model()
    interaction_twin_id = tutorial.create_interaction(model_twin_id)
    tutorial.follow_sensors(interaction_twin_id)

    while True:
        print("\nGetting latest temperatures...")
        data = tutorial.get_sensor_data()
        tutorial.share_data(data)

        sleep(5)


if __name__ == "__main__":
    main()