Share Data & Follow Feeds

IOTICS is all about sharing data in real-time or as and when data becomes available. Digital Twins publish data through one or more data feeds, and they can at the same time follow one or more data feeds from one or more other Digital Twins to retrieve their data as well.

This page explains how a Digital Twin publishes (shares) data, as well as follows (receives) data from another Digital Twin.


Introduction to Sharing Data

In IOTICS, data sharing only happens through and between Digital Twins. These connections are called "brokered interactions" or "twin-to-twin interactions". A Digital Twin can publish data through one or more data feeds, as well as follow one or more data feeds from one or more other Digital Twins.

In essence, IOTICS data ecosystems and networks are created by Digital Twins exchanging data with one another.

Digital Twin types

Depending on whether a Digital Twin is publishing and/or following data, we distinguish between the following types:

  • Publisher: a Digital Twin that publishes data via one or more data feeds;
  • Follower: a Digital Twin that follows one or more data feeds from one or more Digital Twins;
  • Synthesiser: a Digital Twin that is both a Publisher and a Follower, and concurrently publishes and follows one or more data feeds.

Historical data in IOTICS

It is important to note that IOTICS is all about streaming data. We don't store historical data. Only a Digital Twin's metadata properties are stored in order to enable search.

This implies that, by default, all data shared through a data feed will be lost unless another Digital Twin follows, uses or stores it. Only the latest shared Value of a Digital Twin Publisher's Feed can be retrieved if - and only if - the Digital Twin Publisher's Feed has the storeLast parameter set to True.

How to share data through a Digital Twin's data feed

Consider the following points when creating a Digital Twin Publisher:

  1. A Digital Twin can only publish data through a data feed. Therefore, a Feed and a Value must first be created for that Twin;
  2. The data to be shared needs to be encoded as base64;
  3. The data must always include a dictionary made up of the Value Label(s) as key(s) and the actual data as value(s);
  4. Consider building a code application ("Publisher Connector") to continuously retrieve data from outside IOTICS and share it via the Twin's Feed.
Click to see the prerequisites

  1. Create your user credentials (guide);
  2. Delegate the agent so it can work on behalf of the user (guide);
  3. Create your token to interact with your Host (guide);
  4. Create headers to be added alongside the API request (guide);
  5. Create an Iotics Digital Twin with a Feed and a Value (you can either use the Upsert Twin API only or the Create Twin API + Create Feed API + Update Feed API);
1. Create Twin with a Feed and a Value
2. Encode the data to share into a Base64 format
3. Create the json payload made up of the encoded data, mime type and timestamp
4. Use the Share Data API call
import base64
import json
from datetime import datetime, timezone
from requests import request

data_to_share = {"reading": 25.0}
encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
data_to_share_payload = {
    "sample": {
        "data": encoded_data,
        "mime": "application/json",
        "occurredAt": datetime.now(tz=timezone.utc).isoformat(),
    }
}

response = request(
    method="POST",
    # Replace with:
    # - URL of the Host
    # - the ID of the Twin Publisher
    # - the Feed ID
    url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}/shares",
    headers=headers,  # It includes the token
    json=data_to_share_payload,
)

response.raise_for_status()

How to follow a Digital Twin's data feed with the IOTICS API

Consider the following points when creating a Digital Twin Follower:

  1. Check whether you're allowed to access (= follow) data from a specific Digital Twin, as your access may have been restricted. For more details on how to update access permissions go to Selective Data Sharing. As a rule of thumb:
    • Data feeds from your own local Twins (e.g. Twins that are in the same IOTICSpace as the Twin Publisher) can always be followed,
    • Data feeds from remote Twins (e.g. Twins in a different IOTICSpace than the Twin Publisher) can only be followed if the access permissions have been enabled;
  2. Data from a Feed needs to be decoded from base64;
  3. Consider building a code application ("Follower connector") to continuously follow data from a Twin's Feed and executes an action outside IOTICS (e.g. stores the data into a database or activates a physical sensor).
Click to see the prerequisites

  1. Create your user credentials (guide);
  2. Delegate the agent so it can work on behalf of the user (guide);
  3. Create your token to interact with your Host (guide);
  4. Create headers to be added alongside the API request (guide);
  5. Create an Iotics Digital Twin (you can either use the Upsert Twin API or the Create Twin API);
  6. Search and/or describe the Digital Twin Publisher you want to follow in order to get both the Twin DID and Feed ID (guide);
