Send & Receive Input Messages

Inputs allow a Twin to receive specific messages from any authorised Twin in the Network.

Introduction to Input Messages

By advertising an Input, an IOTICS Digital Twin allows authorised Twins to send direct messages to it. Inputs can therefore be used to send commands that change the state of the underlying asset, or to send messages that the Twin interprets in order to implement a request/response style of interaction.

Feeds vs. Inputs

  • Inputs are similar to Feeds: both have properties and values that describe how the data to be transferred is expected to look.
  • A Feed allows a Twin to share data with any authorised Twin in the Network (one-to-many communication), whereas an Input allows any authorised Twin to send data to a specific Twin (many-to-one communication).
  • Feeds are generally used to continuously import data into an IOTICSpace that any Twin in the Network can potentially follow (e.g. temperature data or weather forecast updates). Inputs are useful for sending event-based messages to specific Twins so that the latter can respond accordingly (e.g. client/server communication) or modify the behaviour of the physical asset (e.g. switching a light on or off).
  • Selective Data Sharing applies to both Feeds and Inputs by using the same AllowList property.

📘

Need a refresher on Twin Feeds?

Our previous guide Share Data & Follow Feeds can help you!

Create an Input

A Twin can have none, one or many Inputs as well as a combination of Inputs and Feeds.

How to create an Input with the IOTICS API

Unlike a Feed, which can be created either through the dedicated Create Feed operation or via the Upsert Twin operation, an Input can only be added to a Twin through the Upsert Twin operation.

from requests import request

response = request(
    method="PUT",
    url=f"{HOST}/qapi/twins",  # Replace with URL of the Host
    headers=headers,  # It includes the token
    json={
        "twinId": twin_id,  # Replace with the ID of the Twin to upsert
        "inputs": [
            {
                "values": [
                    {
                        "comment": "Temperature value receiver",
                        "dataType": "integer",
                        "label": "temperature",
                    }
                ],
                "id": "input_id",
            }
        ],
    },
)

response.raise_for_status()
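
The `inputs` entry of the Upsert Twin payload can also be assembled with a small helper before the request is sent. The following is a minimal sketch, assuming the payload shape used in the request above (an `id` plus a list of `values`, each with `label`, `dataType` and `comment`). The helper names `build_input_value` and `build_input` and the Twin ID `did:iotics:example` are hypothetical and not part of the IOTICS API.

```python
from typing import Dict, List


def build_input_value(label: str, data_type: str, comment: str = "") -> Dict[str, str]:
    """Describe one value carried by an Input (hypothetical helper)."""
    return {"comment": comment, "dataType": data_type, "label": label}


def build_input(input_id: str, values: List[Dict[str, str]]) -> dict:
    """Build one entry for the "inputs" list of the Upsert Twin payload."""
    if not input_id:
        raise ValueError("input_id must not be empty")
    return {"id": input_id, "values": values}


# Same Input as in the request above, built with the helpers
temperature_input = build_input(
    input_id="input_id",
    values=[build_input_value("temperature", "integer", "Temperature value receiver")],
)

# "did:iotics:example" is a placeholder: replace it with the ID of the Twin to upsert
upsert_payload = {"twinId": "did:iotics:example", "inputs": [temperature_input]}
```

A Twin with several Inputs would simply carry several such entries in the `inputs` list.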

Send Input Messages

In order for a Twin A (sender) to send an Input message to another Twin B (receiver), Twin A needs to be authorised by Twin B in terms of both:

  • Visibility: Twin B needs to be found from the IOTICSpace where Twin A lives;
  • Accessibility: Twin A's IOTICSpace needs to be authorised to share data with Twin B.

How to send Input Messages with the IOTICS API

1. Serialise the data to JSON, encode it with Base64 and decode the result to a string
2. Compose the payload dictionary with `data`, `mime` and `occurredAt` values
3. Use the Send Input message operation
import base64
import json
from datetime import datetime, timezone
from requests import request

message = {"temperature": 25}

encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
payload = {
    "message": {
        "data": encoded_data,
        "mime": "application/json",
        "occurredAt": datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
    }
}

# If the Twin Receiver is in a remote Host
send_input_url = f"{HOST}/qapi/twins/{sender_twin_id}/interests/hosts/{host_id}/twins/{receiver_twin_id}/inputs/{input_id}/messages"

# If the Twin Receiver is in the local Host
if host_id == "local":
    send_input_url = f"{HOST}/qapi/twins/{sender_twin_id}/interests/twins/{receiver_twin_id}/inputs/{input_id}/messages"

response = request(
    method="POST",
    url=send_input_url,
    headers=headers,  # It includes the token
    json=payload
)

response.raise_for_status()
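
The encoding step and the choice between the local and the remote URL can be wrapped in small functions, which makes them easy to check without contacting a Host. This is a minimal sketch, assuming the payload shape and URL patterns shown above. The function names are hypothetical, and `decode_input_payload` only illustrates the reverse operation a receiver would apply.

```python
import base64
import json
from datetime import datetime, timezone


def build_input_payload(message: dict) -> dict:
    """Wrap a JSON-serialisable message in the payload shape used above."""
    encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
    return {
        "message": {
            "data": encoded_data,
            "mime": "application/json",
            "occurredAt": datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
        }
    }


def decode_input_payload(payload: dict) -> dict:
    """Reverse of build_input_payload: Base64-decode, then parse the JSON."""
    return json.loads(base64.b64decode(payload["message"]["data"]).decode())


def build_send_input_url(
    host: str,
    sender_twin_id: str,
    receiver_twin_id: str,
    input_id: str,
    host_id: str = "local",
) -> str:
    """Return the Send Input message URL for a local or a remote receiver."""
    base = f"{host}/qapi/twins/{sender_twin_id}/interests"
    if host_id != "local":
        # The receiver lives in a remote Host: add the Host segment
        base = f"{base}/hosts/{host_id}"
    return f"{base}/twins/{receiver_twin_id}/inputs/{input_id}/messages"


payload = build_input_payload({"temperature": 25})
print(decode_input_payload(payload))
```

Keeping the default `host_id="local"` mirrors the check in the snippet above, so callers only pass a Host ID when the receiver is remote.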

Tutorial of Twin Input Receiver & Twin Input Sender

The following two tutorials show the creation of:

  • a Twin sender sending Input messages (light sensor readings) to a local Twin with an Input;
  • a Twin receiver with an Input that continuously waits for incoming messages via STOMP and prints every message it receives on screen.

Instructions:

  1. Download this library on your local machine;
  2. Install the above library in your Python venv: pip install iotic.web.stomp-1.0.6.tar.gz;
  3. Run the input_receiver.py in a terminal;
  4. Run the input_sender.py code below from another terminal and use the same IOTICSpace as the Twin receiver;
  5. Watch the messages being sent from the Twin sender terminal and received from the Twin receiver terminal.

# input_receiver.py
import base64
import json
from datetime import datetime, timezone
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        token: str,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._token = token
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=False)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )

        self._stomp_client.connect(wait=True, passcode=self._token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class InputReceiver:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._stomp_client = None
        self._light_on = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "input_receiver_stomp",
            "Content-Type": "application/json",
        }
        token = self._refresh_token()
        stomp_endpoint = request(method="GET", url=f"{HOST}/index.json").json()["stomp"]
        self._stomp_client = StompClient(
            endpoint=stomp_endpoint,
            callback=self._receive_callback,
            token=token,
        )
        self._stomp_client.setup()
        self._light_on = False

    def _refresh_token(self, duration: int = 600):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            # Re-raise: there is no valid response to return at this point
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            inputs=inputs,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_input(self, twin_id: str, input_id: str):
        input_description = self._make_api_call(
            method="GET",
            url=f"{HOST}/qapi/twins/{twin_id}/inputs/{input_id}",
        )

        return input_description

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Input Receiver Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A Twin that receives input messages",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]

        input_id = "light"

        input_list = [
            {
                "id": input_id,
                "properties": [
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {"value": "Light Input", "lang": "en"},
                    }
                ],
                "values": [
                    {
                        "comment": "Whether to switch ON or OFF the light",
                        "dataType": "boolean",
                        "label": "light_on",
                    }
                ],
            }
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="InputReceiverModel",
            properties=property_list,
            inputs=input_list,
        )

        return model_twin_did, input_id

    def create_twin_from_model(self, twin_model_id: str):
        print("Creating Twin from Model")
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Replace the Label property of the Twin Model
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                prop["langLiteralValue"]["value"] = "Light Bulb"
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        # Build a list of Inputs
        input_list = []

        # Scan the Twin Model's input list to get the Twin Model's Input IDs.
        # Then use the Input Describe API to get the Inputs' metadata
        for twin_input in twin_model["result"]["inputs"]:
            input_id = twin_input["inputId"]["value"]

            # Use the Input Describe API
            input_description = self._describe_input(
                twin_id=twin_model_id, input_id=input_id
            )
            input_properties = input_description["result"]["properties"]
            input_values = input_description["result"]["values"]

            # Append to the list all the Input's metadata
            input_list.append(
                {
                    "id": input_id,
                    "properties": input_properties,
                    "values": input_values,
                }
            )

        input_twin_id = self._create_new_twin(
            twin_key_name="BulbReceiver",
            properties=property_list,
            inputs=input_list,
            location={"lat": 51.5, "lon": -0.1},
        )

        return input_twin_id

    def _receive_callback(self, headers, body):
        payload = json.loads(body)

        received_data = payload["message"]["data"]
        time = payload["message"]["occurredAt"]
        input_id = payload["input"]["id"]["value"]

        decoded_data = json.loads(base64.b64decode(received_data).decode("ascii"))

        print(f"Received message {decoded_data} from input {input_id} at {time}")

        if decoded_data["light_on"] and not self._light_on:
            print("Switching light ON")
            self._light_on = True
        elif not decoded_data["light_on"] and self._light_on:
            print("Switching light OFF")
            self._light_on = False

    def wait_for_input_messages(self, twin_id: str, input_id: str):
        input_receiver_url = f"/qapi/twins/{twin_id}/inputs/{input_id}"

        self._stomp_client.subscribe(
            destination=input_receiver_url,
            subscription_id=twin_id,
            headers=self._headers,
        )

        print("Waiting for input messages...")


def main():
    input_connector = InputReceiver()
    input_connector.setup()
    model_twin_did, input_id = input_connector.create_twin_model()
    input_twin_id = input_connector.create_twin_from_model(twin_model_id=model_twin_did)
    input_connector.wait_for_input_messages(twin_id=input_twin_id, input_id=input_id)

    while True:
        sleep(10)


if __name__ == "__main__":
    main()
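
The logic in `_receive_callback` can be exercised without a STOMP connection. The sketch below assumes the message body has the fields the receiver reads above (`message.data`, `message.occurredAt` and `input.id.value`). `parse_input_message` and `next_light_state` are hypothetical helpers, and the sample body is hand-written rather than captured from a real Host.

```python
import base64
import json
from typing import Optional, Tuple


def parse_input_message(body: str) -> Tuple[dict, str, str]:
    """Return (decoded data, input ID, occurredAt) from a message body."""
    payload = json.loads(body)
    data = json.loads(base64.b64decode(payload["message"]["data"]).decode())
    return data, payload["input"]["id"]["value"], payload["message"]["occurredAt"]


def next_light_state(current: bool, requested: bool) -> Tuple[bool, Optional[str]]:
    """Return the new state and the action to log, or None if nothing changes."""
    if requested and not current:
        return True, "Switching light ON"
    if not requested and current:
        return False, "Switching light OFF"
    return current, None


# Hand-written sample body (assumption: mirrors the fields used by the receiver)
sample_body = json.dumps(
    {
        "input": {"id": {"value": "light"}},
        "message": {
            "data": base64.b64encode(json.dumps({"light_on": True}).encode()).decode(),
            "mime": "application/json",
            "occurredAt": "2024-01-01T00:00:00+00:00",
        },
    }
)

data, input_id, occurred_at = parse_input_message(sample_body)
state, action = next_light_state(current=False, requested=data["light_on"])
print(input_id, occurred_at, state, action)
```

Separating parsing from the state transition means a repeated request for the state the light is already in produces no action.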


# input_sender.py
import base64
import json
import random
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"

USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class InputSender:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )

        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "input_sender",
            "Content-Type": "application/json",
        }
        self._refresh_token()

    def _refresh_token(self, duration: int = 600):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )

        self._headers["Authorization"] = f"Bearer {token}"

        return token

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )

        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)

            print("Retried once. Still getting error", ex)
            # Re-raise: there is no valid response to return at this point
            raise

        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)

        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            inputs=inputs,
            visibility=visibility,
            location=location,
        )

        return twin_did

    def _describe_input(self, twin_id: str, input_id: str):
        input_description = self._make_api_call(
            method="GET",
            url=f"{HOST}/qapi/twins/{twin_id}/inputs/{input_id}",
        )

        return input_description

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )

        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}

        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties

        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

        print(f"Twin {twin_id} created successfully")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Input Sender Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A Twin that sends input messages",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add AllowList property
            {
                "key": "http://data.iotics.com/public#hostAllowList",
                "uriValue": {"value": "http://data.iotics.com/public#allHosts"},
            },
        ]

        model_twin_did = self._create_new_twin(
            twin_key_name="InputSenderModel", properties=property_list
        )

        return model_twin_did

    def create_twin_from_model(self, twin_model_id: str):
        print("Creating Twin from Model")
        twin_model = self._describe_local_twin(twin_id=twin_model_id)

        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]

        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Replace the Label property of the Twin Model
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                prop["langLiteralValue"]["value"] = "Input Sensor Sender"
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()

            property_list.append(prop)

        input_twin_id = self._create_new_twin(
            twin_key_name="InputSender",
            properties=property_list,
            location={"lat": 51.5, "lon": -0.1},
        )

        return input_twin_id

    def search_for_twins(self, scope: str = "LOCAL"):
        print("Searching for Twins")
        # Initialise an empty list.
        # It will contain the list of Twins retrieved by the search
        twins_list = []

        # Add a new temporary field in the headers.
        # Client request timeout is used to stop the request processing once the timeout is reached
        headers = self._headers.copy()
        headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=10)
                ).isoformat(),
            }
        )

        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json={
                "responseType": "FULL",
                "filter": {
                    "properties": [
                        {
                            "key": "https://data.iotics.com/app#createdFrom",
                            "uriValue": {
                                "value": "https://data.iotics.com/app#ByModel"
                            },
                        },
                    ],
                    "text": "Light",
                },
            },
        ) as resp:
            # Raises HTTPError, if one occurred
            resp.raise_for_status()
            # Iterates over the response data, one line at a time
            for chunk in resp.iter_lines():
                # Skip blank keep-alive lines
                if not chunk:
                    continue
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue
                # Append the twins found to the list of twins
                twins_list.extend(twins_found)

        print(f"Found {len(twins_list)} twin(s)")

        return twins_list

    def get_input_detail(self, twins_list):
        for twin in twins_list:
            twin_id = twin["id"]["value"]
            inputs_list = twin["inputs"]

            if inputs_list:
                input_of_interest = inputs_list[0]["input"]["id"]["value"]
                print(f"Found Twin {twin_id} with Input {input_of_interest}")
                return twin_id, input_of_interest

        # No Twin in the search results advertises an Input
        raise ValueError("No Twin with an Input was found")

    def send_input_message(
        self, sender_twin_id: str, receiver_twin_id: str, input_id: str, message: dict
    ):
        encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
        payload = {
            "message": {
                "data": encoded_data,
                "mime": "application/json",
                "occurredAt": datetime.now(tz=timezone.utc).isoformat(
                    timespec="seconds"
                ),
            }
        }

        print(f"Sending message {message}")

        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/twins/{sender_twin_id}/interests/twins/{receiver_twin_id}/inputs/{input_id}/messages",
            json=payload,
        )


def get_sensor_data():
    sensor_light_reading = random.randint(1, 10)

    message = {"light_on": False}

    if sensor_light_reading < 6:
        message = {"light_on": True}

    return message


def main():
    input_connector = InputSender()
    input_connector.setup()
    model_twin_did = input_connector.create_twin_model()
    input_twin_id = input_connector.create_twin_from_model(twin_model_id=model_twin_did)
    twins_list = input_connector.search_for_twins()
    receiver_twin_id, input_id = input_connector.get_input_detail(twins_list)

    while True:
        message = get_sensor_data()
        input_connector.send_input_message(
            sender_twin_id=input_twin_id,
            receiver_twin_id=receiver_twin_id,
            input_id=input_id,
            message=message,
        )
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()
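
The step that picks a receiver out of the search results can be checked offline as well. The following sketch assumes each streamed line has the shape the sender reads above (`result.payload.twins`, with `id.value` and `inputs[0].input.id.value` on each Twin). `first_twin_with_input`, the sample lines and the Twin IDs are hypothetical and hand-written, not real search output.

```python
import json
from typing import Iterable, Optional, Tuple


def first_twin_with_input(lines: Iterable[bytes]) -> Optional[Tuple[str, str]]:
    """Return (twin ID, input ID) for the first Twin advertising an Input."""
    for line in lines:
        if not line:  # skip blank keep-alive lines
            continue
        response = json.loads(line)
        twins = response.get("result", {}).get("payload", {}).get("twins", [])
        for twin in twins:
            inputs = twin.get("inputs") or []
            if inputs:
                return twin["id"]["value"], inputs[0]["input"]["id"]["value"]
    return None


# Hand-written sample lines (assumption: mirrors the fields used by the sender)
sample_lines = [
    b"",
    json.dumps({"result": {"payload": {"twins": []}}}).encode(),
    json.dumps(
        {
            "result": {
                "payload": {
                    "twins": [
                        {"id": {"value": "did:iotics:twinA"}, "inputs": []},
                        {
                            "id": {"value": "did:iotics:twinB"},
                            "inputs": [{"input": {"id": {"value": "light"}}}],
                        },
                    ]
                }
            }
        }
    ).encode(),
]

print(first_twin_with_input(sample_lines))
```

Returning `None` when nothing matches lets the caller decide whether to retry the search or stop, instead of failing on an unassigned variable.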
