Connectors with STOMP
Ingesting or exporting data from an IOTICSpace and its Digital Twins requires the creation of a software application through the IOTICS API, which in IOTICS we call Connector.
This page covers:
Introduction to Connectors
A Connector is a software application that interacts with Digital Twins in an IOTICSpace. It acts as the bridge between an IOTICSpace and the data source.
We distinguish between three different types:
1. Publisher Connector: imports data into an IOTICSpace
2. Follower Connector: exports data from an IOTICSpace
3. Synthesiser Connector: transforms existing data within an IOTICSpace
Publisher Connector
A Publisher Connector is a software application that works as a bridge between a data source and an IOTICSpace. It enables external data to be imported into an IOTICSpace and published by one or more Digital Twins, and therefore shared with and used by other Digital Twins in the ecosystem.
A Publisher Connector:
- Typically handles one single source of data, and is made up of one Twin Model with one or more related Twins from Model. If multiple sources have to be imported and synchronised, several Twin Models may have to be created, each with their specific metadata;
- Periodically imports and publishes new data through the generic Digital Twins. The data can be published either in batches or in single samples, and at fixed or variable intervals. Some Twins may also need to share data with a different frequency than others;
- Runs continuously. Once created, it is good practice to allow all the Twins in the IOTICSpace to maintain their state of consistency - be it actively sharing new data, staying idle or becoming obsolete and being deleted automatically.
How to create a Publisher Connector with the IOTICS API
Implementing a Publisher Connector leads to the creation of a set of Twin Publishers that, simultaneously or in turn, publish data. A prerequisite is the development of one or more Twin Models that will serve as a template for the Twin Publishers.
The following example shows a Publisher Connector that simulates 10 temperature sensors. Each one gets data from the environment and shares sensor readings every 10 seconds. We first create a Twin Model with a Feed and then create the 10 Sensor Twins, which periodically download new data from a Server and share it.
Click to see the example code of a Publisher Connector
1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
2.1 Build list of Properties;
2.2 Build list of Feeds;
2.3 Get a new Twin DID;
2.4 Use the Upsert Twin API;
3. Create Twins from Model;
3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
3.2 Build list of Properties;
3.3 Use the Describe Feed API to get the the Twin Model Feed's metadata;
3.4 Build list of Feeds;
For every Sensor:
3.5 Get a new Twin DID;
3.6 Use the Upsert Twin API;
Every 10 seconds:
4. Download new data from the server;
5. Share data into the Space;
import base64
import json
from datetime import datetime, timezone
from time import sleep
from typing import List
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
class PublisherConnector:
def __init__(self):
self._api = None
self._user_registered_id = None
self._agent_registered_id = None
self._headers = None
self._twins_info = None
def setup(self):
self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
self._user_registered_id,
self._agent_registered_id,
) = self._api.create_user_and_agent_with_auth_delegation(
user_seed=USER_SEED,
user_key_name=USER_KEY_NAME,
agent_seed=AGENT_SEED,
agent_key_name=AGENT_KEY_NAME,
delegation_name="#AuthDeleg",
)
self._headers = {
"accept": "application/json",
"Iotics-ClientAppId": "publisher_connector",
"Content-Type": "application/json",
}
self._refresh_token()
self._twins_info = {}
def _refresh_token(self, duration: int = 60):
print("Refreshing token")
print("---")
token = self._api.create_agent_auth_token(
agent_registered_identity=self._agent_registered_id,
user_did=self._user_registered_id.did,
duration=duration,
)
self._headers["Authorization"] = f"Bearer {token}"
def _get_new_did(self, twin_key_name: str):
twin_registered_id = self._api.create_twin_with_control_delegation(
twin_seed=AGENT_SEED,
twin_key_name=twin_key_name,
agent_registered_identity=self._agent_registered_id,
delegation_name="#ControlDeleg",
)
return twin_registered_id.did
def _make_api_call(
self, method: str, url: str, json: dict = None, retry: bool = True
):
try:
response = request(method=method, url=url, headers=self._headers, json=json)
response.raise_for_status()
except Exception as ex:
if retry:
self._refresh_token()
return self._make_api_call(method, url, json, retry=False)
else:
print("Retried once. Still getting error", ex)
return response.json()
def _create_new_twin(
self,
twin_key_name: str,
properties: List[str] = None,
feeds: List[str] = None,
visibility: str = "PRIVATE",
location: dict = None,
):
twin_did = self._get_new_did(twin_key_name)
self._upsert_twin(
twin_id=twin_did,
properties=properties,
feeds=feeds,
visibility=visibility,
location=location,
)
return twin_did
def _describe_feed(self, twin_id: str, feed_id: str):
feed_description = self._make_api_call(
method="GET",
url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}",
)
return feed_description
def _describe_local_twin(self, twin_id: str):
twin_description = self._make_api_call(
method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
)
return twin_description
def _upsert_twin(
self,
twin_id: str,
visibility: str = "PRIVATE",
feeds: List[dict] = None,
location: dict = None,
properties: List[dict] = None,
):
payload = {"twinId": twin_id, "visibility": visibility}
if location:
payload["location"] = location
if feeds:
payload["feeds"] = feeds
if properties:
payload["properties"] = properties
self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
print(f"Twin {twin_id} created succesfully with Metadata, Feeds and Values")
print("---")
def _share_data(self, twin_label: str, twin_id: str, feed_id: str, value):
data_to_share = {"reading": value}
encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
data_to_share_payload = {
"sample": {
"data": encoded_data,
"mime": "application/json",
"occurredAt": datetime.now(tz=timezone.utc).isoformat(
timespec="seconds"
),
}
}
self._make_api_call(
method="POST",
url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}/shares",
json=data_to_share_payload,
)
print(f"Shared {data_to_share} from Twin {twin_label}")
def create_twin_model(self):
print("Creating Twin Model")
# Build a list of Properties for the Twin Model
property_list = [
# Twin Model "special" property
{
"key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
"uriValue": {"value": "https://data.iotics.com/app#Model"},
},
# Add Label
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Temperature Sensor Publisher Model",
"lang": "en",
},
},
# Add Comment
{
"key": "http://www.w3.org/2000/01/rdf-schema#comment",
"langLiteralValue": {
"value": "A temperature sensor that shares temperature data",
"lang": "en",
},
},
# Add colour
{
"key": "https://data.iotics.com/app#color",
"stringLiteralValue": {"value": "#9aceff"},
},
# Add Space name
{
"key": "https://data.iotics.com/app#spaceName",
"stringLiteralValue": {"value": "my-space"},
},
# Add CreatedAt property
{
"key": "https://data.iotics.com/app#createdAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
# Add UpdatedAt property
{
"key": "https://data.iotics.com/app#updatedAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
]
# Build a list of Feeds for the Twin Model
feed_list = [
# Add a single Feed called "currentTemp"
{
"id": "currentTemp",
"storeLast": True, # We want to store the last value sent
"properties": [
# Add a Label for the Feed
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Temperature",
"lang": "en",
},
}
],
# Add a single Value called "reading"
"values": [
{
"comment": "Temperature in degrees Celsius",
"dataType": "decimal",
"label": "reading",
"unit": "http://purl.obolibrary.org/obo/UO_0000027",
}
],
},
]
model_twin_did = self._create_new_twin(
twin_key_name="TwinPublisherModel",
properties=property_list,
feeds=feed_list,
)
return model_twin_did
def create_twins_from_model(self, twin_model_id: str):
twin_model = self._describe_local_twin(twin_id=twin_model_id)
# Build a list of properties for the generic Twin
property_list = [
# Add the generic twins' "special" properties
{
"key": "https://data.iotics.com/app#model",
"uriValue": {"value": twin_model_id},
},
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {"value": "https://data.iotics.com/app#ByModel"},
},
]
# Add to the list all the Twin Model's properties but consider the exceptions
for prop in twin_model["result"]["properties"]:
# This was the "special" property of the Twin Model, we don't need it here
if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
continue
# Don't add the Label property of the Twin Model.
# It'll be added later based on the number of the sensor
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
continue
# Change the Created At value
if prop["key"] == "https://data.iotics.com/app#createdAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
# Change the Updated At value
if prop["key"] == "https://data.iotics.com/app#updatedAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
property_list.append(prop)
# Build a list of Feeds for the Twin Model
feed_list = []
# Scan the Twin Model's feeds list to get the Twin Model's Feed IDs.
# Then use the Feed Describe API to get the Feeds' metadata
for feed in twin_model["result"]["feeds"]:
feed_id = feed["feedId"]["value"]
# Use the Feed Describe API
feed_description = self._describe_feed(
twin_id=twin_model_id, feed_id=feed_id
)
feed_properties = feed_description["result"]["properties"]
feed_values = feed_description["result"]["values"]
store_last = feed_description["result"]["storeLast"]
# Append to the list all the Feed's metadata
feed_list.append(
{
"id": feed_id,
"properties": feed_properties,
"values": feed_values,
"storeLast": store_last,
}
)
# Finally create the 10 Publisher Twins
for sensor_number in range(1, 11):
twin_key_name = f"temp_sensor_{sensor_number}"
twin_label = f"Temperature Sensor Publisher {sensor_number}"
# Add the Label property according to the number of the Sensor
property_list_with_label = property_list.copy()
property_list_with_label.append(
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {"value": twin_label, "lang": "en"},
}
)
print(f"Creating Twin from Model - {twin_label}")
twin_from_model_did = self._create_new_twin(
twin_key_name=twin_key_name,
properties=property_list_with_label,
feeds=feed_list,
location={"lat": 51.5, "lon": -0.1},
)
# Add the Twin's info in the mapping dictionary
twin_info = {
twin_key_name: {
"twin_did": twin_from_model_did,
"twin_label": twin_label,
}
}
self._twins_info.update(twin_info)
def publish_data(self, data: dict):
for temp_data in data:
temperature = temp_data["temp"]
sensor_id = temp_data["sensor_id"]
self._share_data(
twin_label=self._twins_info[sensor_id]["twin_label"],
twin_id=self._twins_info[sensor_id]["twin_did"],
feed_id="currentTemp",
value=temperature,
)
def _get_twin_label(self, property_list: List[dict]):
for prop in property_list:
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
return prop["langLiteralValue"]["value"]
return None
def get_sensor_data():
response = request(method="GET", url="http://flaskapi.dev.iotics.com/sensor_temp")
sensor_data = response.json()
return sensor_data
def main():
publisher_connector = PublisherConnector()
publisher_connector.setup()
model_twin_did = publisher_connector.create_twin_model()
publisher_connector.create_twins_from_model(twin_model_id=model_twin_did)
while True:
data = get_sensor_data()
publisher_connector.publish_data(data)
sleep(10)
print("---")
if __name__ == "__main__":
main()
Follower Connector
The mirrored version of the Publisher Connector is the Follower Connector. It is an application that works as a bridge between an IOTICSpace and the data consumer and/or data storage.
A Follower Connector can run in the same or a different IOTICSpace than the Twin Publishers
Use remote search and remote follow if the IOTICSpaces are not the same.
How to create a Follower Connector with the IOTICS API
Similar to the Publisher Connector, a set of Twin Followers are needed in order to ask the Twin Publishers' feed for the last shared value. One or more Twin Models will serve as a template for the Twin Followers so they can be classified according to their specific nature.
The following example shows how to set up a Follower Connector. It automatically creates 10 Follower Twins that follow the Sensor Twins' feed of the Publisher Connector, as demonstrated above, and asks for their last shared value every 10 seconds.
Click to see an example of a Follower Connector
Instructions:
- Run the
publisher_connector.py
in a terminal; - Run the
follower_connector.py
code below in another terminal and in the same IOTICSpace as the Publisher Connector. - Look at the data received on your terminal;
1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
2.1 Build list of Properties;
2.2 Get a new Twin DID;
2.3 Use the Upsert Twin API;
3. Create Twins from Model;
3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
3.2 Build list of Properties;
For each Twin:
3.3 Get a new Twin DID;
3.4 Use the Upsert Twin API;
4. Search for Twin Publishers;
Every 10 seconds:
5. Get the Twin Publishers' last shared data.
import base64
import json
from datetime import datetime, timezone, timedelta
from time import sleep
from typing import List
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
class FollowerConnector:
def __init__(self):
self._api = None
self._user_registered_id = None
self._agent_registered_id = None
self._headers = None
self._twins_info = None
def setup(self):
self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
self._user_registered_id,
self._agent_registered_id,
) = self._api.create_user_and_agent_with_auth_delegation(
user_seed=USER_SEED,
user_key_name=USER_KEY_NAME,
agent_seed=AGENT_SEED,
agent_key_name=AGENT_KEY_NAME,
delegation_name="#AuthDeleg",
)
self._headers = {
"accept": "application/json",
"Iotics-ClientAppId": "follower_connector",
"Content-Type": "application/json",
}
self._refresh_token()
self._twins_info = {}
def _refresh_token(self, duration: int = 60):
print("Refreshing token")
print("---")
token = self._api.create_agent_auth_token(
agent_registered_identity=self._agent_registered_id,
user_did=self._user_registered_id.did,
duration=duration,
)
self._headers["Authorization"] = f"Bearer {token}"
def _get_new_did(self, twin_key_name: str):
twin_registered_id = self._api.create_twin_with_control_delegation(
twin_seed=AGENT_SEED,
twin_key_name=twin_key_name,
agent_registered_identity=self._agent_registered_id,
delegation_name="#ControlDeleg",
)
return twin_registered_id.did
def _make_api_call(
self, method: str, url: str, json: dict = None, retry: bool = True
):
try:
response = request(method=method, url=url, headers=self._headers, json=json)
response.raise_for_status()
except Exception as ex:
if retry:
self._refresh_token()
return self._make_api_call(method, url, json, retry=False)
else:
print("Retried once. Still getting error", ex)
return response.json()
def _create_new_twin(
self,
twin_key_name: str,
properties: List[str] = None,
feeds: List[str] = None,
visibility: str = "PRIVATE",
location: dict = None,
):
twin_did = self._get_new_did(twin_key_name)
self._upsert_twin(
twin_id=twin_did,
properties=properties,
feeds=feeds,
visibility=visibility,
location=location,
)
return twin_did
def _describe_local_twin(self, twin_id: str):
twin_description = self._make_api_call(
method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
)
return twin_description
def _upsert_twin(
self,
twin_id: str,
visibility: str = "PRIVATE",
feeds: List[dict] = None,
location: dict = None,
properties: List[dict] = None,
):
payload = {"twinId": twin_id, "visibility": visibility}
if location:
payload["location"] = location
if feeds:
payload["feeds"] = feeds
if properties:
payload["properties"] = properties
self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
print(f"Twin {twin_id} created succesfully with Metadata, Feeds and Values")
print("---")
def create_twin_model(self):
print("Creating Twin Model")
# Build a list of Properties for the Twin Model
property_list = [
# Twin Model "special" property
{
"key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
"uriValue": {"value": "https://data.iotics.com/app#Model"},
},
# Add Label
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Temperature Sensor Follower Model",
"lang": "en",
},
},
# Add Comment
{
"key": "http://www.w3.org/2000/01/rdf-schema#comment",
"langLiteralValue": {
"value": "A temperature sensor that follows temperature data",
"lang": "en",
},
},
# Add colour
{
"key": "https://data.iotics.com/app#color",
"stringLiteralValue": {"value": "#9aceff"},
},
# Add Space name
{
"key": "https://data.iotics.com/app#spaceName",
"stringLiteralValue": {"value": "my-space"},
},
# Add CreatedAt property
{
"key": "https://data.iotics.com/app#createdAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
# Add UpdatedAt property
{
"key": "https://data.iotics.com/app#updatedAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
]
model_twin_did = self._create_new_twin(
twin_key_name="TwinFollowerModel", properties=property_list
)
return model_twin_did
def create_twins_from_model(self, twin_model_id: str):
twin_model = self._describe_local_twin(twin_id=twin_model_id)
# Build a list of properties for the generic Twin
property_list = [
# Add the generic twins' "special" properties
{
"key": "https://data.iotics.com/app#model",
"uriValue": {"value": twin_model_id},
},
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {"value": "https://data.iotics.com/app#ByModel"},
},
]
# Add to the list all the Twin Model's properties but consider the exceptions
for prop in twin_model["result"]["properties"]:
# This was the "special" property of the Twin Model, we don't need it here
if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
continue
# Don't add the Label property of the Twin Model.
# It'll be added later based on the number of the sensor
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
continue
# Change the Created At value
if prop["key"] == "https://data.iotics.com/app#createdAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
# Change the Updated At value
if prop["key"] == "https://data.iotics.com/app#updatedAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
property_list.append(prop)
# Finally create the 10 Sensor Twin Followers
for sensor_number in range(1, 11):
twin_key_name = f"temp_sensor_follower_{sensor_number}"
twin_label = f"Temperature Sensor Follower {sensor_number}"
# Add the Label property according to the number of the Sensor
property_list_with_label = property_list.copy()
property_list_with_label.append(
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {"value": twin_label, "lang": "en"},
}
)
print(f"Creating Twin from Model - {twin_label}")
twin_from_model_did = self._create_new_twin(
twin_key_name=twin_key_name,
properties=property_list,
location={"lat": 51.5, "lon": -0.1},
)
# Add the Twin's info in the mapping dictionary
twin_info = {sensor_number: twin_from_model_did}
self._twins_info.update(twin_info)
def search_for_twin_publishers(self):
# Initialise an empty list.
# It will contain the list of Twins retrieved by the search
twins_list = []
# Add a new temporary field in the headers.
# Client request timeout is used to stop the request processing once the timeout is reached
headers = self._headers.copy()
headers.update(
{
"Iotics-RequestTimeout": (
datetime.now(tz=timezone.utc) + timedelta(seconds=10)
).isoformat(),
}
)
with request(
method="POST",
url=f"{HOST}/qapi/searches",
headers=headers,
stream=True,
verify=False,
params={"scope": "LOCAL"},
json={
"responseType": "FULL",
"filter": {
"properties": [
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {
"value": "https://data.iotics.com/app#ByModel"
},
},
],
"text": "shares",
},
},
) as resp:
# Raises HTTPError, if one occurred
resp.raise_for_status()
# Iterates over the response data, one line at a time
for chunk in resp.iter_lines():
response = json.loads(chunk)
twins_found = []
try:
twins_found = response["result"]["payload"]["twins"]
except (KeyError, IndexError):
continue
finally:
if twins_found:
# Append the twins found to the list of twins
twins_list.extend(twins_found)
print(f"Found {len(twins_list)} twin(s)")
return twins_list
def follow_data(self, twins_list: List[dict]):
for twin in twins_list:
# Get the sensor number from the Twin's label last string
twin_publisher_label = self._get_twin_label(twin["properties"])
sensor_number = twin_publisher_label.split(" ")[-1]
# Check if the current Twin is the Twin Model.
# In such case we don't want to follow it.
if not sensor_number.isdigit():
continue
twin_publisher_id = twin["id"]["value"]
twin_follower_id = self._twins_info.get(int(sensor_number))
# We are supposed to know in advance the Feed ID we want our Twin Follower to follow
twin_publisher_feed_id = "currentTemp"
# Get the latest shared value
encoded_data = self._make_api_call(
method="GET",
url=f"{HOST}/qapi/twins/{twin_follower_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
)
time = encoded_data["feedData"]["occurredAt"]
data = encoded_data["feedData"]["data"]
feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(
f"Last shared data of Twin {twin_publisher_label} occurred at utc time {time}: {feed_data}"
)
def _get_twin_label(self, property_list: List[dict]):
for prop in property_list:
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
return prop["langLiteralValue"]["value"]
return None
def main():
follower_connector = FollowerConnector()
follower_connector.setup()
model_twin_did = follower_connector.create_twin_model()
follower_connector.create_twins_from_model(twin_model_id=model_twin_did)
twins_list = follower_connector.search_for_twin_publishers()
while True:
follower_connector.follow_data(twins_list)
sleep(10)
print("---")
if __name__ == "__main__":
main()
How to create a Follower Connector with the IOTICS API + STOMP
The following example shows how a Follower Connector can be built with a STOMP Client. The code generates a Twin Model and a generic Twin Follower so that the latter can register its interest in the 10 Publisher Twins' feeds. The STOMP Client allows the Twin Follower to receive the data as soon as the Publisher Twins share the data sample without needing to know the share frequency.
Click to see the code
Instructions:
- Download this library on your local machine;
- Install the above library in your Python venv:
pip install iotic.web.stomp-1.0.6.tar.gz
; - Run the
publisher_connector.py
in a terminal; - Run the
follower_connector_stomp.py
code below in another terminal and in the same IOTICSpace as the Publisher Connector. - Look at the data received on your terminal.
1. Get a fresh token to be used in the requests' headers and in the Stomp Client;
2. Instantiate a Stomp Client;
3. Create a Twin Model;
3.1 Build list of Properties;
3.2 Get a new Twin DID;
3.3 Use the Upsert Twin API;
4. Create a Twin from Model;
4.1 Use the Describe Local Twin API to get the Twin Model's metadata;
4.2 Build list of Properties;
4.3 Get a new Twin DID;
4.4 Use the Upsert Twin API;
5. Search for Twin Publishers;
For each Twin Publisher found:
6. Create a new STOMP subscription;
7. Print the data received;
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Callable, Tuple, List
import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
class StompClient:
def __init__(
self,
endpoint: str,
callback: Callable,
token: str,
heartbeats: Tuple[int, int] = (10000, 10000),
):
self._endpoint = endpoint
self._token = token
self._stomp_client = None
self._heartbeats = heartbeats
self._callback = callback
self._subscriptions_list = None
def setup(self):
self._stomp_client = StompWSConnection12(
endpoint=self._endpoint, heartbeats=self._heartbeats
)
self._stomp_client.set_ssl(verify=False)
self._stomp_client.set_listener(
name="stomp_listener",
lstnr=StompListener(
stomp_client=self._stomp_client, callback=self._callback
),
)
self._stomp_client.connect(wait=True, passcode=self._token)
self._subscriptions_list = []
def subscribe(self, destination, subscription_id, headers):
self._stomp_client.subscribe(
destination=destination, id=subscription_id, headers=headers
)
self._subscriptions_list.append((destination, subscription_id, headers))
def disconnect(self):
self._stomp_client.disconnect()
class StompListener(stomp.ConnectionListener):
def __init__(self, stomp_client, callback):
self._stomp_client = stomp_client
self._callback = callback
def on_error(self, headers, body):
print('Received an error "%s"' % body)
def on_message(self, headers, body):
self._callback(headers, body)
def on_disconnected(self):
self._stomp_client.disconnect()
print("Disconnected")
class FollowerConnectorStomp:
def __init__(self):
self._api = None
self._user_registered_id = None
self._agent_registered_id = None
self._headers = None
self._twins_publisher_info = None
self._stomp_client = None
def setup(self):
self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
self._user_registered_id,
self._agent_registered_id,
) = self._api.create_user_and_agent_with_auth_delegation(
user_seed=USER_SEED,
user_key_name=USER_KEY_NAME,
agent_seed=AGENT_SEED,
agent_key_name=AGENT_KEY_NAME,
delegation_name="#AuthDeleg",
)
self._headers = {
"accept": "application/json",
"Iotics-ClientAppId": "follower_connector_stomp",
"Content-Type": "application/json",
}
token = self._refresh_token()
self._twins_publisher_info = {}
stomp_endpoint = request(method="GET", url=f"{HOST}/index.json").json()["stomp"]
self._stomp_client = StompClient(
endpoint=stomp_endpoint,
callback=self._follow_callback,
token=token,
)
self._stomp_client.setup()
def _refresh_token(self, duration: int = 600):
print("Refreshing token")
print("---")
token = self._api.create_agent_auth_token(
agent_registered_identity=self._agent_registered_id,
user_did=self._user_registered_id.did,
duration=duration,
)
self._headers["Authorization"] = f"Bearer {token}"
return token
def _get_new_did(self, twin_key_name: str):
twin_registered_id = self._api.create_twin_with_control_delegation(
twin_seed=AGENT_SEED,
twin_key_name=twin_key_name,
agent_registered_identity=self._agent_registered_id,
delegation_name="#ControlDeleg",
)
return twin_registered_id.did
def _make_api_call(
self, method: str, url: str, json: dict = None, retry: bool = True
):
try:
response = request(method=method, url=url, headers=self._headers, json=json)
response.raise_for_status()
except Exception as ex:
if retry:
self._refresh_token()
return self._make_api_call(method, url, json, retry=False)
print("Retried once. Still getting error", ex)
return response.json()
def _create_new_twin(
self,
twin_key_name: str,
properties: List[str] = None,
feeds: List[str] = None,
visibility: str = "PRIVATE",
location: dict = None,
):
twin_did = self._get_new_did(twin_key_name)
self._upsert_twin(
twin_id=twin_did,
properties=properties,
feeds=feeds,
visibility=visibility,
location=location,
)
return twin_did
def _describe_local_twin(self, twin_id: str):
twin_description = self._make_api_call(
method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
)
return twin_description
def _upsert_twin(
self,
twin_id: str,
visibility: str = "PRIVATE",
feeds: List[dict] = None,
location: dict = None,
properties: List[dict] = None,
):
payload = {"twinId": twin_id, "visibility": visibility}
if location:
payload["location"] = location
if feeds:
payload["feeds"] = feeds
if properties:
payload["properties"] = properties
self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
print(f"Twin {twin_id} created succesfully with Metadata, Feeds and Values")
print("---")
def create_twin_model(self):
print("Creating Twin Model")
# Build a list of Properties for the Twin Model
property_list = [
# Twin Model "special" property
{
"key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
"uriValue": {"value": "https://data.iotics.com/app#Model"},
},
# Add Label
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Temperature Sensor Follower Model",
"lang": "en",
},
},
# Add Comment
{
"key": "http://www.w3.org/2000/01/rdf-schema#comment",
"langLiteralValue": {
"value": "A temperature sensor that follows temperature data",
"lang": "en",
},
},
# Add colour
{
"key": "https://data.iotics.com/app#color",
"stringLiteralValue": {"value": "#9aceff"},
},
# Add Space name
{
"key": "https://data.iotics.com/app#spaceName",
"stringLiteralValue": {"value": "my-space"},
},
# Add CreatedAt property
{
"key": "https://data.iotics.com/app#createdAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
# Add UpdatedAt property
{
"key": "https://data.iotics.com/app#updatedAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
]
model_twin_did = self._create_new_twin(
twin_key_name="TwinFollowerModel", properties=property_list
)
return model_twin_did
def create_twin_from_model(self, twin_model_id: str):
twin_model = self._describe_local_twin(twin_id=twin_model_id)
# Build a list of properties for the generic Twin
property_list = [
# Add the generic twins' "special" properties
{
"key": "https://data.iotics.com/app#model",
"uriValue": {"value": twin_model_id},
},
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {"value": "https://data.iotics.com/app#ByModel"},
},
]
# Add to the list all the Twin Model's properties but consider the exceptions
for prop in twin_model["result"]["properties"]:
# This was the "special" property of the Twin Model, we don't need it here
if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
continue
# Don't add the Label property of the Twin Model.
# It'll be added later based on the number of the sensor
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
continue
# Change the Created At value
if prop["key"] == "https://data.iotics.com/app#createdAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
# Change the Updated At value
if prop["key"] == "https://data.iotics.com/app#updatedAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
property_list.append(prop)
# Finally create the Sensor Twin Follower
twin_key_name = "temp_sensor_follower"
twin_label = "Temperature Sensor Follower"
# Add the Label property according to the number of the Sensor
property_list_with_label = property_list.copy()
property_list_with_label.append(
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {"value": twin_label, "lang": "en"},
}
)
print(f"Creating Twin from Model - {twin_label}")
twin_follower_did = self._create_new_twin(
twin_key_name=twin_key_name,
properties=property_list_with_label,
location={"lat": 51.5, "lon": -0.1},
)
return twin_follower_did
def search_for_twins_publisher(self):
# Initialise an empty list.
# It will contain the list of Twins retrieved by the search
twins_list = []
# Add a new temporary field in the headers.
# Client request timeout is used to stop the request processing once the timeout is reached
headers = self._headers.copy()
headers.update(
{
"Iotics-RequestTimeout": (
datetime.now(tz=timezone.utc) + timedelta(seconds=10)
).isoformat(),
}
)
with request(
method="POST",
url=f"{HOST}/qapi/searches",
headers=headers,
stream=True,
verify=False,
params={"scope": "LOCAL"},
json={
"responseType": "FULL",
"filter": {
"properties": [
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {
"value": "https://data.iotics.com/app#ByModel"
},
},
],
"text": "shares",
},
},
) as resp:
# Raises HTTPError, if one occurred
resp.raise_for_status()
# Iterates over the response data, one line at a time
for chunk in resp.iter_lines():
response = json.loads(chunk)
twins_found = []
try:
twins_found = response["result"]["payload"]["twins"]
except (KeyError, IndexError):
continue
finally:
if twins_found:
# Append the twins found to the list of twins
twins_list.extend(twins_found)
print(f"Found {len(twins_list)} twin(s) publisher")
return twins_list
def _follow_callback(self, headers, body):
payload = json.loads(body)
twin_publisher_did = payload["interest"]["followedFeed"]["feed"]["twinId"][
"value"
]
twin_publisher_label = self._twins_publisher_info.get(twin_publisher_did)
shared_data = payload["feedData"]["data"]
time = payload["feedData"]["occurredAt"]
decoded_data = json.loads(base64.b64decode(shared_data).decode("ascii"))
print(f"Received {decoded_data} from {twin_publisher_label} at {time}")
def _subscribe_to_feed(
self,
follower_twin_id,
followed_twin_id,
followed_feed_name,
):
follow_local_feed_url = f"/qapi/twins/{follower_twin_id}/interests/twins/{followed_twin_id}/feeds/{followed_feed_name}"
self._stomp_client.subscribe(
destination=follow_local_feed_url,
subscription_id=followed_twin_id,
headers=self._headers,
)
return follower_twin_id
def follow_feed(self, twins_found_list: List[dict], twin_follower_did: str):
for twin in twins_found_list:
twin_publisher_did = twin["id"]["value"]
twin_publisher_label = self._get_twin_label(twin["properties"])
self._subscribe_to_feed(
follower_twin_id=twin_follower_did,
followed_twin_id=twin_publisher_did,
followed_feed_name="currentTemp",
)
twin_info = {twin_publisher_did: twin_publisher_label}
self._twins_publisher_info.update(twin_info)
@staticmethod
def _get_twin_label(property_list: List[dict]):
for prop in property_list:
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
return prop["langLiteralValue"]["value"]
return None
def main():
follower_connector = FollowerConnectorStomp()
follower_connector.setup()
model_twin_did = follower_connector.create_twin_model()
twin_follower_did = follower_connector.create_twin_from_model(
twin_model_id=model_twin_did
)
twins_list = follower_connector.search_for_twins_publisher()
follower_connector.follow_feed(
twins_found_list=twins_list, twin_follower_did=twin_follower_did
)
while True:
sleep(10)
if __name__ == "__main__":
main()
How to export data from an IOTICSpace
Data can be exported outside IOTICS in order to persistently store a Digital Twin's published data.
The code below provides an example of how to allow the follower_connector.py
to store the sensor data received into either a CSV file or an SQLite database.
Click to see the code
Instructions:
- Install a new python library to use the SQLite Engine:
pip install SQLAlchemy
; - Create a new directory called
export_data
. Within the latter create 2 other directories:csv
andsqlite
; - Add the sqlite_engine.py code below to the
sqlite
directory and the csv_engine.py to thecsv
directory as follows:
├── publisher_connector.py
├── follower_connector_export.py
├── export_data
│ ├── csv
│ │ └── csv_engine.py
│ └── sqlite
│ └── sqlite_engine.py
- Run the
publisher_connector.py
in a terminal and thefollower_connector_export.py
code below in another terminal; - Change the
export_data_class
class (line 352) with eitherCSV
orSQLite
; - Look at the data being added to the
temperatures.csv
file ortemperatures.db
according to the choice above.
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import List
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request
from export_data.csv.csv_engine import CSV
from export_data.sqlite.sqlite_engine import SQLite
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
class FollowerConnectorExport:
def __init__(self, export_data_class):
self._api = None
self._user_registered_id = None
self._agent_registered_id = None
self._headers = None
self._twins_info = None
self._export_data = export_data_class
def setup(self):
self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
self._user_registered_id,
self._agent_registered_id,
) = self._api.create_user_and_agent_with_auth_delegation(
user_seed=USER_SEED,
user_key_name=USER_KEY_NAME,
agent_seed=AGENT_SEED,
agent_key_name=AGENT_KEY_NAME,
delegation_name="#AuthDeleg",
)
self._headers = {
"accept": "application/json",
"Iotics-ClientAppId": "follower_connector",
"Content-Type": "application/json",
}
self._refresh_token()
self._twins_info = {}
def _refresh_token(self, duration: int = 60):
print("Refreshing token")
print("---")
token = self._api.create_agent_auth_token(
agent_registered_identity=self._agent_registered_id,
user_did=self._user_registered_id.did,
duration=duration,
)
self._headers["Authorization"] = f"Bearer {token}"
def _get_new_did(self, twin_key_name: str):
twin_registered_id = self._api.create_twin_with_control_delegation(
twin_seed=AGENT_SEED,
twin_key_name=twin_key_name,
agent_registered_identity=self._agent_registered_id,
delegation_name="#ControlDeleg",
)
return twin_registered_id.did
def _make_api_call(
self, method: str, url: str, json: dict = None, retry: bool = True
):
try:
response = request(method=method, url=url, headers=self._headers, json=json)
response.raise_for_status()
except Exception as ex:
if retry:
self._refresh_token()
return self._make_api_call(method, url, json, retry=False)
else:
print("Retried once. Still getting error", ex)
return response.json()
def _create_new_twin(
self,
twin_key_name: str,
properties: List[str] = None,
feeds: List[str] = None,
visibility: str = "PRIVATE",
location: dict = None,
):
twin_did = self._get_new_did(twin_key_name)
self._upsert_twin(
twin_id=twin_did,
properties=properties,
feeds=feeds,
visibility=visibility,
location=location,
)
return twin_did
def _describe_local_twin(self, twin_id: str):
twin_description = self._make_api_call(
method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
)
return twin_description
def _upsert_twin(
self,
twin_id: str,
visibility: str = "PRIVATE",
feeds: List[dict] = None,
location: dict = None,
properties: List[dict] = None,
):
payload = {"twinId": twin_id, "visibility": visibility}
if location:
payload["location"] = location
if feeds:
payload["feeds"] = feeds
if properties:
payload["properties"] = properties
self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
print(f"Twin {twin_id} created succesfully with Metadata, Feeds and Values")
print("---")
def create_twin_model(self):
print("Creating Twin Model")
# Build a list of Properties for the Twin Model
property_list = [
# Twin Model "special" property
{
"key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
"uriValue": {"value": "https://data.iotics.com/app#Model"},
},
# Add Label
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Temperature Sensor Follower Model",
"lang": "en",
},
},
# Add Comment
{
"key": "http://www.w3.org/2000/01/rdf-schema#comment",
"langLiteralValue": {
"value": "A temperature sensor that follows temperature data",
"lang": "en",
},
},
# Add colour
{
"key": "https://data.iotics.com/app#color",
"stringLiteralValue": {"value": "#9aceff"},
},
# Add Space name
{
"key": "https://data.iotics.com/app#spaceName",
"stringLiteralValue": {"value": "my-space"},
},
# Add CreatedAt property
{
"key": "https://data.iotics.com/app#createdAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
# Add UpdatedAt property
{
"key": "https://data.iotics.com/app#updatedAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
]
model_twin_did = self._create_new_twin(
twin_key_name="TwinFollowerModel", properties=property_list
)
return model_twin_did
def create_twins_from_model(self, twin_model_id: str):
twin_model = self._describe_local_twin(twin_id=twin_model_id)
# Build a list of properties for the generic Twin
property_list = [
# Add the generic twins' "special" properties
{
"key": "https://data.iotics.com/app#model",
"uriValue": {"value": twin_model_id},
},
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {"value": "https://data.iotics.com/app#ByModel"},
},
]
# Add to the list all the Twin Model's properties but consider the exceptions
for prop in twin_model["result"]["properties"]:
# This was the "special" property of the Twin Model, we don't need it here
if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
continue
# Don't add the Label property of the Twin Model.
# It'll be added later based on the number of the sensor
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
continue
# Change the Created At value
if prop["key"] == "https://data.iotics.com/app#createdAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
# Change the Updated At value
if prop["key"] == "https://data.iotics.com/app#updatedAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
property_list.append(prop)
# Finally create the 10 Sensor Twin Followers
for sensor_number in range(1, 11):
twin_key_name = f"temp_sensor_follower_{sensor_number}"
twin_label = f"Temperature Sensor Follower {sensor_number}"
# Add the Label property according to the number of the Sensor
property_list_with_label = property_list.copy()
property_list_with_label.append(
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {"value": twin_label, "lang": "en"},
}
)
print(f"Creating Twin from Model - {twin_label}")
twin_from_model_did = self._create_new_twin(
twin_key_name=twin_key_name,
properties=property_list,
location={"lat": 51.5, "lon": -0.1},
)
# Add the Twin's info in the mapping dictionary
twin_info = {sensor_number: twin_from_model_did}
self._twins_info.update(twin_info)
def search_for_twin_publishers(self):
# Initialise an empty list.
# It will contain the list of Twins retrieved by the search
twins_list = []
# Add a new temporary field in the headers.
# Client request timeout is used to stop the request processing once the timeout is reached
headers = self._headers.copy()
headers.update(
{
"Iotics-RequestTimeout": (
datetime.now(tz=timezone.utc) + timedelta(seconds=10)
).isoformat(),
}
)
with request(
method="POST",
url=f"{HOST}/qapi/searches",
headers=headers,
stream=True,
verify=False,
params={"scope": "LOCAL"},
json={
"responseType": "FULL",
"filter": {
"properties": [
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {
"value": "https://data.iotics.com/app#ByModel"
},
},
],
"text": "shares",
},
},
) as resp:
# Raises HTTPError, if one occurred
resp.raise_for_status()
# Iterates over the response data, one line at a time
for chunk in resp.iter_lines():
response = json.loads(chunk)
twins_found = []
try:
twins_found = response["result"]["payload"]["twins"]
except (KeyError, IndexError):
continue
finally:
if twins_found:
# Append the twins found to the list of twins
twins_list.extend(twins_found)
print(f"Found {len(twins_list)} twin(s)")
return twins_list
def follow_data(self, twins_list: List[dict]):
for twin in twins_list:
# Get the sensor number from the Twin's label last string
twin_publisher_label = self._get_twin_label(twin["properties"])
sensor_number = twin_publisher_label.split(" ")[-1]
# Check if the current Twin is the Twin Model.
# In such case we don't want to follow it.
if not sensor_number.isdigit():
continue
twin_publisher_id = twin["id"]["value"]
twin_follower_id = self._twins_info.get(int(sensor_number))
# We are supposed to know in advance the Feed ID we want our Twin Follower to follow
twin_publisher_feed_id = "currentTemp"
# Get the latest shared value
encoded_data = self._make_api_call(
method="GET",
url=f"{HOST}/qapi/twins/{twin_follower_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
)
datetime = encoded_data["feedData"]["occurredAt"]
data = encoded_data["feedData"]["data"]
feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(
f"Last shared data of Twin {twin_publisher_label} occurred at utc time {datetime}: {feed_data}"
)
# Store data externally
self._export_data.export(
datetime=datetime,
received_from=twin_publisher_label,
data=feed_data.get("reading"),
)
def _get_twin_label(self, property_list: List[dict]):
for prop in property_list:
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
return prop["langLiteralValue"]["value"]
return None
def main():
# Replace the 'export_data_class' class with 'CSV' to store the data into a CSV file
follower_connector = FollowerConnectorExport(export_data_class=SQLite())
follower_connector.setup()
model_twin_did = follower_connector.create_twin_model()
follower_connector.create_twins_from_model(twin_model_id=model_twin_did)
twins_list = follower_connector.search_for_twin_publishers()
while True:
follower_connector.follow_data(twins_list)
sleep(10)
print("---")
if __name__ == "__main__":
main()
from sqlalchemy import Column, Float, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
DB_NAME = "sqlite:///export_data/sqlite/temperatures.db"
Base = declarative_base()
class SensorReading(Base):
__tablename__ = "SensorReading"
id = Column(Integer, primary_key=True)
timestamp = Column(String(100))
sensor_number = Column(Integer)
sensor_reading = Column(Float)
class SQLite:
def __init__(self, echo=False):
engine = create_engine(DB_NAME, echo=echo)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(engine)
self._session = Session()
def _store(self, item: SensorReading):
with self._session as session:
session.add(item)
session.commit()
print("Item stored correctly")
def export(self, datetime: str, received_from: str, data: dict):
sensor_reading_obj = SensorReading(
timestamp=datetime, sensor_number=received_from, sensor_reading=data
)
self._store(sensor_reading_obj)
import csv
from collections import namedtuple
CSV_FILE = "./export_data/csv/temperatures.csv"
CSV_FIELDS = ["timestamp", "sensor_number", "sensor_reading"]
SensorReading = namedtuple("SensorReading", CSV_FIELDS)
class CSV:
def __init__(self, n_items=10):
self._items_list = []
self._n_items = n_items
with open(CSV_FILE, "w") as csv_file:
writer = csv.writer(csv_file)
# Write header
writer.writerow(CSV_FIELDS)
def _store(self):
with open(CSV_FILE, "a") as csv_file:
writer = csv.writer(csv_file)
for item in self._items_list:
writer.writerow(item)
print("Item stored correctly")
self._items_list.clear()
def export(self, datetime: str, received_from: str, data: dict):
new_item = SensorReading(
timestamp=datetime, sensor_number=received_from, sensor_reading=data
)
self._items_list.append(new_item)
if len(self._items_list) >= self._n_items:
self._store()
Synthesiser Connector
A Synthesiser Connector, or simply Synthesiser, is an application that continuously allows Follower Twins to receive, transform and send the data back into the Space.
In other words, a Synthesiser creates a set of Twins that
1 continuously follow Publisher Twins' feeds;
2. "synthesise" or interoperate the data;
3. publish the new data back as a feed.
How to create a Synthesiser Connector with the IOTICS API
A Synthesiser application creates a set of Follower Twins with one or more Feeds so that they both receive data and share the new data.
The following snippet provides an example of a Synthesiser that:
- Creates 10 Synthesiser Twins with a single Feed;
- Follows the Publisher Connector Twins' feed with temperature readings;
- Computes the average temperature for each sensor Twin;
- Shares the average data as a feed.
Click to see the code
Instructions:
- Run the
publisher_connector.py
in a terminal; - Run the
synthesiser_connector.py
code below in another terminal.
1. Get a fresh token to be used in the requests' headers;
2. Create a Twin Model;
2.1 Build list of Properties;
2.2 Build list of Feeds;
2.3 Get a new Twin DID;
2.4 Use the Upsert Twin API;
3. Create Twins from Model;
3.1 Use the Describe Local Twin API to get the Twin Model's metadata;
3.2 Build list of Properties;
3.3 Use the Describe Feed API to get the the Twin Model's Feed's metadata;
3.4 Build list of Feeds;
For each Twin to follow:
3.5 Get a new Twin DID;
3.6 Use the Upsert Twin API;
Every 10 seconds:
4. Get the Twin Publishers' last shared data;
5. Compute the average temperature;
6. Share data;
import base64
import json
from datetime import datetime, timezone, timedelta
from time import sleep
from typing import List
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request
RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
class SynthesiserConnector:
def __init__(self):
self._api = None
self._user_registered_id = None
self._agent_registered_id = None
self._headers = None
self._twins_info = None
def setup(self):
self._api = get_rest_high_level_identity_api(resolver_url=RESOLVER_URL)
(
self._user_registered_id,
self._agent_registered_id,
) = self._api.create_user_and_agent_with_auth_delegation(
user_seed=USER_SEED,
user_key_name=USER_KEY_NAME,
agent_seed=AGENT_SEED,
agent_key_name=AGENT_KEY_NAME,
delegation_name="#AuthDeleg",
)
self._headers = {
"accept": "application/json",
"Iotics-ClientAppId": "synthesiser_connector",
"Content-Type": "application/json",
}
self._refresh_token()
self._twins_info = {}
def _refresh_token(self, duration: int = 60):
print("Refreshing token")
print("---")
token = self._api.create_agent_auth_token(
agent_registered_identity=self._agent_registered_id,
user_did=self._user_registered_id.did,
duration=duration,
)
self._headers["Authorization"] = f"Bearer {token}"
def _get_new_did(self, twin_key_name: str):
twin_registered_id = self._api.create_twin_with_control_delegation(
twin_seed=AGENT_SEED,
twin_key_name=twin_key_name,
agent_registered_identity=self._agent_registered_id,
delegation_name="#ControlDeleg",
)
return twin_registered_id.did
def _make_api_call(
self, method: str, url: str, json: dict = None, retry: bool = True
):
try:
response = request(method=method, url=url, headers=self._headers, json=json)
response.raise_for_status()
except Exception as ex:
if retry:
self._refresh_token()
return self._make_api_call(method, url, json, retry=False)
else:
print("Retried once. Still getting error", ex)
return response.json()
def _create_new_twin(
self,
twin_key_name: str,
properties: List[str] = None,
feeds: List[str] = None,
visibility: str = "PRIVATE",
location: dict = None,
):
twin_did = self._get_new_did(twin_key_name)
self._upsert_twin(
twin_id=twin_did,
properties=properties,
feeds=feeds,
visibility=visibility,
location=location,
)
return twin_did
def _describe_feed(self, twin_id: str, feed_id: str):
feed_description = self._make_api_call(
method="GET",
url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}",
)
return feed_description
def _describe_local_twin(self, twin_id: str):
twin_description = self._make_api_call(
method="GET", url=f"{HOST}/qapi/twins/{twin_id}"
)
return twin_description
def _upsert_twin(
self,
twin_id: str,
visibility: str = "PRIVATE",
feeds: List[dict] = None,
location: dict = None,
properties: List[dict] = None,
):
payload = {"twinId": twin_id, "visibility": visibility}
if location:
payload["location"] = location
if feeds:
payload["feeds"] = feeds
if properties:
payload["properties"] = properties
self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)
print(f"Twin {twin_id} created succesfully with Metadata, Feeds and Values")
print("---")
def _share_data(self, twin_label: str, twin_id: str, feed_id: str, value):
data_to_share = {"average": value}
encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
data_to_share_payload = {
"sample": {
"data": encoded_data,
"mime": "application/json",
"occurredAt": datetime.now(tz=timezone.utc).isoformat(
timespec="seconds"
),
}
}
self._make_api_call(
method="POST",
url=f"{HOST}/qapi/twins/{twin_id}/feeds/{feed_id}/shares",
json=data_to_share_payload,
)
print(f"Shared {data_to_share} from Twin {twin_label}")
def create_twin_model(self):
print("Creating Twin Model")
# Build a list of Properties for the Twin Model
property_list = [
# Twin Model "special" property
{
"key": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type",
"uriValue": {"value": "https://data.iotics.com/app#Model"},
},
# Add Label
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Temperature Sensor Synthesiser Model",
"lang": "en",
},
},
# Add Comment
{
"key": "http://www.w3.org/2000/01/rdf-schema#comment",
"langLiteralValue": {
"value": "A temperature sensor that follows temperature data",
"lang": "en",
},
},
# Add colour
{
"key": "https://data.iotics.com/app#color",
"stringLiteralValue": {"value": "#9aceff"},
},
# Add Space name
{
"key": "https://data.iotics.com/app#spaceName",
"stringLiteralValue": {"value": "my-space"},
},
# Add CreatedAt property
{
"key": "https://data.iotics.com/app#createdAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
# Add UpdatedAt property
{
"key": "https://data.iotics.com/app#updatedAt",
"literalValue": {
"dataType": "dateTime",
"value": datetime.now(tz=timezone.utc).isoformat(),
},
},
]
# Build a list of Feeds for the Twin Model
feed_list = [
# Add a single Feed called "currentTemp"
{
"id": "averageTemp",
"storeLast": True, # We want to store the last value sent
"properties": [
# Add a Label for the Feed
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {
"value": "Average Temperature",
"lang": "en",
},
}
],
# Add a single Value called "reading"
"values": [
{
"comment": "Temperature in degrees Celsius",
"dataType": "decimal",
"label": "average",
"unit": "http://purl.obolibrary.org/obo/UO_0000027",
}
],
},
]
model_twin_did = self._create_new_twin(
twin_key_name="TwinSynthesiserModel",
properties=property_list,
feeds=feed_list,
)
return model_twin_did
def create_twins_from_model(self, twin_model_id: str):
twin_model = self._describe_local_twin(twin_id=twin_model_id)
# Build a list of properties for the generic Twin
property_list = [
# Add the generic twins' "special" properties
{
"key": "https://data.iotics.com/app#model",
"uriValue": {"value": twin_model_id},
},
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {"value": "https://data.iotics.com/app#ByModel"},
},
]
# Add to the list all the Twin Model's properties but consider the exceptions
for prop in twin_model["result"]["properties"]:
# This was the "special" property of the Twin Model, we don't need it here
if prop["key"] == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
continue
# Don't add the Label property of the Twin Model.
# It'll be added later based on the number of the sensor
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
continue
# Change the Created At value
if prop["key"] == "https://data.iotics.com/app#createdAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
# Change the Updated At value
if prop["key"] == "https://data.iotics.com/app#updatedAt":
prop["literalValue"]["value"] = datetime.now(
tz=timezone.utc
).isoformat()
property_list.append(prop)
# Build a list of Feeds for the Twin Model
feed_list = []
# Scan the Twin Model's feeds list to get the Twin Model's Feed IDs.
# Then use the Feed Describe API to get the Feeds' metadata
for feed in twin_model["result"]["feeds"]:
feed_id = feed["feedId"]["value"]
# Use the Feed Describe API
feed_description = self._describe_feed(
twin_id=twin_model_id, feed_id=feed_id
)
feed_properties = feed_description["result"]["properties"]
feed_values = feed_description["result"]["values"]
store_last = feed_description["result"]["storeLast"]
# Append to the list all the Feed's metadata
feed_list.append(
{
"id": feed_id,
"properties": feed_properties,
"values": feed_values,
"storeLast": store_last,
}
)
# Finally create the 10 Twin Synthesisers
for sensor_number in range(1, 11):
twin_key_name = f"avg_temp_sensor_{sensor_number}"
twin_label = f"Temperature Sensor Synthesiser {sensor_number}"
# Add the Label property according to the number of the Sensor
property_list_with_label = property_list.copy()
property_list_with_label.append(
{
"key": "http://www.w3.org/2000/01/rdf-schema#label",
"langLiteralValue": {"value": twin_label, "lang": "en"},
}
)
print(f"Creating Twin from Model - {twin_label}")
twin_from_model_did = self._create_new_twin(
twin_key_name=twin_key_name,
properties=property_list,
feeds=feed_list,
location={"lat": 51.5, "lon": -0.1},
)
# Add the Twin's info in the mapping dictionary
twin_info = {
sensor_number: {
"twin_label": twin_label,
"twin_id": twin_from_model_did,
"feed_id": feed_id,
"previous_mean": 0,
"sample_count": 1,
}
}
self._twins_info.update(twin_info)
def search_for_twin_publishers(self):
# Initialise an empty list.
# It will contain the list of Twins retrieved by the search
twins_list = []
# Add a new temporary field in the headers.
# Client request timeout is used to stop the request processing once the timeout is reached
headers = self._headers.copy()
headers.update(
{
"Iotics-RequestTimeout": (
datetime.now(tz=timezone.utc) + timedelta(seconds=10)
).isoformat(),
}
)
with request(
method="POST",
url=f"{HOST}/qapi/searches",
headers=headers,
stream=True,
verify=False,
params={"scope": "LOCAL"},
json={
"responseType": "FULL",
"filter": {
"properties": [
{
"key": "https://data.iotics.com/app#createdFrom",
"uriValue": {
"value": "https://data.iotics.com/app#ByModel"
},
},
],
"text": "shares",
},
},
) as resp:
# Raises HTTPError, if one occurred
resp.raise_for_status()
# Iterates over the response data, one line at a time
for chunk in resp.iter_lines():
response = json.loads(chunk)
twins_found = []
try:
twins_found = response["result"]["payload"]["twins"]
except (KeyError, IndexError):
continue
finally:
if twins_found:
# Append the twins found to the list of twins
twins_list.extend(twins_found)
print(f"Found {len(twins_list)} twin(s)")
return twins_list
def synthesise_data(self, twins_list: List[dict]):
for twin in twins_list:
# Get the sensor number from the Twin's label last string
twin_publisher_label = self._get_twin_label(twin["properties"])
sensor_number = twin_publisher_label.split(" ")[-1]
# Check if the current Twin is the Twin Model.
# In such case we don't want to follow it.
if not sensor_number.isdigit():
continue
twin_publisher_id = twin["id"]["value"]
twin_info = self._twins_info.get(int(sensor_number))
synthesiser_twin_id = twin_info["twin_id"]
synthesiser_twin_previous_mean = twin_info["previous_mean"]
synthesiser_twin_sample_count = twin_info["sample_count"]
# We are supposed to know in advance the Feed ID we want our Twin Synthesiser to follow
twin_publisher_feed_id = "currentTemp"
# Get the latest shared value
encoded_data = self._make_api_call(
method="GET",
url=f"{HOST}/qapi/twins/{synthesiser_twin_id}/interests/twins/{twin_publisher_id}/feeds/{twin_publisher_feed_id}/samples/last",
)
time = encoded_data["feedData"]["occurredAt"]
data = encoded_data["feedData"]["data"]
feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(
f"Last shared data of Twin {twin_publisher_label} occurred at utc time {time}: {feed_data}"
)
new_sample = feed_data["reading"]
new_mean = (
(synthesiser_twin_sample_count - 1) * synthesiser_twin_previous_mean
+ new_sample
) / synthesiser_twin_sample_count
# Publish the new data
self.publish_data(twin_info=twin_info, data=new_mean)
# Update the average
self._twins_info[int(sensor_number)]["previous_mean"] = new_mean
self._twins_info[int(sensor_number)]["sample_count"] += 1
def publish_data(self, twin_info: dict, data: dict):
synthesiser_twin_id = twin_info["twin_id"]
synthesiser_feed_id = twin_info["feed_id"]
synthesiser_twin_label = twin_info["twin_label"]
self._share_data(
twin_label=synthesiser_twin_label,
twin_id=synthesiser_twin_id,
feed_id=synthesiser_feed_id,
value=data,
)
def _get_twin_label(self, property_list: List[dict]):
for prop in property_list:
if prop["key"] == "http://www.w3.org/2000/01/rdf-schema#label":
return prop["langLiteralValue"]["value"]
return None
def main():
synthesiser_connector = SynthesiserConnector()
synthesiser_connector.setup()
model_twin_did = synthesiser_connector.create_twin_model()
synthesiser_connector.create_twins_from_model(twin_model_id=model_twin_did)
twins_list = synthesiser_connector.search_for_twin_publishers()
while True:
synthesiser_connector.synthesise_data(twins_list)
sleep(10)
print("---")
if __name__ == "__main__":
main()
Updated over 2 years ago