1. Create Twin
2. Get Twin DID and Feed ID of the Twin's Feed to follow
3. Call the Follow Feed API
4. Decode the data received from Base64 format
import base64
import json
from requests import request

response = request(
    method="GET",
    # Replace with:
    # - URL of the Host
    # - the ID of the Twin Follower
    # - the ID of the Twin Publisher that you want to follow
    # - the Feed ID to follow
    url=f"{HOST}/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_id}/samples/last",
    headers=headers,  # It includes the token
)

response.raise_for_status()

time = response["feedData"]["occurredAt"]
data = response["feedData"]["data"]

feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(f"New message occurred at {time}: {feed_data}")
import base64
import json
from requests import request

response = request(
    method="GET",
    # Replace with:
    # - URL of the Host
    # - the ID of the Twin Follower
    # - the ID of the Host where the Twin Publisher lives
    # - the ID of the Twin Publisher that you want to follow
    # - the Feed ID to follow
    url=f"{HOST}/qapi/twins/{follower_twin_id}/interests/hosts/{host_id}/twins/{followed_twin_id}/feeds/{followed_feed_id}/samples/last",
    headers=headers,  # It includes the token
)

response.raise_for_status()

time = response["feedData"]["occurredAt"]
data = response["feedData"]["data"]

feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(f"New message occurred at {time}: {feed_data}")

Tutorial of the Share Data and Follow Feeds section

Click to see the entire code on how to locally share data from a Twin Publisher and get the data from a Twin Follower

import base64
import json
import random
from datetime import datetime, timezone
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


def make_api_call(method: str, url: str, json: dict):
    response = request(method=method, url=url, headers=headers, json=json)
    response.raise_for_status()

    return response.json()


def create_twin(twin_key_name: str):
    twin_registered_id = api.create_twin_with_control_delegation(
        twin_seed=AGENT_SEED,
        twin_key_name=twin_key_name,
        agent_registered_identity=agent_registered_id,
        delegation_name="#ControlDeleg",
    )

    return twin_registered_id.did


def upsert_twin(
    twin_id: str,
    visibility: str = "PRIVATE",
    feeds: List[dict] = None,
    location: dict = None,
    properties: List[dict] = None,
):
    payload = {"twinId": twin_id, "visibility": visibility}

    if location:
        payload["location"] = location
    if feeds:
        payload["feeds"] = feeds
    if properties:
        payload["properties"] = properties

    make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)


# Set up the API with authentication delegation so the AGENT can authenticate on behalf of the USER
api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
    user_registered_id,
    agent_registered_id,
) = api.create_user_and_agent_with_auth_delegation(
    user_seed=USER_SEED,
    user_key_name=USER_KEY_NAME,
    agent_seed=AGENT_SEED,
    agent_key_name=AGENT_KEY_NAME,
    delegation_name="#AuthDeleg",
)

token = api.create_agent_auth_token(
    agent_registered_identity=agent_registered_id,
    user_did=user_registered_id.did,
    duration=600,
)

headers = {
    "accept": "application/json",
    "Iotics-ClientAppId": "example_code",
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json",
}

# Create Twin Publisher
twin_publisher_did = create_twin(twin_key_name="TwinPublisher")

print(f"Twin {twin_publisher_did} created succesfully")

feed_id = "currentTemp"

# Add Metadata, Feed and Value
upsert_twin(
    twin_id=twin_publisher_did,
    properties=[
        {
            "key": "http://www.w3.org/2000/01/rdf-schema#label",
            "langLiteralValue": {"value": "Temperature Sensor", "lang": "en"},
        },
        {
            "key": "http://www.w3.org/2000/01/rdf-schema#comment",
            "langLiteralValue": {
                "value": "A temperature sensor that shares temperature data",
                "lang": "en",
            },
        },
    ],
    feeds=[
        {
            "id": feed_id,
            "storeLast": True,
            "properties": [
                {
                    "key": "http://www.w3.org/2000/01/rdf-schema#label",
                    "langLiteralValue": {"value": "Current Temperature", "lang": "en"},
                }
            ],
            "values": [
                {
                    "comment": "Temperature in degrees Celsius",
                    "dataType": "decimal",
                    "label": "reading",
                    "unit": "http://purl.obolibrary.org/obo/UO_0000027",
                }
            ],
        }
    ],
    location={"lat": 51.5, "lon": -0.1},
)

print("Metadata, Feed and Value added to the Twin")

