Send & Receive Input Messages
Inputs allow a Twin to receive specific messages from any authorised Twin in the Network.
This page covers:
- Introduction to Input Messages
- Feeds vs. Inputs
- Create an Input
- How to create an Input with the IOTICS API
- Send Input Messages
- How to send Input Messages with the IOTICS API
- Tutorial of Twin Input Receiver & Twin Input Sender
Introduction to Input Messages
By advertising an Input, an IOTICS Digital Twin allows authorised Twins to send direct messages to it. Inputs can therefore be used to send commands that change the state of the underlying asset, or to send messages that the Twin interprets in order to implement a request/response style of interaction.
Feeds vs. Inputs
- Inputs are similar to Feeds: both have properties and values that describe how the data to be transferred is expected to look.
- A Feed allows a Twin to share data with any authorised Twin in the Network (one-to-many communication), whereas an Input allows any authorised Twin to send data to a specific Twin (many-to-one communication).
- Feeds are generally used to continuously import data into an IOTICSpace, where it can potentially be followed by any Twin in the Network (e.g. temperature data, weather forecast updates). Inputs are useful for sending event-based messages to specific Twins so that they can respond accordingly (e.g. client/server communication) or modify the behaviour of the physical asset (e.g. switch a light on or off).
- Selective Data Sharing applies to both Feeds and Inputs by using the same AllowList property.
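As a hedged illustration of the last point, the sketch below adds the AllowList property to an Upsert Twin payload. The property key and the `allHosts` value are the ones used in the Twin sender tutorial further down this page. The helper name `with_host_allow_list` and the Twin ID `did:iotics:example` are hypothetical placeholders, and the helper only edits a dictionary; it does not call the IOTICS API.

```python
from typing import Dict, List

# Key and value copied from the Twin sender tutorial on this page.
HOST_ALLOW_LIST_KEY = "http://data.iotics.com/public#hostAllowList"
ALL_HOSTS = "http://data.iotics.com/public#allHosts"


def with_host_allow_list(payload: Dict, allowed: str = ALL_HOSTS) -> Dict:
    """Return a copy of an Upsert Twin payload with the AllowList property set.

    Hypothetical helper: for simplicity this sketch keeps a single
    AllowList entry and replaces any previous one.
    """
    properties: List[Dict] = [
        prop
        for prop in payload.get("properties", [])
        if prop["key"] != HOST_ALLOW_LIST_KEY  # drop any previous AllowList entry
    ]
    properties.append({"key": HOST_ALLOW_LIST_KEY, "uriValue": {"value": allowed}})
    return {**payload, "properties": properties}


# "did:iotics:example" is a placeholder, not a real Twin ID.
upsert_payload = with_host_allow_list({"twinId": "did:iotics:example"})
print(upsert_payload["properties"])
```

Because the same property sits on the Twin, the resulting payload would apply to the Twin's Feeds and Inputs alike.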
Need a refresher on Twin Feeds?
Our previous guide Share Data & Follow Feeds can help you!
Create an Input
A Twin can have none, one or many Inputs as well as a combination of Inputs and Feeds.
How to create an Input with the IOTICS API
Unlike a Feed, which can be created either through the dedicated Create Feed operation or through Upsert Twin, an Input can only be added to a Twin through the Upsert Twin operation.
from requests import request

# HOST, headers and twin_id are assumed to be defined beforehand
response = request(
    method="PUT",
    url=f"{HOST}/qapi/twins",  # Replace HOST with the URL of the Host
    headers=headers,  # It includes the token
    json={
        "twinId": twin_id,  # Replace with the ID of the Twin to upsert
        "inputs": [
            {
                "values": [
                    {
                        "comment": "Temperature value receiver",
                        "dataType": "integer",
                        "label": "temperature",
                    }
                ],
                "id": "input_id",
            }
        ],
    },
)
response.raise_for_status()
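Since a Twin can have several Inputs, it may help to build the `inputs` list separately before passing it to Upsert Twin. The following is a minimal sketch under stated assumptions: `build_input` is a hypothetical helper, the rule that every value needs a `label` and a `dataType` is an assumption made for this example rather than a documented API constraint, and `did:iotics:example` is a placeholder Twin ID.

```python
from typing import Dict, List


def build_input(input_id: str, values: List[Dict[str, str]]) -> Dict:
    """Build one entry of the "inputs" list of an Upsert Twin payload."""
    required = {"label", "dataType"}  # assumption made for this sketch
    for value in values:
        missing = required - set(value)
        if missing:
            raise ValueError(f"Input value is missing {sorted(missing)}")
    return {"id": input_id, "values": values}


inputs = [
    build_input(
        "temperature_input",
        [
            {
                "comment": "Temperature value receiver",
                "dataType": "integer",
                "label": "temperature",
            }
        ],
    ),
    build_input(
        "light",
        [
            {
                "comment": "Whether to switch ON or OFF the light",
                "dataType": "boolean",
                "label": "light_on",
            }
        ],
    ),
]

# Body for the PUT {HOST}/qapi/twins request shown above (placeholder Twin ID).
payload = {"twinId": "did:iotics:example", "inputs": inputs}
print([item["id"] for item in payload["inputs"]])
```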
Send Input Messages
In order for Twin A (the sender) to send an Input message to Twin B (the receiver), Twin A needs to be authorised by Twin B in terms of both:
- Visibility: Twin B needs to be findable from the IOTICSpace where Twin A lives;
- Accessibility: Twin A's IOTICSpace needs to be authorised to share data with Twin B.
How to send Input Messages with the IOTICS API
1. Serialise the data to be sent as JSON, encode it using Base64 and decode the resulting bytes object to a string
2. Compose the payload dictionary with `data`, `mime` and `occurredAt` values
3. Use the Send Input message operation
import base64
import json
from datetime import datetime, timezone

from requests import request

# HOST, headers, host_id, sender_twin_id, receiver_twin_id and input_id
# are assumed to be defined beforehand
message = {"temperature": 25}
encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
payload = {
    "message": {
        "data": encoded_data,
        "mime": "application/json",
        "occurredAt": datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
    }
}

# If the Twin Receiver is in a remote Host
send_input_url = f"{HOST}/qapi/twins/{sender_twin_id}/interests/hosts/{host_id}/twins/{receiver_twin_id}/inputs/{input_id}/messages"
# If the Twin Receiver is in the local Host
if host_id == "local":
    send_input_url = f"{HOST}/qapi/twins/{sender_twin_id}/interests/twins/{receiver_twin_id}/inputs/{input_id}/messages"

response = request(
    method="POST",
    url=send_input_url,
    headers=headers,  # It includes the token
    json=payload,
)
response.raise_for_status()
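The three steps above can also be wrapped into small reusable functions. The sketch below is one possible arrangement, not part of the IOTICS API: the function names are hypothetical, the URL shapes are copied from the snippet above, and `https://host.example` is a placeholder Host. It also shows the decoding a receiver would apply, so the encoding can be checked with a round trip without sending anything.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_send_input_url(
    host: str,
    sender_twin_id: str,
    receiver_twin_id: str,
    input_id: str,
    remote_host_id: Optional[str] = None,
) -> str:
    """Return the Send Input message URL for a local or a remote receiver."""
    base = f"{host}/qapi/twins/{sender_twin_id}/interests"
    if remote_host_id is not None:
        base += f"/hosts/{remote_host_id}"  # only for a receiver in a remote Host
    return f"{base}/twins/{receiver_twin_id}/inputs/{input_id}/messages"


def encode_message(values: Dict[str, Any]) -> Dict:
    """Compose the payload with the data, mime and occurredAt values."""
    data = base64.b64encode(json.dumps(values).encode()).decode()
    return {
        "message": {
            "data": data,
            "mime": "application/json",
            "occurredAt": datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
        }
    }


def decode_message(payload: Dict) -> Dict[str, Any]:
    """Reverse of encode_message: Base64-decode and parse the JSON data."""
    return json.loads(base64.b64decode(payload["message"]["data"]))


payload = encode_message({"temperature": 25})
print(build_send_input_url("https://host.example", "sender", "receiver", "input_id"))
print(decode_message(payload))
```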
Tutorial of Twin Input Receiver & Twin Input Sender
The following two tutorials show the creation of:
- a Twin sender sending Input messages (light sensor readings) to a local Twin with an Input;
- a Twin receiver with an Input that continuously waits for incoming messages via STOMP and prints every message it receives on screen.
Tutorial with REST and STOMP
Instructions:
- Download this library on your local machine;
- Install the above library in your Python venv: `pip install iotic.web.stomp-1.0.6.tar.gz`;
- Run the `input_receiver.py` code below in a terminal;
- Run the `input_sender.py` code below from another terminal and use the same IOTICSpace as the Twin receiver;
- Watch the messages being sent from the Twin sender terminal and received from the Twin receiver terminal.
# input_receiver.py
import base64
import json
from datetime import datetime, timezone
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

# Replace the placeholders below with your own values.
# The seeds must be hexadecimal strings.
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        token: str,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._token = token
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=False)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )
        self._stomp_client.connect(wait=True, passcode=self._token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class InputReceiver:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None
        self._stomp_client = None
        self._light_on = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )
        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "input_receiver_stomp",
            "Content-Type": "application/json",
        }
        token = self._refresh_token()
        stomp_endpoint = request(method="GET", url=f"{HOST}/index.json").json()["stomp"]
        self._stomp_client = StompClient(
            endpoint=stomp_endpoint,
            callback=self._receive_callback,
            token=token,
        )
        self._stomp_client.setup()
        self._light_on = False

    def _refresh_token(self, duration: int = 600):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )
        self._headers["Authorization"] = f"Bearer {token}"
        return token

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )
        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)
            print("Retried once. Still getting error", ex)
            # Re-raise: there may be no usable response to return at this point
            raise
        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)
        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            inputs=inputs,
            visibility=visibility,
            location=location,
        )
        return twin_did

    def _describe_input(self, twin_id: str, input_id: str):
        input_description = self._make_api_call(
            method="GET",
            url=f"{HOST}/qapi/twins/{twin_id}/inputs/{input_id}",
        )
        return input_description

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )
        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}
        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties
        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
        print(f"Twin {twin_id} created successfully")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Input Receiver Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A Twin that receives input messages",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
        ]
        input_id = "light"
        input_list = [
            {
                "id": input_id,
                "properties": [
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {"value": "Light Input", "lang": "en"},
                    }
                ],
                "values": [
                    {
                        "comment": "Whether to switch ON or OFF the light",
                        "dataType": "boolean",
                        "label": "light_on",
                    }
                ],
            }
        ]
        model_twin_did = self._create_new_twin(
            twin_key_name="InputReceiverModel",
            properties=property_list,
            inputs=input_list,
        )
        return model_twin_did, input_id

    def create_twin_from_model(self, twin_model_id: str):
        print("Creating Twin from Model")
        twin_model = self._describe_local_twin(twin_id=twin_model_id)
        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]
        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Replace the Label property of the Twin Model
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                prop["langLiteralValue"]["value"] = "Light Bulb"
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            property_list.append(prop)
        # Build a list of Inputs
        input_list = []
        # Scan the Twin Model's input list to get the Twin Model's Input IDs.
        # Then use the Input Describe API to get the Inputs' metadata
        for twin_input in twin_model["result"]["inputs"]:
            input_id = twin_input["inputId"]["value"]
            # Use the Input Describe API
            input_description = self._describe_input(
                twin_id=twin_model_id, input_id=input_id
            )
            input_properties = input_description["result"]["properties"]
            input_values = input_description["result"]["values"]
            # Append to the list all the Input's metadata
            input_list.append(
                {
                    "id": input_id,
                    "properties": input_properties,
                    "values": input_values,
                }
            )
        input_twin_id = self._create_new_twin(
            twin_key_name="BulbReceiver",
            properties=property_list,
            inputs=input_list,
            location={"lat": 51.5, "lon": -0.1},
        )
        return input_twin_id

    def _receive_callback(self, headers, body):
        payload = json.loads(body)
        received_data = payload["message"]["data"]
        occurred_at = payload["message"]["occurredAt"]
        input_id = payload["input"]["id"]["value"]
        decoded_data = json.loads(base64.b64decode(received_data).decode("ascii"))
        print(f"Received message {decoded_data} from input {input_id} at {occurred_at}")
        # Only act when the requested state differs from the current one
        if decoded_data["light_on"] and not self._light_on:
            print("Switching light ON")
            self._light_on = True
        elif not decoded_data["light_on"] and self._light_on:
            print("Switching light OFF")
            self._light_on = False

    def wait_for_input_messages(self, twin_id: str, input_id: str):
        input_receiver_url = f"/qapi/twins/{twin_id}/inputs/{input_id}"
        self._stomp_client.subscribe(
            destination=input_receiver_url,
            subscription_id=twin_id,
            headers=self._headers,
        )
        print("Waiting for input messages...")


def main():
    input_connector = InputReceiver()
    input_connector.setup()
    model_twin_did, input_id = input_connector.create_twin_model()
    input_twin_id = input_connector.create_twin_from_model(twin_model_id=model_twin_did)
    input_connector.wait_for_input_messages(twin_id=input_twin_id, input_id=input_id)
    # Keep the process alive while the STOMP listener handles incoming messages
    while True:
        sleep(10)


if __name__ == "__main__":
    main()

# input_sender.py
import base64
import json
import random
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

# Replace the placeholders below with your own values.
# The seeds must be hexadecimal strings.
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class InputSender:
    def __init__(self):
        self._api = None
        self._user_registered_id = None
        self._agent_registered_id = None
        self._headers = None

    def setup(self):
        self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )
        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "input_sender",
            "Content-Type": "application/json",
        }
        self._refresh_token()

    def _refresh_token(self, duration: int = 600):
        print("Refreshing token")
        print("---")
        token = self._api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=duration,
        )
        self._headers["Authorization"] = f"Bearer {token}"
        return token

    def _get_new_did(self, twin_key_name: str):
        twin_registered_id = self._api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )
        return twin_registered_id.did

    def _make_api_call(
        self, method: str, url: str, json: dict = None, retry: bool = True
    ):
        try:
            response = request(method=method, url=url, headers=self._headers, json=json)
            response.raise_for_status()
        except Exception as ex:
            if retry:
                # The token may have expired: refresh it and retry once
                self._refresh_token()
                return self._make_api_call(method, url, json, retry=False)
            print("Retried once. Still getting error", ex)
            # Re-raise: there may be no usable response to return at this point
            raise
        return response.json()

    def _create_new_twin(
        self,
        twin_key_name: str,
        properties: List[dict] = None,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        visibility: str = "PRIVATE",
        location: dict = None,
    ):
        twin_did = self._get_new_did(twin_key_name)
        self._upsert_twin(
            twin_id=twin_did,
            properties=properties,
            feeds=feeds,
            inputs=inputs,
            visibility=visibility,
            location=location,
        )
        return twin_did

    def _describe_input(self, twin_id: str, input_id: str):
        input_description = self._make_api_call(
            method="GET",
            url=f"{HOST}/qapi/twins/{twin_id}/inputs/{input_id}",
        )
        return input_description

    def _describe_local_twin(self, twin_id: str):
        twin_description = self._make_api_call(
            method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
        )
        return twin_description

    def _upsert_twin(
        self,
        twin_id: str,
        visibility: str = "PRIVATE",
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": twin_id, "visibility": visibility}
        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties
        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
        print(f"Twin {twin_id} created successfully")
        print("---")

    def create_twin_model(self):
        print("Creating Twin Model")
        # Build a list of Properties for the Twin Model
        property_list = [
            # Twin Model "special" property
            {
                "key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
                "uriValue": {"value": "https://data.iotics.com/app#Model"},
            },
            # Add Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {
                    "value": "Input Sender Model",
                    "lang": "en",
                },
            },
            # Add Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "A Twin that sends input messages",
                    "lang": "en",
                },
            },
            # Add colour
            {
                "key": "https://data.iotics.com/app#color",
                "stringLiteralValue": {"value": "#9aceff"},
            },
            # Add Space name
            {
                "key": "https://data.iotics.com/app#spaceName",
                "stringLiteralValue": {"value": "my-space"},
            },
            # Add CreatedAt property
            {
                "key": "https://data.iotics.com/app#createdAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add UpdatedAt property
            {
                "key": "https://data.iotics.com/app#updatedAt",
                "literalValue": {
                    "dataType": "dateTime",
                    "value": datetime.now(tz=timezone.utc).isoformat(),
                },
            },
            # Add AllowList property
            {
                "key": "http://data.iotics.com/public#hostAllowList",
                "uriValue": {"value": "http://data.iotics.com/public#allHosts"},
            },
        ]
        model_twin_did = self._create_new_twin(
            twin_key_name="InputSenderModel", properties=property_list
        )
        return model_twin_did

    def create_twin_from_model(self, twin_model_id: str):
        print("Creating Twin from Model")
        twin_model = self._describe_local_twin(twin_id=twin_model_id)
        # Build a list of properties for the generic Twin
        property_list = [
            # Add the generic twins' "special" properties
            {
                "key": "https://data.iotics.com/app#model",
                "uriValue": {"value": twin_model_id},
            },
            {
                "key": "https://data.iotics.com/app#createdFrom",
                "uriValue": {"value": "https://data.iotics.com/app#ByModel"},
            },
        ]
        # Add to the list all the Twin Model's properties but consider the exceptions
        for prop in twin_model["result"]["properties"]:
            # This was the "special" property of the Twin Model, we don't need it here
            if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
                continue
            # Replace the Label property of the Twin Model
            if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
                prop["langLiteralValue"]["value"] = "Input Sensor Sender"
            # Change the Created At value
            if prop["key"] == "https://data.iotics.com/app#createdAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            # Change the Updated At value
            if prop["key"] == "https://data.iotics.com/app#updatedAt":
                prop["literalValue"]["value"] = datetime.now(
                    tz=timezone.utc
                ).isoformat()
            property_list.append(prop)
        input_twin_id = self._create_new_twin(
            twin_key_name="InputSender",
            properties=property_list,
            location={"lat": 51.5, "lon": -0.1},
        )
        return input_twin_id

    def search_for_twins(self, scope: str = "LOCAL"):
        print("Searching for Twins")
        # Initialise an empty list.
        # It will contain the list of Twins retrieved by the search
        twins_list = []
        # Add a new temporary field in the headers.
        # Client request timeout is used to stop the request processing once the timeout is reached
        headers = self._headers.copy()
        headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=10)
                ).isoformat(),
            }
        )
        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json={
                "responseType": "FULL",
                "filter": {
                    "properties": [
                        {
                            "key": "https://data.iotics.com/app#createdFrom",
                            "uriValue": {
                                "value": "https://data.iotics.com/app#ByModel"
                            },
                        },
                    ],
                    "text": "Light",
                },
            },
        ) as resp:
            # Raises HTTPError, if one occurred
            resp.raise_for_status()
            # Iterates over the response data, one line at a time
            for chunk in resp.iter_lines():
                if not chunk:
                    # Skip empty keep-alive lines, which are not valid JSON
                    continue
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except (KeyError, IndexError):
                    continue
                # Append the twins found to the list of twins
                twins_list.extend(twins_found)
        print(f"Found {len(twins_list)} twin(s)")
        return twins_list

    def get_input_detail(self, twins_list):
        # Return the first Twin that advertises at least one Input
        for twin in twins_list:
            twin_id = twin["id"]["value"]
            inputs_list = twin["inputs"]
            if inputs_list:
                input_of_interest = inputs_list[0]["input"]["id"]["value"]
                print(f"Found Twin {twin_id} with Input {input_of_interest}")
                return twin_id, input_of_interest
        raise RuntimeError("No Twin with an Input found. Is the Twin receiver running?")

    def send_input_message(
        self, sender_twin_id: str, receiver_twin_id: str, input_id: str, message: dict
    ):
        encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
        payload = {
            "message": {
                "data": encoded_data,
                "mime": "application/json",
                "occurredAt": datetime.now(tz=timezone.utc).isoformat(
                    timespec="seconds"
                ),
            }
        }
        print(f"Sending message {message}")
        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/twins/{sender_twin_id}/interests/twins/{receiver_twin_id}/inputs/{input_id}/messages",
            json=payload,
        )


def get_sensor_data():
    # Simulate a light sensor: low readings mean the light should be ON
    sensor_light_reading = random.randint(1, 10)
    message = {"light_on": False}
    if sensor_light_reading < 6:
        message = {"light_on": True}
    return message


def main():
    input_connector = InputSender()
    input_connector.setup()
    model_twin_did = input_connector.create_twin_model()
    input_twin_id = input_connector.create_twin_from_model(twin_model_id=model_twin_did)
    twins_list = input_connector.search_for_twins()
    receiver_twin_id, input_id = input_connector.get_input_detail(twins_list)
    while True:
        message = get_sensor_data()
        input_connector.send_input_message(
            sender_twin_id=input_twin_id,
            receiver_twin_id=receiver_twin_id,
            input_id=input_id,
            message=message,
        )
        sleep(10)
        print("---")


if __name__ == "__main__":
    main()
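
To try out the receiver's message handling without a running IOTICSpace, the decision logic can be isolated into a pure function and fed hand-made message bodies. This is only a sketch: `handle_input_body` and `fake_body` are hypothetical names, the body layout mirrors the fields the receiver's callback reads (`message.data`, `message.occurredAt`, `input.id.value`), and the timestamp is a made-up placeholder.

```python
import base64
import json
from typing import Optional, Tuple


def handle_input_body(body: str, light_on: bool) -> Tuple[bool, Optional[str]]:
    """Return the new light state and the action taken ("ON", "OFF" or None)."""
    payload = json.loads(body)
    values = json.loads(base64.b64decode(payload["message"]["data"]))
    requested = bool(values["light_on"])
    if requested == light_on:
        return light_on, None  # already in the requested state, nothing to do
    return requested, "ON" if requested else "OFF"


def fake_body(light_on: bool) -> str:
    """Build a message body shaped like the ones the receiver callback parses."""
    data = base64.b64encode(json.dumps({"light_on": light_on}).encode()).decode()
    return json.dumps(
        {
            "input": {"id": {"value": "light"}},
            "message": {
                "data": data,
                "mime": "application/json",
                "occurredAt": "2024-01-01T00:00:00+00:00",  # placeholder timestamp
            },
        }
    )


state = False
actions = []
for requested in (True, True, False, False):
    state, action = handle_input_body(fake_body(requested), state)
    actions.append(action)
print(actions)
```

Keeping the state transition separate from the STOMP callback makes it straightforward to check that repeated identical requests do not trigger a second switch.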