Share Data & Follow Feeds

IOTICS is all about sharing data in real time, or as and when data becomes available. Digital Twins publish data through one or more data feeds and can, at the same time, follow one or more data feeds from other Digital Twins to retrieve their data.

This page explains how a Digital Twin publishes (shares) data, as well as follows (receives) data from another Digital Twin.


Introduction to Sharing Data

In IOTICS, data sharing only happens through and between Digital Twins. These connections are called "brokered interactions" or "twin-to-twin interactions". A Digital Twin can publish data through one or more data feeds, as well as follow one or more data feeds from one or more other Digital Twins.

In essence, IOTICS data ecosystems and networks are created by Digital Twins exchanging data with one another.

Digital Twin types

Depending on whether a Digital Twin is publishing and/or following data, we distinguish between the following types:

  • Publisher: a Digital Twin that publishes data via one or more data feeds;
  • Follower: a Digital Twin that follows one or more data feeds from one or more Digital Twins;
  • Synthesiser: a Digital Twin that is both a Publisher and a Follower, and concurrently publishes and follows one or more data feeds.
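
The three types above can be read as a simple rule over what a Twin publishes and follows. The sketch below is illustrative only: the function name `twin_role` and its two arguments (lists of feed identifiers) are hypothetical and are not part of the IOTICS API.

```python
from typing import Sequence


def twin_role(published_feeds: Sequence[str], followed_feeds: Sequence[str]) -> str:
    """Return the type name used on this page for a Digital Twin."""
    publishes = len(published_feeds) > 0
    follows = len(followed_feeds) > 0

    if publishes and follows:
        return "Synthesiser"
    if publishes:
        return "Publisher"
    if follows:
        return "Follower"
    # A Twin that neither publishes nor follows has none of the three types.
    return "Neither"


print(twin_role(["temperature"], []))
print(twin_role([], ["temperature"]))
print(twin_role(["average"], ["temperature"]))
```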

Historical data in IOTICS

It is important to note that IOTICS is all about streaming data. We don't store historical data. Only a Digital Twin's metadata properties are stored in order to enable search.

This implies that, by default, all data shared through a data feed is lost unless another Digital Twin follows, uses or stores it. The only exception is the latest Value shared through a Publisher's Feed, which can be retrieved if, and only if, that Feed has the storeLast parameter set to True.
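
As a minimal sketch, the feed definition below sets storeLast when the Feed is described. The "id", "storeLast" and "values" keys mirror the Upsert Twin payload used in the tutorial at the end of this page; the helper name `feed_definition` is hypothetical, and it only builds the dictionary without calling IOTICS.

```python
def feed_definition(feed_id: str, value_label: str, store_last: bool = False) -> dict:
    """Build one entry for the "feeds" list of an Upsert Twin payload."""
    return {
        "id": feed_id,
        # When True, the latest shared sample stays retrievable by Followers.
        "storeLast": store_last,
        "values": [{"label": value_label, "dataType": "decimal"}],
    }


feed = feed_definition("temperature", "reading", store_last=True)
print(feed)
```

With store_last left at False, a Follower only sees the samples shared while it is actively following the Feed.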

How to share data through a Digital Twin's data feed

Consider the following points when creating a Digital Twin Publisher:

  1. A Digital Twin can only publish data through a data feed. Therefore, a Feed and a Value must first be created for that Twin;
  2. The data to be shared needs to be encoded as base64;
  3. The data must always include a dictionary made up of the Value Label(s) as key(s) and the actual data as value(s);
  4. Consider building a code application ("Publisher Connector") to continuously retrieve data from outside IOTICS and share it via the Twin's Feed.
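
A Publisher Connector built along the lines of the points above could look like the following sketch. It is an illustration under stated assumptions, not the IOTICS implementation: `encode_sample` and `run_publisher_connector` are hypothetical names, `readings` stands in for whatever external source is polled, and `share` is any callable that sends the payload (for example a wrapper around the Share Data API call). Only the payload shape is taken from the example further down this page.

```python
import base64
import json
import time
from typing import Callable, Iterable


def encode_sample(values: dict) -> dict:
    """Base64-encode a {Value Label: data} dictionary into a share payload."""
    encoded = base64.b64encode(json.dumps(values).encode()).decode()
    return {"sample": {"data": encoded, "mime": "application/json"}}


def run_publisher_connector(
    readings: Iterable[dict],
    share: Callable[[dict], None],
    interval_seconds: float = 0.0,
) -> int:
    """Share every reading in turn and return how many were shared."""
    shared = 0
    for values in readings:
        share(encode_sample(values))
        shared += 1
        time.sleep(interval_seconds)  # pause between two shares
    return shared


# Usage with an in-memory stand-in for the Share Data API call
sent = []
count = run_publisher_connector(
    readings=[{"reading": 25.0}, {"reading": 25.5}],
    share=sent.append,
)
print(f"Shared {count} sample(s)")
```

Passing `share` in as a callable keeps the loop independent of the transport, so the same connector can be exercised without a Host.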

Prerequisites:

  1. Create your user credentials (guide);
  2. Delegate the agent so it can work on behalf of the user (guide);
  3. Create your token to interact with your Host (guide);
  4. Create headers to be added alongside the API request (guide);
  5. Create an IOTICS Digital Twin with a Feed and a Value (you can use either the Upsert Twin API alone, or the Create Twin API + Create Feed API + Update Feed API).

1. Create Twin with a Feed and a Value
2. Encode the data to share in Base64 format
3. Create the JSON payload made up of the encoded data, MIME type and timestamp
4. Use the Share Data API call
import base64
import json
from requests import request

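# HOST, host_id, twin_did, feed_id and headers are assumed to come from the prerequisite steps.
# The dictionary keys must match the Value Label(s) defined on the Feed.
data_to_share = {"reading": 25.0}
# JSON-serialise the dictionary, then Base64-encode it as text
encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()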
data_to_share_payload = {
    "sample": {
        "data": encoded_data,
        "mime": "application/json"
    }
}

response = request(
    method="POST",
    url=f"{HOST}/qapi/hosts/{host_id}/twins/{twin_did}/feeds/{feed_id}/shares",
    headers=headers,  # It includes the token
    json=data_to_share_payload,
)

response.raise_for_status()

How to follow a Digital Twin's data feed with the IOTICS API

Consider the following points when creating a Digital Twin Follower:

  1. Check whether you're allowed to access (= follow) data from a specific Digital Twin, as your access may have been restricted. For more details on how to update access permissions go to Selective Sharing for Metadata and Data. As a rule of thumb:
    • Data feeds from your own local Twins (i.e. Twins that are in the same IOTICSpace as the Twin Publisher) can always be followed;
    • Data feeds from remote Twins (i.e. Twins in a different IOTICSpace from the Twin Publisher) can only be followed if the access permissions have been enabled;
  2. Data from a Feed needs to be decoded from base64;
  3. Consider building a code application ("Follower Connector") that continuously follows data from a Twin's Feed and executes an action outside IOTICS (e.g. stores the data in a database or activates a physical device).
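
As one possible shape for such a Follower Connector, the sketch below decodes a followed sample and stores it in a database. Everything here is an assumption made for illustration: `store_sample` is a hypothetical name, SQLite (in memory) stands in for the external database, and the example timestamp is made up. The "occurredAt" and "data" fields follow the feed data structure read in the example further down this page.

```python
import base64
import json
import sqlite3


def store_sample(connection: sqlite3.Connection, feed_data: dict) -> None:
    """Decode one followed sample and insert one row per Value Label."""
    values = json.loads(base64.b64decode(feed_data["data"]))
    rows = [
        (feed_data["occurredAt"], label, json.dumps(value))
        for label, value in values.items()
    ]
    with connection:  # commits on success, rolls back on error
        connection.executemany(
            "INSERT INTO samples (occurred_at, label, value) VALUES (?, ?, ?)",
            rows,
        )


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE samples (occurred_at TEXT, label TEXT, value TEXT)")

# Hand-built sample standing in for data received from a followed Feed
example = {
    "occurredAt": "2024-01-01T00:00:00Z",
    "data": base64.b64encode(json.dumps({"reading": 25.0}).encode()).decode(),
}
store_sample(connection, example)
stored = connection.execute("SELECT occurred_at, label, value FROM samples").fetchall()
print(stored)
```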

Prerequisites:

  1. Create your user credentials (guide);
  2. Delegate the agent so it can work on behalf of the user (guide);
  3. Create your token to interact with your Host (guide);
  4. Create headers to be added alongside the API request (guide);
  5. Create an IOTICS Digital Twin (you can use either the Upsert Twin API or the Create Twin API);
  6. Search for and/or describe the Digital Twin Publisher you want to follow in order to get both its Twin DID and Feed ID (guide).

1. Create Twin
2. Get Twin DID and Feed ID of the Twin's Feed to follow
3. Call the Follow Feed API
4. Decode the data received from Base64 format
import base64
import json
from requests import request

response = request(
    method="GET",
    url=f"{HOST}/qapi/hosts/{follower_host_id}/twins/{follower_twin_did}/interests/hosts/{followed_host_id}/twins/{followed_twin_did}/feeds/{feed_id}/samples/last",
    headers=headers,
)

response.raise_for_status()

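# Parse the JSON body first: the Response object itself cannot be indexed
feed_sample = response.json()
occurred_at = feed_sample["feedData"]["occurredAt"]
data = feed_sample["feedData"]["data"]

# Reverse the Publisher's encoding: Base64 -> JSON text -> dictionary
feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(f"New message occurred at {occurred_at}: {feed_data}")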
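
The snippet above assumes the response always carries a sample. The tutorial code at the end of this page guards against a missing "feedData" entry, and the same idea can be sketched as a small helper. The name `decode_last_sample` is hypothetical, and the exact shape of a response without data is an assumption here.

```python
import base64
import json
from typing import Optional, Tuple


def decode_last_sample(body: dict) -> Optional[Tuple[Optional[str], dict]]:
    """Return (occurredAt, decoded values), or None when no sample is present."""
    feed_data = body.get("feedData")
    if not feed_data or "data" not in feed_data:
        return None
    values = json.loads(base64.b64decode(feed_data["data"]).decode("utf-8"))
    return feed_data.get("occurredAt"), values


# Hand-built bodies standing in for parsed API responses
with_data = {
    "feedData": {
        "occurredAt": "2024-01-01T00:00:00Z",
        "data": base64.b64encode(json.dumps({"reading": 25.0}).encode()).decode(),
    }
}
print(decode_last_sample(with_data))
print(decode_last_sample({}))
```

Returning None instead of raising lets the caller decide whether a missing sample is an error or simply a Feed that has not shared anything yet.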

Tutorial of the Share Data and Follow Feeds section

Complete code for sharing data from a Twin Publisher and receiving it with a Twin Follower in the same IOTICSpace

Instructions:

  1. Download this library on your local machine;
  2. Install the above library in your Python venv: pip install iotic.web.stomp-1.0.6.tar.gz;
  3. Run share_data_tutorial.py;
  4. Run follow_feed_tutorial.py in the same IOTICSpace as share_data_tutorial.py;
  5. Look at the data received in your terminal.

# share_data_tutorial.py
import base64
import json
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )

        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=10,
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()

        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def share_data(
        self, twin_did: str, host_id: str, feed_id: str, data_to_share: dict
    ):
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {"data": encoded_data, "mime": "application/json"}
        }

        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/hosts/{host_id}/twins/{twin_did}/feeds/{feed_id}/shares",
            json=data_to_share_payload,
        )


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinPublisher")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        location={"lat": 51.5, "lon": -0.1},
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Publisher", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Publisher",
                    "lang": "en",
                },
            },
        ],
        feeds=[
            {
                "id": "temperature",
                "storeLast": True,
                "properties": [
                    # Add a Label
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": "Current Temperature",
                            "lang": "en",
                        },
                    },
                    # Add a comment
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                        "langLiteralValue": {
                            "value": "The current temperature reading",
                            "lang": "en",
                        },
                    },
                ],
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "reading",
                        "unit": "http://qudt.org/vocab/unit/DEG_C",
                    }
                ],
            }
        ],
    )
    iotics_rest.share_data(
        twin_did=twin_did,
        host_id=host_id,
        feed_id="temperature",
        data_to_share={"reading": 25.5},
    )


if __name__ == "__main__":
    main()


# follow_feed_tutorial.py
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep  # used by the keep-alive loop in main()
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
STOMP_ENDPOINT = "stomp_endpoint"


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )

        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=600,
        )

        print(f"Token will expire at {datetime.now() + timedelta(seconds=600)}")

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.follow_callback
        )
        self._stomp_client.setup(token=token)

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()

        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []

        search_headers = self._headers.copy()
        # Search headers require a new header "Iotics-RequestTimeout".
        # The latter is used to stop the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )

        payload = {"responseType": response_type, "filter": {}}

        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    # This Host returned no twins; move on to the next chunk
                    continue
                # Append the twins found to the list of twins
                twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def subscribe_to_feed(
        self,
        twin_follower_did: str,
        twin_follower_host_id: str,
        twin_publisher_did: str,
        twin_publisher_host_id: str,
        feed_id: str,
    ):
        endpoint = f"/qapi/hosts/{twin_follower_host_id}/twins/{twin_follower_did}/interests/hosts/{twin_publisher_host_id}/twins/{twin_publisher_did}/feeds/{feed_id}"

        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_publisher_did}-{feed_id}",
            headers=self._headers,
        )

    @staticmethod
    def follow_callback(headers, body):
        encoded_data = json.loads(body)

        twin_publisher_did = encoded_data["interest"]["followedFeedId"]["twinId"]

        try:
            time = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))

            print(
                f"LAST MESSAGE SHARED FROM TWIN {twin_publisher_did} AT {time}: {decoded_feed_data}"
            )


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinFollower")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Follower", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Follower",
                    "lang": "en",
                },
            },
        ],
    )
    twins_list = iotics_rest.search_twins(
        properties=[
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Publisher", "lang": "en"},
            }
        ]
    )
    # In this example it is assumed the Twin Publisher to use is the first one in the list
    twin_publisher = next(iter(twins_list))
    twin_publisher_did = twin_publisher["twinId"]["id"]
    twin_publisher_host_id = twin_publisher["twinId"]["hostId"]
    twin_publisher_feed_id = twin_publisher["feeds"][0]["feedId"]["id"]
    iotics_rest.subscribe_to_feed(
        twin_follower_did=twin_did,
        twin_follower_host_id=host_id,
        twin_publisher_did=twin_publisher_did,
        twin_publisher_host_id=twin_publisher_host_id,
        feed_id=twin_publisher_feed_id,
    )
    while True:
        sleep(10)


if __name__ == "__main__":
    main()