# Share data
data_to_share = {"reading": round(random.uniform(-10.0, 45.0), 2)}
encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
data_to_share_payload = {
    "sample": {
        "data": encoded_data,
        "mime": "application/json",
        "occurredAt": datetime.now(tz=timezone.utc).isoformat(),
    }
}
make_api_call(
    method="POST",
    url=f"{HOST}/qapi/twins/{twin_publisher_did}/feeds/{feed_id}/shares",
    json=data_to_share_payload,
)

print(f"Shared {data_to_share}")
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


def make_api_call(method: str, url: str, json: dict = None):
    response = request(method=method, url=url, headers=headers, json=json)
    response.raise_for_status()

    return response.json()


def create_twin(twin_key_name: str):
    twin_registered_id = api.create_twin_with_control_delegation(
        twin_seed=AGENT_SEED,
        twin_key_name=twin_key_name,
        agent_registered_identity=agent_registered_id,
        delegation_name="#ControlDeleg",
    )

    return twin_registered_id.did


def search_twins(
    properties: List[dict] = None,
    text: str = None,
    location: dict = None,
    scope: str = "GLOBAL",
    response_type: str = "FULL",
):
    # Add 'RequestTimeout' mandatory header parameter
    headers.update(
        {
            "Iotics-RequestTimeout": (
                datetime.now(tz=timezone.utc) + timedelta(seconds=10)
            ).isoformat(),
        }
    )

    payload = {"filter": {}, "responseType": response_type}

    if properties:
        payload["filter"]["properties"] = properties
    if text:
        payload["filter"]["text"] = text
    if location:
        payload["filter"]["location"] = location

    twins_list = []  # Initialise an empty list

    with request(
        method="POST",
        url=f"{HOST}/qapi/searches",
        headers=headers,
        json=payload,
        stream=True,
        verify=False,
        params={"scope": scope},
    ) as resp:
        # Raises HTTPError, if one occurred
        resp.raise_for_status()
        # Iterates over the response data, one line at a time
        for chunk in resp.iter_lines():
            response = json.loads(chunk)
            twins_found = []
            try:
                twins_found = response["result"]["payload"]["twins"]
                host_id = response["result"]["payload"]["remoteHostId"]["value"]
            except KeyError:
                host_id = "local"
            finally:
                if twins_found:
                    # Add the host id value to each twin
                    for twin in twins_found:
                        twin["HostId"] = host_id

                    # Append the twins found to the list of twins
                    twins_list.extend(twins_found)

    headers.pop("Iotics-RequestTimeout")  # This header parameter is no longer needed
    print(f"Found {len(twins_list)} twin(s)")

    return twins_list


# Set up the API with authentication delegation so the AGENT can authenticate on behalf of the USER
api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
    user_registered_id,
    agent_registered_id,
) = api.create_user_and_agent_with_auth_delegation(
    user_seed=USER_SEED,
    user_key_name=USER_KEY_NAME,
    agent_seed=AGENT_SEED,
    agent_key_name=AGENT_KEY_NAME,
    delegation_name="#AuthDeleg",
)

token = api.create_agent_auth_token(
    agent_registered_identity=agent_registered_id,
    user_did=user_registered_id.did,
    duration=600,
)

headers = {
    "accept": "application/json",
    "Iotics-ClientAppId": "example_code",
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json",
}

# Search for the Twin Publisher
twins_list = search_twins(
    properties=[
        {
            "key": "http://www.w3.org/2000/01/rdf-schema#label",
            "langLiteralValue": {"value": "Temperature Sensor", "lang": "en"},
        }
    ],
    scope="LOCAL",
)

twin_publisher = next(iter(twins_list))
twin_publisher_id = twin_publisher["id"]["value"]
twin_publisher_feeds = twin_publisher["feeds"]
# Get the only Feed ID of the demo Twin Publisher
twin_publisher_feed_id = next(iter(twin_publisher_feeds))["feed"]["id"]["value"]

# Create Twin Follower
twin_follower_id = create_twin(twin_key_name="TwinFollower")

print(f"Twin {twin_follower_id} created succesfully")

# Get the latest shared value
encoded_data = make_api_call(
    method="GET",
    url=f"{HOST}/qapi/twins/{twin_follower_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
)

time = encoded_data["feedData"]["occurredAt"]
data = encoded_data["feedData"]["data"]

decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))

print(f"New message occurred at {time}: {decoded_feed_data}")

Did this page help you?