Share Data & Follow Feeds
IOTICS is all about sharing data in real-time or as and when data becomes available. Digital Twins publish data through one or more data feeds, and they can at the same time follow one or more data feeds from one or more other Digital Twins to retrieve their data as well.
This page explains how a Digital Twin publishes (shares) data and how it follows (receives) data from another Digital Twin.
- Introduction to Sharing Data
- Digital Twin types
- Historical data in IOTICS
- How to share data through a Digital Twin's data feed
- How to follow a Digital Twin's data feed with the IOTICS API
- Tutorial of the Share Data & Follow Feeds section
Introduction to Sharing Data
In IOTICS, data sharing only happens through and between Digital Twins. These connections are called "brokered interactions" or "twin-to-twin interactions". A Digital Twin can publish data through one or more data feeds, as well as follow one or more data feeds from one or more other Digital Twins.
In essence, IOTICS data ecosystems and networks are created by Digital Twins exchanging data with one another.
Digital Twin types
Depending on whether a Digital Twin is publishing and/or following data, we distinguish between the following types:
- Publisher: a Digital Twin that publishes data via one or more data feeds;
- Follower: a Digital Twin that follows one or more data feeds from one or more Digital Twins;
- Synthesiser: a Digital Twin that is both a Publisher and a Follower, and concurrently publishes and follows one or more data feeds.
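The three types above can be expressed as a small classification over how many feeds a Twin publishes and follows. This is only an illustrative sketch: `TwinType` and `classify_twin` are hypothetical names, not part of the IOTICS API.

```python
from enum import Enum
from typing import Optional


class TwinType(Enum):
    PUBLISHER = "Publisher"
    FOLLOWER = "Follower"
    SYNTHESISER = "Synthesiser"


def classify_twin(published_feeds: int, followed_feeds: int) -> Optional[TwinType]:
    """Return the type implied by the feed counts, or None if the Twin does neither."""
    if published_feeds > 0 and followed_feeds > 0:
        return TwinType.SYNTHESISER
    if published_feeds > 0:
        return TwinType.PUBLISHER
    if followed_feeds > 0:
        return TwinType.FOLLOWER
    return None


# A Twin that publishes one feed and follows two others is a Synthesiser.
print(classify_twin(published_feeds=1, followed_feeds=2))
```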
Historical data in IOTICS
It is important to note that IOTICS is all about streaming data. We don't store historical data. Only a Digital Twin's metadata properties are stored in order to enable search.
This implies that, by default, all data shared through a data feed will be lost unless another Digital Twin follows, uses or stores it. Only the latest shared Value of a Digital Twin Publisher's Feed can be retrieved if, and only if, that Feed has the storeLast parameter set to True.
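As a rough illustration, a feed definition with storeLast enabled might be built like this. The field names (`id`, `storeLast`, `values`, `label`, `dataType`) are the ones used in the tutorial code on this page; the `make_feed` helper itself is hypothetical.

```python
from typing import Dict, List


def make_feed(feed_id: str, value_labels: List[str], store_last: bool = True) -> Dict:
    """Build a feed definition dictionary (hypothetical helper, not an IOTICS function)."""
    return {
        "id": feed_id,
        # With storeLast enabled, only the most recent shared sample stays retrievable.
        "storeLast": store_last,
        "values": [{"label": label, "dataType": "decimal"} for label in value_labels],
    }


feed = make_feed("temperature", ["reading"])
print(feed)
```

A feed built with `store_last=False` still streams to active followers, but nothing can be fetched after the fact.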
How to share data through a Digital Twin's data feed
Consider the following points when creating a Digital Twin Publisher:
- A Digital Twin can only publish data through a data feed. Therefore, a Feed and a Value must first be created for that Twin;
- The data to be shared needs to be encoded as base64;
- The data must always include a dictionary made up of the Value Label(s) as key(s) and the actual data as value(s);
- Consider building a code application ("Publisher Connector") to continuously retrieve data from outside IOTICS and share it via the Twin's Feed.
1. Create Twin with a Feed and a Value
2. Encode the data to share as Base64
3. Create the JSON payload made up of the encoded data, MIME type and timestamp
4. Use the Share Data API call
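Steps 2 and 3 could be wrapped in a small helper that also checks the dictionary keys against the Feed's Value labels before encoding. This is a sketch under assumptions: `build_share_payload` is a hypothetical name, and the `occurredAt` timestamp field is assumed from the field of the same name in the follow response shown later on this page, so check it against the API reference.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Dict, Iterable, Optional


def build_share_payload(
    values: Dict[str, object],
    value_labels: Iterable[str],
    occurred_at: Optional[datetime] = None,
) -> Dict:
    """Check the keys of `values` against the Value labels, then Base64-encode them."""
    unknown = set(values) - set(value_labels)
    if unknown:
        raise ValueError(f"Keys not defined as Value labels: {sorted(unknown)}")
    encoded = base64.b64encode(json.dumps(values).encode()).decode()
    sample = {"data": encoded, "mime": "application/json"}
    if occurred_at is not None:
        # ASSUMPTION: the timestamp field is called "occurredAt".
        sample["occurredAt"] = occurred_at.astimezone(timezone.utc).isoformat()
    return {"sample": sample}


payload = build_share_payload({"reading": 25.0}, value_labels=["reading"])
print(payload)
```

Rejecting unknown keys early keeps a typo in a label from silently producing data that followers cannot interpret.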
import base64
import json

from requests import request

# HOST, host_id, twin_did, feed_id and headers are assumed to be defined already
data_to_share = {"reading": 25.0}
encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
data_to_share_payload = {
    "sample": {
        "data": encoded_data,
        "mime": "application/json",
    }
}

response = request(
    method="POST",
    url=f"{HOST}/qapi/hosts/{host_id}/twins/{twin_did}/feeds/{feed_id}/shares",
    headers=headers,  # It includes the token
    json=data_to_share_payload,
)
response.raise_for_status()
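The "Publisher Connector" mentioned above is essentially a loop that reads from an external source and shares each reading. The following is a minimal sketch of that loop, not an IOTICS component: `run_publisher_connector`, `read_source` and `share` are hypothetical names, and the two callables are injected so the example runs without a sensor or an IOTICSpace.

```python
import time
from typing import Callable, Dict, Optional


def run_publisher_connector(
    read_source: Callable[[], Optional[Dict]],
    share: Callable[[Dict], None],
    interval_seconds: float = 5.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Poll `read_source` and pass each reading to `share`; return how many were shared."""
    shared = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        reading = read_source()
        if reading is not None:  # the source may have nothing new to report
            share(reading)
            shared += 1
        time.sleep(interval_seconds)
    return shared


# Stand-ins for a real data source and for the Share Data API call.
readings = iter([{"reading": 25.0}, None, {"reading": 25.5}])
sent = []
count = run_publisher_connector(
    read_source=lambda: next(readings),
    share=sent.append,
    interval_seconds=0,
    max_iterations=3,
)
print(count, sent)
```

In a real connector, `share` would wrap the POST request shown above and `max_iterations` would be left as `None` so the loop runs until stopped.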
How to follow a Digital Twin's data feed with the IOTICS API
Consider the following points when creating a Digital Twin Follower:
- Check whether you're allowed to access (= follow) data from a specific Digital Twin, as your access may have been restricted. For more details on how to update access permissions go to Selective Sharing for Metadata and Data. As a rule of thumb:
- Data feeds from your own local Twins (i.e. Twins that are in the same IOTICSpace as the Twin Follower) can always be followed,
- Data feeds from remote Twins (i.e. Twins in a different IOTICSpace than the Twin Follower) can only be followed if the access permissions have been enabled;
- Data from a Feed needs to be decoded from base64;
- Consider building a code application ("Follower Connector") to continuously follow data from a Twin's Feed and execute an action outside IOTICS (e.g. store the data in a database or activate a physical sensor).
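One way to picture such a Follower Connector is a polling loop that fetches the latest sample, ignores samples it has already seen, and hands new ones to an action. This is a sketch under assumptions: `run_follower_connector`, `fetch_latest` and `handle` are hypothetical names, and samples are assumed to arrive already decoded as a `(timestamp, values)` pair.

```python
import time
from typing import Callable, Dict, Optional, Tuple

Sample = Tuple[str, Dict]  # (occurredAt timestamp, decoded values)


def run_follower_connector(
    fetch_latest: Callable[[], Optional[Sample]],
    handle: Callable[[str, Dict], None],
    interval_seconds: float = 5.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Poll for the latest sample and call `handle` once per new timestamp."""
    handled = 0
    last_seen = None
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        sample = fetch_latest()
        if sample is not None and sample[0] != last_seen:
            last_seen = sample[0]
            handle(*sample)
            handled += 1
        time.sleep(interval_seconds)
    return handled


# Stand-ins for the follow request and for an external action such as a database write.
samples = iter(
    [("t1", {"reading": 25.0}), ("t1", {"reading": 25.0}), None, ("t2", {"reading": 25.5})]
)
received = []
total = run_follower_connector(
    fetch_latest=lambda: next(samples),
    handle=lambda ts, values: received.append((ts, values)),
    interval_seconds=0,
    max_iterations=4,
)
print(total, received)
```

Deduplicating on the timestamp matters when polling the last stored sample, because the same value is returned until the Publisher shares again.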
1. Create Twin
2. Get Twin DID and Feed ID of the Twin's Feed to follow
3. Call the Follow Feed API
4. Decode the data received from Base64 format
import base64
import json

from requests import request

# HOST, headers and the host, twin and feed identifiers are assumed to be defined already
response = request(
    method="GET",
    url=f"{HOST}/qapi/hosts/{follower_host_id}/twins/{follower_twin_did}/interests/hosts/{followed_host_id}/twins/{followed_twin_did}/feeds/{feed_id}/samples/last",
    headers=headers,
)
response.raise_for_status()

# Parse the JSON body first; the Response object itself cannot be indexed
body = response.json()
occurred_at = body["feedData"]["occurredAt"]
data = body["feedData"]["data"]
feed_data = json.loads(base64.b64decode(data).decode("ascii"))
print(f"New message occurred at {occurred_at}: {feed_data}")
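A slightly more defensive version of the decoding step might look like the sketch below. It assumes the response body has the `feedData` shape used above, returns `None` when no data is present instead of raising, and decodes as UTF-8 so that non-ASCII text values also work. `decode_sample` is a hypothetical helper name.

```python
import base64
import json
from typing import Dict, Optional, Tuple


def decode_sample(body: Dict) -> Optional[Tuple[str, Dict]]:
    """Return (occurredAt, decoded values) from a follow response, or None if it has no data."""
    feed_data = body.get("feedData") or {}
    if "data" not in feed_data:
        return None
    raw = base64.b64decode(feed_data["data"])
    # UTF-8 is a superset of ASCII, so plain ASCII payloads decode the same way.
    return feed_data.get("occurredAt", ""), json.loads(raw.decode("utf-8"))


# A hand-built body standing in for a real API response.
example = {
    "feedData": {
        "occurredAt": "2024-01-01T00:00:00Z",
        "data": base64.b64encode(json.dumps({"reading": 25.0}).encode()).decode(),
    }
}
print(decode_sample(example))
```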
Tutorial of the Share Data & Follow Feeds section
The complete code below shows how to locally share data from a Twin Publisher and receive that data with a Twin Follower.
Instructions:
- Download this library on your local machine;
- Install the above library in your Python venv: pip install iotic.web.stomp-1.0.6.tar.gz;
- Run the share_data_tutorial.py;
- Run the follow_feed_tutorial.py in the same IOTICSpace as the share_data_tutorial.py;
- Look at the data received on your terminal.
# share_data_tutorial.py
import base64
import json
from typing import List

from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )
        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=10,
        )
        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()
        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )
        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")
        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}
        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties
        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def share_data(
        self, twin_did: str, host_id: str, feed_id: str, data_to_share: dict
    ):
        encoded_data = base64.b64encode(json.dumps(data_to_share).encode()).decode()
        data_to_share_payload = {
            "sample": {"data": encoded_data, "mime": "application/json"}
        }
        self._make_api_call(
            method="POST",
            url=f"{HOST}/qapi/hosts/{host_id}/twins/{twin_did}/feeds/{feed_id}/shares",
            json=data_to_share_payload,
        )


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinPublisher")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        location={"lat": 51.5, "lon": -0.1},
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Publisher", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Publisher",
                    "lang": "en",
                },
            },
        ],
        feeds=[
            {
                "id": "temperature",
                "storeLast": True,
                "properties": [
                    # Add a Label
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#label",
                        "langLiteralValue": {
                            "value": "Current Temperature",
                            "lang": "en",
                        },
                    },
                    # Add a comment
                    {
                        "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                        "langLiteralValue": {
                            "value": "The current temperature reading",
                            "lang": "en",
                        },
                    },
                ],
                "values": [
                    {
                        "comment": "Temperature in degrees Celsius",
                        "dataType": "decimal",
                        "label": "reading",
                        "unit": "http://qudt.org/vocab/unit/DEG_C",
                    }
                ],
            }
        ],
    )
    iotics_rest.share_data(
        twin_did=twin_did,
        host_id=host_id,
        feed_id="temperature",
        data_to_share={"reading": 25.5},
    )


if __name__ == "__main__":
    main()

# follow_feed_tutorial.py
import base64
import json
from datetime import datetime, timedelta, timezone
from time import sleep  # used by the keep-alive loop at the end of main()
from typing import Callable, List, Tuple

import stomp
from iotic.web.stomp.client import StompWSConnection12
from iotics.lib.identity.api.high_level_api import get_rest_high_level_identity_api
from requests import request

RESOLVER_URL = "resolver_url"
HOST = "host_url"
USER_KEY_NAME = "user_key_name"
AGENT_KEY_NAME = "agent_key_name"
USER_SEED = bytes.fromhex("user_seed")
AGENT_SEED = bytes.fromhex("agent_seed")
STOMP_ENDPOINT = "stomp_endpoint"


class StompClient:
    def __init__(
        self,
        endpoint: str,
        callback: Callable,
        heartbeats: Tuple[int, int] = (10000, 10000),
    ):
        self._endpoint = endpoint
        self._stomp_client = None
        self._heartbeats = heartbeats
        self._callback = callback

    def setup(self, token: str):
        self._stomp_client = StompWSConnection12(
            endpoint=self._endpoint, heartbeats=self._heartbeats
        )
        self._stomp_client.set_ssl(verify=True)
        self._stomp_client.set_listener(
            name="stomp_listener",
            lstnr=StompListener(
                stomp_client=self._stomp_client, callback=self._callback
            ),
        )
        self._stomp_client.connect(wait=True, passcode=token)

    def subscribe(self, destination, subscription_id, headers):
        self._stomp_client.subscribe(
            destination=destination, id=subscription_id, headers=headers
        )

    def disconnect(self):
        self._stomp_client.disconnect()


class StompListener(stomp.ConnectionListener):
    def __init__(self, stomp_client, callback):
        self._stomp_client = stomp_client
        self._callback = callback

    def on_error(self, headers, body):
        print(f"Received an error {body}")

    def on_message(self, headers, body):
        self._callback(headers, body)

    def on_disconnected(self):
        self._stomp_client.disconnect()
        print("Disconnected")


class IoticsRest:
    def __init__(self):
        self._high_level_api = get_rest_high_level_identity_api(
            resolver_url=RESOLVER_URL
        )
        (
            self._user_registered_id,
            self._agent_registered_id,
        ) = self._high_level_api.create_user_and_agent_with_auth_delegation(
            user_seed=USER_SEED,
            user_key_name=USER_KEY_NAME,
            agent_seed=AGENT_SEED,
            agent_key_name=AGENT_KEY_NAME,
            delegation_name="#AuthDeleg",
        )
        token = self._high_level_api.create_agent_auth_token(
            agent_registered_identity=self._agent_registered_id,
            user_did=self._user_registered_id.did,
            duration=600,
        )
        print(f"Token will expire at {datetime.now() + timedelta(seconds=600)}")
        self._headers = {
            "accept": "application/json",
            "Iotics-ClientAppId": "example_code",
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        self._stomp_client = StompClient(
            endpoint=STOMP_ENDPOINT, callback=self.follow_callback
        )
        self._stomp_client.setup(token=token)

    def _make_api_call(self, method: str, url: str, json: dict = None):
        response = request(method=method, url=url, headers=self._headers, json=json)
        response.raise_for_status()
        return response.json()

    def create_twin_identity(self, twin_key_name: str):
        twin_registered_id = self._high_level_api.create_twin_with_control_delegation(
            twin_seed=AGENT_SEED,
            twin_key_name=twin_key_name,
            agent_registered_identity=self._agent_registered_id,
            delegation_name="#ControlDeleg",
        )
        return twin_registered_id.did

    def get_host_id(self):
        host_id = self._make_api_call(method="GET", url=f"{HOST}/qapi/host/id")
        return host_id["hostId"]

    def upsert_twin(
        self,
        twin_did: str,
        host_id: str,
        feeds: List[dict] = None,
        inputs: List[dict] = None,
        location: dict = None,
        properties: List[dict] = None,
    ):
        payload = {"twinId": {"hostId": host_id, "id": twin_did}}
        if location:
            payload["location"] = location
        if feeds:
            payload["feeds"] = feeds
        if inputs:
            payload["inputs"] = inputs
        if properties:
            payload["properties"] = properties
        self._make_api_call(method="PUT", url=f"{HOST}/qapi/twins", json=payload)

    def search_twins(
        self,
        text: str = None,
        location: dict = None,
        properties: List[dict] = None,
        scope: str = "LOCAL",
        response_type: str = "FULL",
    ):
        twins_list = []
        search_headers = self._headers.copy()
        # Search headers require a new header "Iotics-RequestTimeout".
        # The latter is used to stop the request once the timeout is reached
        search_headers.update(
            {
                "Iotics-RequestTimeout": (
                    datetime.now(tz=timezone.utc) + timedelta(seconds=5)
                ).isoformat(),
            }
        )
        payload = {"responseType": response_type, "filter": {}}
        if text:
            payload["filter"]["text"] = text
        if properties:
            payload["filter"]["properties"] = properties
        if location:
            payload["filter"]["location"] = location
        with request(
            method="POST",
            url=f"{HOST}/qapi/searches",
            headers=search_headers,
            stream=True,
            verify=True,
            params={"scope": scope},
            json=payload,
        ) as resp:
            resp.raise_for_status()
            # Iterates over the response data, one Host at a time
            for chunk in resp.iter_lines():
                if not chunk:
                    # Skip keep-alive blank lines, which are not valid JSON
                    continue
                response = json.loads(chunk)
                try:
                    twins_found = response["result"]["payload"]["twins"]
                except KeyError:
                    continue
                # Append the twins found to the list of twins
                twins_list.extend(twins_found)
        print(f"Found {len(twins_list)} twin(s)")
        return twins_list

    def subscribe_to_feed(
        self,
        twin_follower_did: str,
        twin_follower_host_id: str,
        twin_publisher_did: str,
        twin_publisher_host_id: str,
        feed_id: str,
    ):
        endpoint = (
            f"/qapi/hosts/{twin_follower_host_id}/twins/{twin_follower_did}"
            f"/interests/hosts/{twin_publisher_host_id}/twins/{twin_publisher_did}"
            f"/feeds/{feed_id}"
        )
        self._stomp_client.subscribe(
            destination=endpoint,
            subscription_id=f"{twin_publisher_did}-{feed_id}",
            headers=self._headers,
        )

    @staticmethod
    def follow_callback(headers, body):
        encoded_data = json.loads(body)
        twin_publisher_did = encoded_data["interest"]["followedFeedId"]["twinId"]
        try:
            time = encoded_data["feedData"]["occurredAt"]
            data = encoded_data["feedData"]["data"]
        except KeyError:
            print("NO DATA")
        else:
            decoded_feed_data = json.loads(base64.b64decode(data).decode("ascii"))
            print(
                f"LAST MESSAGE SHARED FROM TWIN {twin_publisher_did} AT {time}: {decoded_feed_data}"
            )


def main():
    iotics_rest = IoticsRest()
    twin_did = iotics_rest.create_twin_identity(twin_key_name="TwinFollower")
    host_id = iotics_rest.get_host_id()
    iotics_rest.upsert_twin(
        twin_did=twin_did,
        host_id=host_id,
        properties=[
            # Label
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Follower", "lang": "en"},
            },
            # Comment
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#comment",
                "langLiteralValue": {
                    "value": "An example of a Twin Follower",
                    "lang": "en",
                },
            },
        ],
    )
    twins_list = iotics_rest.search_twins(
        properties=[
            {
                "key": "http://www.w3.org/2000/01/rdf-schema#label",
                "langLiteralValue": {"value": "Twin Publisher", "lang": "en"},
            }
        ]
    )
    # In this example it is assumed the Twin Publisher to use is the first one in the list
    twin_publisher = next(iter(twins_list))
    twin_publisher_did = twin_publisher["twinId"]["id"]
    twin_publisher_host_id = twin_publisher["twinId"]["hostId"]
    twin_publisher_feed_id = twin_publisher["feeds"][0]["feedId"]["id"]
    iotics_rest.subscribe_to_feed(
        twin_follower_did=twin_did,
        twin_follower_host_id=host_id,
        twin_publisher_did=twin_publisher_did,
        twin_publisher_host_id=twin_publisher_host_id,
        feed_id=twin_publisher_feed_id,
    )
    while True:
        sleep(10)


if __name__ == "__main__":
    main()
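The search helper in the follower script reads the streamed response one line at a time, with each line holding a JSON document for one Host. The parsing step can be tried in isolation with the sketch below. `collect_twins` is a hypothetical name, the response shape (`result`, `payload`, `twins`) is taken from the script above, and the sample lines with their identifiers are made up for illustration.

```python
import json
from typing import Dict, Iterable, List


def collect_twins(lines: Iterable[bytes]) -> List[Dict]:
    """Gather twins from streamed search-response lines, one JSON document per line."""
    twins: List[Dict] = []
    for line in lines:
        if not line:
            continue  # blank keep-alive line, nothing to parse
        document = json.loads(line)
        try:
            twins.extend(document["result"]["payload"]["twins"])
        except KeyError:
            continue  # this document carried no twins
    return twins


# Made-up lines standing in for what a streamed HTTP response would yield.
fake_lines = [
    b'{"result": {"payload": {"twins": [{"twinId": {"id": "twin-1", "hostId": "host-a"}}]}}}',
    b"",
    b'{"result": {"payload": {}}}',
    b'{"result": {"payload": {"twins": [{"twinId": {"id": "twin-2", "hostId": "host-b"}}]}}}',
]
found = collect_twins(fake_lines)
print(f"Found {len(found)} twin(s)")
```

Keeping the parsing separate from the HTTP request makes it easy to test against recorded responses before pointing the script at a live IOTICSpace.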