Send & Receive Input Messages

Inputs allow a Twin to receive specific messages from any authorised Twin in the Network.

This page covers:


Introduction to Input Messages

By advertising an Input, an IOTICS Digital Twin allows authorised Twins to send direct messages to it. This way, Inputs can be used to send commands to change the state of the underlying asset or to send messages that can be interpreted by the Twin to implement a request/response way of interaction.

Feeds vs. Inputs

  • Inputs are similar to Feeds, they both possess properties and values to describe how the underlying data, which will be transferred, is expected to look.
  • Compared to a Feed, which allows a Twin to share data to any authorised Twin in the Network (one-to-many communications), an Input consents any authorised Twin to share data to a specific Twin (many-to-one communications).
  • Feeds are generally used to continuously import data into an IOTICSpace that can be potentially followed by any Twin in the Network (i.e.: temperature data, weather forecast update, etc.). Inputs are useful to send event-based messages to specific Twins so the latter can respond accordingly (i.e.: client/server communication) or modify the behaviour of the physical asset (i.e.: switch a light on/off).
  • Selective Data Sharing applies to both Feeds and Inputs by using the same AllowList property.
381

Feeds vs. Inputs

📘

Need a refresher on Twin Feeds?

Our previous guide Share Data & Follow Feeds can help you!

Create an Input

A Twin can have none, one or many Inputs as well as a combination of Inputs and Feeds.

How to create an Input with the IOTICS API

An Input can be created through the use of the Create Input operation. Its structure is composed of the same components as Feeds:

  • Basic Structure: it includes Input ID only;
  • Metadata: made up of a list of Properties;
  • Values: the actual payload included in the Input message which is divided into Label, Comment, Datatype and Unit.

Be aware that, like the Create Feed operation, Create Input allows to create only the Basic Structure of the Input. In order to add the remaining components (Metadata and Values) the Update Input operation must be used.

Send Input Messages

In order for a Twin A (sender) to send an Input message to another Twin B (receiver), Twin A needs to be authorised by Twin B in terms of both:

  • Visibility: Twin B needs to be found from the IOTICSpace where Twin A lives;
  • Accessibility: Twin A's IOTICSpace needs to be authorised to share data with Twin B.

How to send Input Messages with the IOTICS API

1. Encode the data to be sent using Base64 and return a bytes object
2. Compose the payload dictionary with `data`, `mime` and `occurredAt` values
3. Use the Send Input message operation
import base64
import json
from datetime import datetime, timezone
from requests import request

message = {"light_on": False}

encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
payload = {
    "message": {
        "data": encoded_data,
        "mime": "application/json"
    }
}

# If the Twin Receiver is in a remote Host
send_input_url = f"{HOST}/qapi/hosts/{twin_sender_host_id}/twins/{twin_sender_did}/interests/hosts/{twin_receiver_host_id}/twins/{twin_receiver_did}/inputs/{input_id}/messages"

response = request(
    method="POST",
    url=send_input_url
    headers=headers,
    json=payload
)

response.raise_for_status()

Tutorial of Twin Input Receiver & Twin Input Sender

The following two tutorials show the creation of:

  • a Twin sender sending Input messages (light sensor readings) to a local Twin with an Input;
  • a Twin receiver with an Input that continuously waits for incoming messages via STOMP and prints every message it receives on screen.
Click to see the tutorial with REST and STOMP

Instructions:

  1. Download this library on your local machine;
  2. Install the above library in your Python venv: pip install iotic.web.stomp-1.0.6.tar.gz;
  3. Run the input_receiver.py in a terminal;
  4. Run the input_sender.py code below from another terminal and use the same IOTICSpace as the Twin receiver;
  5. Watch the messages being sent from the Twin sender terminal and received from the Twin receiver terminal.
import base64
import json
from datetime import datetime, timezone
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"
STOMP_ENDPOINT = "stomp_endpoint"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )

        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=600,
        )

        print(f"Token will expire at {datetime.now() + timedelta(seconds=600)}")

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.receive_callback
        )
        self._stomp_client.setup(token=token)

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()

        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def wait_for_input_messages(
        self, twin_receiver_did: str, twin_receiver_host_id: str, input_id: str
    ):
        endpoint = f"/qapi/hosts/{twin_receiver_host_id}/twins/{twin_receiver_did}/inputs/{input_id}"

        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_receiver_host_id}-{input_id}",
            headers=self._headers,
        )

    @staticmethod
    def receive_callback(headers, body):
        encoded_data = json.loads(body)

        try:
            time = encoded_data["message"]["occurredAt"]
            data = encoded_data["message"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))
            print(f"LAST INPUT MESSAGE RECEIVED AT {time}: {decoded_feed_data}")


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinReceiver")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Receiver", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Receiver",
                    "lang": "en",
                },
            },
        ],
        inputs=[
            {
                "id": "on_off_switch",
                "properties": [
                    # Add a Label
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {"value": "On/Off Switch", "lang": "en"},
                    },
                    # Add a comment
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                        "langLiteralValue": {
                            "value": "Allows the light to turn ON/OFF",
                            "lang": "en",
                        },
                    },
                ],
                "values": [
                    {
                        "comment": "Switch of the sensor",
                        "dataType": "boolean",
                        "label": "light_on",
                    }
                ],
            }
        ],
    )

    iotics_rest.wait_for_input_messages(
        twin_receiver_did=twin_did,
        twin_receiver_host_id=host_id,
        input_id="on_off_switch",
    )
    while True:
        sleep(10)


if __name__ == "__main__":
    main()
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )

        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=60,
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()

        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")

        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []

        search_headers = self._headers.copy()
        # Search headers require a new header "Iotics-RequestTimeout".
        # The latter is used to stop the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )

        payload = {"responseType": response_type, "filter": {}}

        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                response = json.loads(chunk)
                twins_found = []
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue
                finally:
                    if twins_found:
                        # Append the twins found to the list of twins
                        twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def send_input_message(
        self,
        twin_sender_did: str,
        twin_sender_host_id: str,
        twin_receiver_did: str,
        twin_receiver_host_id: str,
        input_id: str,
        message: str,
    ):
        encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
        payload = {"message": {"data": encoded_data, "mime": "application/json"}}

        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/hosts/{twin_sender_host_id}/twins/{twin_sender_did}/interests/hosts/{twin_receiver_host_id}/twins/{twin_receiver_did}/inputs/{input_id}/messages",
            json=payload,
        )


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinSender")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Sender", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Sender",
                    "lang": "en",
                },
            },
        ],
    )
    twins_list = iotics_rest.search_twins(
        properties=[
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Receiver", "lang": "en"},
            }
        ]
    )

    # In this example it is assumed the Twin Receiver to use is the first one in the list
    twin_receiver = next(iter(twins_list))
    twin_receiver_did = twin_receiver["twinId"]["id"]
    twin_receiver_host_id = twin_receiver["twinId"]["hostId"]
    twin_receiver_input_id = twin_receiver["inputs"][0]["inputId"]["id"]
    iotics_rest.send_input_message(
        twin_sender_did=twin_did,
        twin_sender_host_id=host_id,
        twin_receiver_did=twin_receiver_did,
        twin_receiver_host_id=twin_receiver_host_id,
        input_id=twin_receiver_input_id,
        message={"light_on": False},
    )


if __name__ == "__main__":
    main()