Send & Receive Input Messages
Inputs allow a Twin to receive specific messages from any authorised Twin in the Network.
This page covers:
- Introduction to Input Messages
- Feeds vs. Inputs
- Create an Input
- How to create an Input with the IOTICS API
- Send Input Messages
- How to send Input Messages with the IOTICS API
- Tutorial of Twin Input Receiver & Twin Input Sender
Introduction to Input Messages
By advertising an Input, an IOTICS Digital Twin allows authorised Twins to send direct messages to it. Inputs can therefore be used to send commands that change the state of the underlying asset, or to send messages that the Twin interprets in order to implement request/response interactions.
Feeds vs. Inputs
- Inputs are similar to Feeds: both have properties and values that describe how the data to be transferred is expected to look.
- A Feed allows a Twin to share data with any authorised Twin in the Network (one-to-many communication), whereas an Input allows any authorised Twin to send data to a specific Twin (many-to-one communication).
- Feeds are generally used to continuously import data into an IOTICSpace, where it can potentially be followed by any Twin in the Network (e.g. temperature data, weather forecast updates). Inputs are useful for sending event-based messages to specific Twins so that the latter can respond accordingly (e.g. client/server communication) or modify the behaviour of the physical asset (e.g. switch a light on/off).
- Selective Data Sharing applies to both Feeds and Inputs by using the same AllowList property.
Need a refresher on Twin Feeds?
Our previous guide Share Data & Follow Feeds can help you!
Create an Input
A Twin can have none, one or many Inputs as well as a combination of Inputs and Feeds.
How to create an Input with the IOTICS API
An Input is created with the Create Input operation. Its structure is composed of the same components as a Feed:
- Basic Structure: the Input ID only;
- Metadata: a list of Properties;
- Values: the payload carried by the Input message, where each Value is described by a Label, Comment, Datatype and Unit.
Be aware that, like the Create Feed operation, Create Input creates only the Basic Structure of the Input. To add the remaining components (Metadata and Values), the Update Input operation must be used.
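As a rough sketch of how these three components fit together, the helper below assembles an Input description in the same dictionary shape that the tutorial at the end of this page passes to its `upsert_twin` call. The function name `build_input` and its parameters are hypothetical and exist only for illustration; only the dictionary keys are taken from the tutorial code.

```python
from typing import Dict, List

RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
RDFS_COMMENT = "http://www.w3.org/2000/01/rdf-schema#comment"


def build_input(
    input_id: str,
    label: str,
    comment: str,
    values: List[Dict[str, str]],
    lang: str = "en",
) -> dict:
    """Assemble an Input description (hypothetical helper, not an IOTICS API call)."""
    return {
        # Basic Structure: the Input ID only
        "id": input_id,
        # Metadata: a list of Properties
        "properties": [
            {"key": RDFS_LABEL, "langLiteralValue": {"value": label, "lang": lang}},
            {"key": RDFS_COMMENT, "langLiteralValue": {"value": comment, "lang": lang}},
        ],
        # Values: description of the payload carried by each message
        "values": values,
    }


on_off_switch = build_input(
    input_id="on_off_switch",
    label="On/Off Switch",
    comment="Allows the light to turn ON/OFF",
    values=[
        {"label": "light_on", "dataType": "boolean", "comment": "Switch of the sensor"}
    ],
)
```

Keeping the description in one dictionary makes it easy to reuse the same definition wherever the Input needs to be declared.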
Send Input Messages
For a Twin A (sender) to send an Input message to another Twin B (receiver), Twin A needs to be authorised by Twin B in terms of both:
- Visibility: Twin B must be discoverable from the IOTICSpace where Twin A lives;
- Accessibility: Twin A's IOTICSpace must be authorised to share data with Twin B.
How to send Input Messages with the IOTICS API
1. Serialise the data to be sent as JSON, encode it using Base64 and decode the resulting bytes object to a string
2. Compose the payload dictionary with the `data`, `mime` and `occurredAt` values
3. Use the Send Input message operation
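import base64
import json
from datetime import datetime, timezone

from requests import request

# HOST, headers and the Twin, Host and Input identifiers used below are
# placeholders that must be defined beforehand (see the tutorial below).
message = {"light_on": False}
# Serialise to JSON, Base64-encode, then decode the bytes to a string
encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
payload = {
    "message": {
        "data": encoded_data,
        "mime": "application/json",
        # Timestamp of the event, as listed in step 2
        "occurredAt": datetime.now(tz=timezone.utc).isoformat(),
    }
}

# If the Twin Receiver is in a remote Host
send_input_url = f"{HOST}/qapi/hosts/{twin_sender_host_id}/twins/{twin_sender_did}/interests/hosts/{twin_receiver_host_id}/twins/{twin_receiver_did}/inputs/{input_id}/messages"
response = request(
    method="POST",
    url=send_input_url,
    headers=headers,
    json=payload,
)
response.raise_for_status()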
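To check the encoding locally before calling the API, the following minimal sketch wraps steps 1 and 2 in a function and reverses them again. The helper names `build_message_payload` and `decode_message_payload` are assumptions made for this example and are not part of the IOTICS API; the payload keys follow the `data`, `mime` and `occurredAt` fields listed in step 2.

```python
import base64
import json
from datetime import datetime, timezone


def build_message_payload(message: dict) -> dict:
    """Wrap a JSON-serialisable message in an Input message payload (hypothetical helper)."""
    encoded = base64.b64encode(json.dumps(message).encode()).decode()
    return {
        "message": {
            "data": encoded,
            "mime": "application/json",
            "occurredAt": datetime.now(tz=timezone.utc).isoformat(),
        }
    }


def decode_message_payload(payload: dict) -> dict:
    """Reverse the encoding: Base64 string -> bytes -> JSON text -> dict."""
    raw = base64.b64decode(payload["message"]["data"])
    return json.loads(raw.decode())


payload = build_message_payload({"light_on": False})
round_trip = decode_message_payload(payload)
```

If `round_trip` equals the original message, the receiver should be able to decode the data in the same way.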
Tutorial of Twin Input Receiver & Twin Input Sender
The following tutorial shows the creation of:
- a Twin sender that sends an Input message (an on/off switch command) to a local Twin with an Input;
- a Twin receiver with an Input that continuously waits for incoming messages via STOMP and prints every message it receives on screen.
Instructions:
- Download this library on your local machine;
- Install the above library in your Python venv: `pip install iotic.web.stomp-1.0.6.tar.gz`;
- Run the `input_receiver.py` code below in a terminal;
- Run the `input_sender.py` code below from another terminal and use the same IOTICSpace as the Twin receiver;
- Watch the messages being sent from the Twin sender terminal and received from the Twin receiver terminal.
# input_receiver.py
import base64
import json
from datetime import datetime, timedelta
from time import sleep
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

# Replace the placeholder values below with your own settings
RESOLVER_URL = "resolver_url"
HOST = "host_url"
STOMP_ENDPOINT = "stomp_endpoint"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )
        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )
        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=600,
        )
        print(f"Token will expire at {datetime.now() + timedelta(seconds=600)}")
        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.receive_callback
        )
        self._stomp_client.setup(token=token)

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()
        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )
        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")
        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}
        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties
        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def wait_for_input_messages(
        self, twin_receiver_did: str, twin_receiver_host_id: str, input_id: str
    ):
        endpoint = f"/qapi/hosts/{twin_receiver_host_id}/twins/{twin_receiver_did}/inputs/{input_id}"
        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_receiver_host_id}-{input_id}",
            headers=self._headers,
        )

    @staticmethod
    def receive_callback(headers, body):
        encoded_data = json.loads(body)
        try:
            time = encoded_data["message"]["occurredAt"]
            data = encoded_data["message"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))
            print(f"LAST INPUT MESSAGE RECEIVED AT {time}: {decoded_feed_data}")


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinReceiver")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Receiver", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Receiver",
                    "lang": "en",
                },
            },
        ],
        inputs=[
            {
                "id": "on_off_switch",
                "properties": [
                    # Add a Label
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {"value": "On/Off Switch", "lang": "en"},
                    },
                    # Add a comment
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                        "langLiteralValue": {
                            "value": "Allows the light to turn ON/OFF",
                            "lang": "en",
                        },
                    },
                ],
                "values": [
                    {
                        "comment": "Switch of the sensor",
                        "dataType": "boolean",
                        "label": "light_on",
                    }
                ],
            }
        ],
    )
    iotics_rest.wait_for_input_messages(
        twin_receiver_did=twin_did,
        twin_receiver_host_id=host_id,
        input_id="on_off_switch",
    )
    # Keep the process alive so the STOMP listener can receive messages
    while True:
        sleep(10)


if __name__ == "__main__":
    main()
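
The receiver above only prints what it receives. As a minimal sketch of how a message could instead be turned into a command for the underlying asset, the function below extracts the `light_on` value from a STOMP message body. It assumes the body has the same shape that `receive_callback` reads (a JSON object with `message.data` holding Base64-encoded JSON); the name `extract_light_command` is hypothetical.

```python
import base64
import json
from typing import Optional


def extract_light_command(body: str) -> Optional[bool]:
    """Return the `light_on` value carried by a message body, or None if it is absent or malformed."""
    try:
        data = json.loads(body)["message"]["data"]
        decoded = json.loads(base64.b64decode(data).decode())
    except (KeyError, TypeError, ValueError):
        # Missing keys, unexpected JSON structure, invalid Base64 or invalid JSON
        return None
    if not isinstance(decoded, dict):
        return None
    value = decoded.get("light_on")
    return value if isinstance(value, bool) else None


# Example body built by hand for illustration only
example_body = json.dumps(
    {"message": {"data": base64.b64encode(b'{"light_on": true}').decode()}}
)
command = extract_light_command(example_body)
```

Returning `None` for anything unexpected lets the caller ignore malformed messages instead of raising inside the listener thread.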
# input_sender.py
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

# Replace the placeholder values below with your own settings
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )
        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=60,
        )
        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()
        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )
        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")
        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}
        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties
        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []
        search_headers = self._headers.copy()
        # Search requests require an additional "Iotics-RequestTimeout" header,
        # which stops the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )
        payload = {"responseType": response_type, "filter": {}}
        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location
        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                if not chunk:
                    # Skip empty keep-alive lines
                    continue
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue
                # Append the twins found to the list of twins
                twins_list.extend(twins_found)
        print(f"Found {len(twins_list)} twin(s)")
        return twins_list

    def send_input_message(
        self,
        twin_sender_did: str,
        twin_sender_host_id: str,
        twin_receiver_did: str,
        twin_receiver_host_id: str,
        input_id: str,
        message: dict,
    ):
        # Serialise to JSON, Base64-encode, then decode the bytes to a string
        encoded_data = base64.b64encode(json.dumps(message).encode()).decode()
        payload = {"message": {"data": encoded_data, "mime": "application/json"}}
        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/hosts/{twin_sender_host_id}/twins/{twin_sender_did}/interests/hosts/{twin_receiver_host_id}/twins/{twin_receiver_did}/inputs/{input_id}/messages",
            json=payload,
        )


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinSender")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Sender", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Sender",
                    "lang": "en",
                },
            },
        ],
    )
    twins_list = iotics_rest.search_twins(
        properties=[
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Receiver", "lang": "en"},
            }
        ]
    )
    # In this example it is assumed the Twin Receiver to use is the first one in the list
    twin_receiver = next(iter(twins_list))
    twin_receiver_did = twin_receiver["twinId"]["id"]
    twin_receiver_host_id = twin_receiver["twinId"]["hostId"]
    twin_receiver_input_id = twin_receiver["inputs"][0]["inputId"]["id"]
    iotics_rest.send_input_message(
        twin_sender_did=twin_did,
        twin_sender_host_id=host_id,
        twin_receiver_did=twin_receiver_did,
        twin_receiver_host_id=twin_receiver_host_id,
        input_id=twin_receiver_input_id,
        message={"light_on": False},
    )


if __name__ == "__main__":
    main()
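
The sender above assumes that the first Twin returned by the search is the receiver and that its first Input is the one to use. Where several Twins or Inputs may match, a slightly more defensive selection could look like the sketch below. It assumes each search result has the same shape the tutorial reads (`twinId.id`, `twinId.hostId` and `inputs[].inputId.id`); the helper name `find_input` is hypothetical.

```python
from typing import List, Optional, Tuple


def find_input(twins_list: List[dict], input_id: str) -> Optional[Tuple[str, str, str]]:
    """Return (host_id, twin_did, input_id) of the first Twin advertising `input_id`, or None."""
    for twin in twins_list:
        for twin_input in twin.get("inputs", []):
            if twin_input.get("inputId", {}).get("id") == input_id:
                twin_id = twin["twinId"]
                return twin_id["hostId"], twin_id["id"], input_id
    return None


# Hand-written sample data for illustration only
sample_twins = [
    {"twinId": {"hostId": "host_a", "id": "did:iotics:no_inputs"}, "inputs": []},
    {
        "twinId": {"hostId": "host_a", "id": "did:iotics:receiver"},
        "inputs": [{"inputId": {"id": "on_off_switch"}}],
    },
]
target = find_input(sample_twins, "on_off_switch")
```

Checking for `None` before sending avoids the `StopIteration` or `IndexError` that the direct indexing would raise when no suitable Twin is found.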