Archive - Tutorial 2 - Share and Subscribe to a feed
Oops
This page has been deprecated. If you have found your way here, please head back to the Welcome page!
Now that you've created a twin in Tutorial 1, you'll want to be able to share and subscribe to feeds. This will introduce you to IOTICS' concept of symmetry and brokered interactions.
By the end of this tutorial you will be able to create a digital twin that’s able to find another digital twin and exchange data with it.
Make sure you have followed the instructions in the prerequisites page and completed Tutorial 1.
You'll need to have installed the IOTICS STOMP/WebSocket SDK on your local system. Then, in your REPL, execute the iotics_tutorial.py file from Tutorial 1:
exec(open("iotics_tutorial.py").read())
Step 1: Sharing data from a twin
Define a function that shares some data:
from random import randrange
from datetime import datetime
import json
import base64
import requests
def getHB(min, max):
    return min + randrange(max - min)

def share(host: str, headers: dict, twin_id: str, feed_id: str):
    message = json.dumps({"bpm": getHB(50, 90)})
    message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
    payload = {
        "sample": {
            "data": message_b64,
            "mime": "application/json;encoding=base64",
            "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S%zZ')
        }
    }
    r = requests.post(f'{host}/qapi/twins/{twin_id}/feeds/{feed_id}/shares',
                      headers=headers, json=payload)
    print(f'"shared {message}: result code={r.status_code}')
The function can then be used as follows:
share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: "shared {"bpm": 74}: result code=202
share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: "shared {"bpm": 70}: result code=202
share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: "shared {"bpm": 58}: result code=202
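The payload that share() builds can be checked on its own before anything is sent to the host. The sketch below is a minimal, SDK-free version of that step. The helper names build_sample_payload and decode_sample_payload are hypothetical and not part of the IOTICS API. It uses a timezone-aware timestamp, on the assumption that the host accepts the same YYYY-MM-DDTHH:MM:SSZ form produced above.

```python
import base64
import json
from datetime import datetime, timezone


def build_sample_payload(reading: dict, occurred_at: datetime) -> dict:
    """Wrap a reading in the {"sample": ...} envelope used by share()."""
    message = json.dumps(reading)
    message_b64 = base64.b64encode(message.encode("ascii")).decode("ascii")
    # Convert to UTC first, then format with a literal "Z" suffix.
    stamp = occurred_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "sample": {
            "data": message_b64,
            "mime": "application/json;encoding=base64",
            "occurredAt": stamp,
        }
    }


def decode_sample_payload(payload: dict) -> dict:
    """Reverse the encoding: base64 text -> JSON text -> dict."""
    raw = base64.b64decode(payload["sample"]["data"])
    return json.loads(raw.decode("ascii"))


example = build_sample_payload(
    {"bpm": 74}, datetime(2021, 3, 10, 10, 20, 34, tzinfo=timezone.utc)
)
print(example["sample"]["occurredAt"])
print(decode_sample_payload(example))
```

Decoding the payload locally is a quick way to confirm that a subscriber will be able to recover the original reading.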
Step 2: Create another twin
This new twin, named the monitor twin, will find the HB device twin and express an interest in its data feed.
Let's create the agent for this new twin, named agent2. These steps are similar to those explained in the prerequisites page for the first agent. For simplicity we'll run the new agent on behalf of the same user as the previous agent.
agent2_key=NamedECDSAKey(seed=seed, purpose="agent", name="hrate_mon_0")
agent2_identity=IdentifiableEntity(named_key=agent2_key, host=host)
#output: Identity did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ not found
#output: registered doc for NamedKey[p=agent, id=did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ, n=hrate_mon_0]
agent2_proof=agent2_identity.document_manager.new_proof(did=user_key.id)
agent2_issuer=agent2_identity.document_manager.issuer()
user_identity.document_manager.add_auth_delegation(proof=agent2_proof, authorizer_id=agent2_issuer, name="hrate_mon_0_deleg_0")
#output: registered doc for NamedKey[p=user, id=did:iotics:iotNT6FQyjKgveeFu82gTQDMijMcMktSvBMK, n=fred_at_gmail0]
#output: True
At this point the agent identity has been created and the user has delegated authority to it.
We can now create the monitor twin:
hrate_monitor_twin_key = NamedECDSAKey(seed=seed, purpose="twin", name="hrate_monitor_001")
hrate_monitor_twin_identity=IdentifiableEntity(named_key=hrate_monitor_twin_key, host=host)
#output: Identity did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ not found
#output: registered doc for NamedKey[p=twin, id=did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ, n=hrate_monitor_001]
payload = { 'twinId': { 'value': hrate_monitor_twin_identity.id } }
r = requests.post(f'{host.address}/qapi/twins', headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)
r.status_code
#output: 201
Now the monitor twin exists.
Step 3: Receiving data via IOTICS async API
Currently IOTICS supports an Async API based on STOMP over WebSocket. Details on the protocol are available in the STOMP specification.
IOTICS provides an SDK to facilitate accessing the API. The SDK extends stomp.py to provide a websocket transport.
Connect to the IOTICS host
The first thing to do is to get the stomp endpoint from the host:
import requests
resp = requests.get(f'{host.address}/index.json').json()
stomp_endpoint = resp["stomp"]
Next, we create a new STOMP client:
from iotic.web.stomp.client import StompWSConnection12
stomp_client = StompWSConnection12(endpoint=stomp_endpoint)
stomp_client.set_ssl(verify=False)
Setting a listener to receive data
We set a listener on the client to receive data over the STOMP connection. For simplicity and illustration purposes, we use the predefined PrintingListener, which prints details of the communication with the server:
from stomp.listener import PrintingListener
stomp_client.set_listener("log_listener", PrintingListener())
It is possible to set multiple listeners by invoking the set_listener API multiple times, specifying a different name for each listener.
A variety of listeners come out of the box (see the stomp.py documentation for details). You can also implement your own listener by extending the ConnectionListener class.
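As an illustration of a custom listener, the sketch below keeps every received message in memory instead of printing it. CollectingListener is a hypothetical name. The fallback to object is only there so the sketch runs without stomp.py installed, and the *args handling is an assumption to cope with the on_message signature differing between stomp.py releases.

```python
try:
    # Real base class, available when stomp.py is installed.
    from stomp.listener import ConnectionListener
except ImportError:
    # Fallback so the sketch still runs without the library.
    ConnectionListener = object


class CollectingListener(ConnectionListener):
    """Hypothetical listener that stores each message it receives."""

    def __init__(self):
        self.messages = []

    def on_message(self, *args):
        # Older stomp.py releases call on_message(headers, body);
        # newer ones pass a single frame object with .headers and .body.
        if len(args) == 1:
            headers, body = args[0].headers, args[0].body
        else:
            headers, body = args
        self.messages.append((dict(headers), body))


listener = CollectingListener()
# Simulate a delivery locally instead of waiting for the server.
listener.on_message({"subscription": "1"}, '{"feedData": {}}')
print(listener.messages)
```

Such a listener would be registered next to the printing one with a call like stomp_client.set_listener("collector", CollectingListener()).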
Connect to the server
We then need to create an authentication token and pass it to the .connect() API in order to connect to the server.
token = agent2_identity.document_manager.new_token(principal_did=user_key.id, duration=3600, audience=host.address)
stomp_client.connect(wait=True, passcode=token)
#output: on_connecting node01.demo02.space.iotics.com 8021
#output: on_send STOMP {'accept-version': '1.2', 'host': 'node01.demo02.space.iotics.com', 'passcode': 'eyJ0eX...3mym7Uzg'}
#output: on_connected {'server': 'Iotic Stomp server dev', 'heart-beat': '1000,1000', 'session': '59355716-6dce-444a-b54e-4658afdf4198', 'version': '1.2'}
Note
The STOMP client connection will be automatically disconnected by the server after the token has expired. In this case it is OK to specify a long duration: the token is transmitted only once, so the chances of it being stolen are minimal compared with the REST API, where the token is passed with every call.
The output produced by the PrintingListener shows that the client sends a STOMP frame to connect to the server. The on_connected callback prints the response from the server.
Subscribe to a feed via STOMP
Before we continue, we need to define a simple utility function that builds the headers that must be passed with each interaction with the server.
def iotics_stomp_headers(agent_identity: IdentifiableEntity) -> dict:
    return {
        "Iotics-ClientRef": f'd-poc-{shortuuid.random(8)}',
        "Iotics-ClientAppId": agent_identity.document_manager.named_key.id
    }
We are now ready to subscribe to a feed. The feed URI has the following structure:
my_twin=hrate_monitor_twin_identity.id
your_twin=hb_twin_identity.id
interested_feed_id = "hb"
feed_path = f"/qapi/twins/{my_twin}/interests/twins/{your_twin}/feeds/{interested_feed_id}"
The structure above shows how the IOTICS API implements symmetry: the SUBSCRIBE request that follows can be read as my_twin interacting with your_twin and being interested in interested_feed_id.
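To make the three parts of the path explicit, here is a small sketch that builds an interest path and splits it back into its components. Both interest_path and parse_interest_path are hypothetical helpers written for illustration. They assume the path layout shown above and twin IDs that contain no slash.

```python
def interest_path(follower_twin: str, followed_twin: str, feed_id: str) -> str:
    """Build the path: follower twin -> followed twin -> feed."""
    return (
        f"/qapi/twins/{follower_twin}"
        f"/interests/twins/{followed_twin}"
        f"/feeds/{feed_id}"
    )


def parse_interest_path(path: str) -> dict:
    """Split an interest path back into its three identifiers."""
    parts = path.strip("/").split("/")
    # Expected: qapi/twins/<follower>/interests/twins/<followed>/feeds/<feed>
    well_formed = (
        len(parts) == 8
        and parts[:2] == ["qapi", "twins"]
        and parts[3:5] == ["interests", "twins"]
        and parts[6] == "feeds"
    )
    if not well_formed:
        raise ValueError(f"not an interest path: {path}")
    return {"follower": parts[2], "followed": parts[5], "feed": parts[7]}


path = interest_path(
    "did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ",
    "did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf",
    "hb",
)
parsed = parse_interest_path(path)
print(path)
print(parsed)
```

Reading the path left to right gives the follower first and the followed twin second, which is the symmetry described above.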
stomp_client.subscribe(destination=feed_path, id=1, headers=iotics_stomp_headers(agent2_identity))
#output: on_send SUBSCRIBE { 'id': 1, 'ack': 'auto', 'Iotics-ClientRef': 'd-poc-Hi57DrcQ', 'Iotics-ClientAppId': 'did\\ciotics\\ciotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ', 'destination': '/qapi/twins/did\\ciotics\\ciotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ/interests/twins/did\\ciotics\\ciotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb'}
The client has now sent a subscription request with id=1. The subscription ID can then be used to reconcile the messages in the on_message() listener callback.
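One way to reconcile messages with subscriptions is to keep a table of handlers keyed by subscription ID. The sketch below shows the idea with a hypothetical SubscriptionRouter class. It assumes the server echoes the ID back in a subscription header as a string, so an ID registered as the integer 1 is normalised to "1".

```python
from typing import Callable, Dict


class SubscriptionRouter:
    """Hypothetical helper that routes message bodies by subscription ID."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[str], None]] = {}

    def register(self, subscription_id, handler: Callable[[str], None]) -> None:
        # Header values arrive as strings, so normalise the key.
        self._handlers[str(subscription_id)] = handler

    def dispatch(self, headers: dict, body: str) -> bool:
        """Call the matching handler; return False if none is registered."""
        handler = self._handlers.get(str(headers.get("subscription")))
        if handler is None:
            return False
        handler(body)
        return True


received = []
router = SubscriptionRouter()
router.register(1, received.append)

# Simulated deliveries: one for a known subscription, one for an unknown one.
handled = router.dispatch({"subscription": "1"}, '{"feedData": {}}')
ignored = router.dispatch({"subscription": "7"}, "{}")
print(handled, ignored, received)
```

The dispatch method is meant to be called from inside a listener's on_message() callback with the headers and body it receives.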
Share and receive data
If we now share data using the mechanism described above:
share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: shared {"bpm": 78}:
#output: result code=202
We'll observe the following output from the PrintingListener:
#output: on_message {'content-length': '468',
'Iotics-ClientRef': 'd-poc-Hi57DrcQ',
'Iotics-ClientAppId': 'did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ',
'Iotics-TransactionRef': 'cOKjX0UnQEiaeO4e0aCzvg,MrHsbcU3bs9WfzA8CQcjPR',
'ack': 'auto',
'destination': '/qapi/twins/did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ/interests/twins/did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb',
'message-id': 'e763fcc4-79ed-4486-ae4a-e660ea821f52',
'content-type': 'application/json',
'id': '1', 'subscription': '1'} {
  "interest": {
    "followerTwinId": {
      "value": "did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ"
    },
    "followedFeed": {
      "feed": {
        "id": {
          "value": "hb"
        },
        "twinId": {
          "value": "did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf"
        }
      }
    }
  },
  "feedData": {
    "occurredAt": "2021-03-10T10:20:34Z",
    "mime": "application/json;encoding\u003dbase64",
    "data": "eyJicG0iOiA3OH0="
  }
}
You can process the data for the subscription id=1 by parsing the received body as a JSON string (as per the content-type header) and then reading the feedData attribute, whose data field matches the data that was shared.
base64.b64decode("eyJicG0iOiA3OH0=")
#output: b'{"bpm": 78}'
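The whole decoding step can be sketched as a single function that takes the message body and returns the original reading. extract_feed_sample is a hypothetical helper. The sample body reuses the structure and values of the message shown above, and the function assumes the mime field starts with application/json whenever the decoded bytes are JSON.

```python
import base64
import json


def extract_feed_sample(body: str) -> dict:
    """Parse a feed message body and decode its base64 data field."""
    envelope = json.loads(body)
    feed = envelope["interest"]["followedFeed"]["feed"]
    feed_data = envelope["feedData"]
    raw = base64.b64decode(feed_data["data"])
    # Only parse as JSON when the mime type says so; otherwise keep the bytes.
    if feed_data["mime"].startswith("application/json"):
        value = json.loads(raw)
    else:
        value = raw
    return {
        "twin": feed["twinId"]["value"],
        "feed": feed["id"]["value"],
        "occurredAt": feed_data["occurredAt"],
        "value": value,
    }


sample_body = json.dumps({
    "interest": {
        "followerTwinId": {"value": "did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ"},
        "followedFeed": {
            "feed": {
                "id": {"value": "hb"},
                "twinId": {"value": "did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf"},
            }
        },
    },
    "feedData": {
        "occurredAt": "2021-03-10T10:20:34Z",
        "mime": "application/json;encoding=base64",
        "data": "eyJicG0iOiA3OH0=",
    },
})

sample = extract_feed_sample(sample_body)
print(sample)
```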
Disconnect the client
To disconnect the client:
stomp_client.disconnect()
Sharing via STOMP
It is possible to share data via STOMP; this is more effective and performant than sharing via REST as described above.
message = json.dumps({"bpm": getHB(50, 90)})
message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
payload = {
    "sample": {
        "data": message_b64,
        "mime": "application/json;encoding=base64",
        "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S%zZ')
    }
}
feed_id='hb'
feed_path=f'/qapi/twins/{hb_twin_identity.id}/feeds/{feed_id}'
stomp_client = StompWSConnection12(endpoint=stomp_endpoint)
stomp_client.set_ssl(verify=False)
token = agent_identity.document_manager.new_token(principal_did=user_key.id, duration=3600, audience=host.address)
stomp_client.connect(wait=True, passcode=token)
stomp_client.send(feed_path, headers=iotics_stomp_headers(agent_identity), body=json.dumps(payload))
#output: on_send SEND {'Iotics-ClientRef': 'd-poc-hLdyYNyi', 'Iotics-ClientAppId': 'did\\ciotics\\ciotEn6MyiLSQFw1Ej8CT13L8D1cMyTury39d', 'destination': '/qapi/twins/did\\ciotics\\ciotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb', 'content-length': 122} {"sample": {"data": "eyJicG0iOiA3OX0=", "mime": "application/json;encoding=base64", "occurredAt": "2021-03-10T12:49:10Z"}}
stomp_client.disconnect()
The crucial line of code used to share is:
stomp_client.send(feed_path, headers=iotics_stomp_headers(agent_identity), body=json.dumps(payload))
As STOMP is a text-based protocol, the payload is expected to be passed as a JSON string.
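To show what "text-based" means in practice, the sketch below assembles a STOMP 1.2 SEND frame by hand: a command line, header lines, a blank line, the body, and a terminating NUL byte. The client library does this for you, so build_send_frame and escape_header are hypothetical helpers for illustration only. The escaping rule, where a colon in a header value becomes a backslash followed by c, is consistent with the escaped twin IDs visible in the on_send output above.

```python
import json


def escape_header(value: str) -> str:
    """Apply STOMP 1.2 header escaping (backslash first, then the rest)."""
    return (
        value.replace("\\", "\\\\")
        .replace("\r", "\\r")
        .replace("\n", "\\n")
        .replace(":", "\\c")
    )


def build_send_frame(destination: str, headers: dict, body: str) -> str:
    """Assemble the text of a SEND frame for the given destination and body."""
    all_headers = {
        "destination": destination,
        **headers,
        "content-length": str(len(body.encode("utf-8"))),
    }
    lines = ["SEND"]
    lines += [f"{escape_header(k)}:{escape_header(v)}" for k, v in all_headers.items()]
    # A blank line separates headers from the body; NUL ends the frame.
    return "\n".join(lines) + "\n\n" + body + "\x00"


payload = {
    "sample": {
        "data": "eyJicG0iOiA3OX0=",
        "mime": "application/json;encoding=base64",
        "occurredAt": "2021-03-10T12:49:10Z",
    }
}
body = json.dumps(payload)
frame = build_send_frame(
    "/qapi/twins/did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb",
    {"Iotics-ClientRef": "d-poc-hLdyYNyi"},
    body,
)
print(frame.replace("\x00", "<NUL>"))
```

The body is not escaped, only the headers are, which is why the JSON payload can contain colons freely.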
Bring it all together
Copy this content into iotics_tutorial.py in order to be set up for the next tutorial:
from random import randrange
from datetime import datetime
import shortuuid
import json
import base64
import requests
from iotic.lib.identity import Identifier
from iotics_id import Host, NamedECDSAKey, DocumentManager, IdentifiableEntity
from iotic.web.stomp.client import StompWSConnection12
from stomp.listener import PrintingListener
host = Host("https://api01.demo02.space.iotics.com")
seed = None
try:
    with open('.seed', 'r') as f:
        seed = f.read()
except FileNotFoundError:
    # No seed file yet: generate a new seed and persist it for later runs.
    seed = Identifier.new_seed(256)
    with open('.seed', 'w') as f:
        f.write(seed)
def getHB(min, max):
    return min + randrange(max - min)

def share(host: str, headers: dict, twin_id: str, feed_id: str):
    message = json.dumps({"bpm": getHB(50, 90)})
    message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
    payload = {
        "sample": {
            "data": message_b64,
            "mime": "application/json;encoding=base64",
            "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S%zZ')
        }
    }
    r = requests.post(f'{host}/qapi/twins/{twin_id}/feeds/{feed_id}/shares',
                      headers=headers, json=payload)
    print(f'"shared {message}: result code={r.status_code}')
def iotics_headers(agent_identity: IdentifiableEntity, user_identity: IdentifiableEntity, host: str, duration: int = 120) -> dict:
    user_did = user_identity.document_manager.named_key.id
    token = agent_identity.document_manager.new_token(
        principal_did=user_did, duration=duration, audience=host)
    headers = {
        "accept": "application/json",
        "Iotics-ClientRef": f'd-poc-{shortuuid.random(8)}',
        "Iotics-ClientAppId": agent_identity.document_manager.named_key.id,
        "Authorization": f'Bearer {token}',
        "Content-Type": "application/json"
    }
    return headers
print("creating/retrieving user and agent identity + delegation")
user_key = NamedECDSAKey(seed=seed, purpose="user", name="fred_at_gmail-0")
user_identity = IdentifiableEntity(named_key=user_key, host=host)
agent_key = NamedECDSAKey(seed=seed, purpose="agent", name="hb-mon-0")
agent_identity = IdentifiableEntity(named_key=agent_key, host=host)
agent_proof = agent_identity.document_manager.new_proof(did=user_key.id)
agent_issuer = agent_identity.document_manager.issuer()
user_identity.document_manager.add_auth_delegation(
    proof=agent_proof, authorizer_id=agent_issuer, name="agent_deleg_0")
print("creating twin identity")
hb_twin_key = NamedECDSAKey(seed=seed, purpose="twin", name="hb_device_001")
hb_twin_identity = IdentifiableEntity(named_key=hb_twin_key, host=host)
agent_proof = agent_identity.document_manager.new_proof(
    did=hb_twin_identity.id)
agent_issuer = agent_identity.document_manager.issuer()
deleg_name = f'{agent_identity.document_manager.named_key.name}_0'
print("delegating agent to control twin")
hb_twin_identity.document_manager.add_control_delegation(
    proof=agent_proof, controller_id=agent_issuer, name=deleg_name)
print("making the twin resource")
payload = {'twinId': {'value': hb_twin_identity.id}}
r = requests.post(f'{host.address}/qapi/twins', headers=iotics_headers(
    agent_identity, user_identity, host.address), json=payload)
payload = {
    "tags": {
        "added": ["cat_health", "monitor"]
    },
    "labels": {
        "added": [
            {"lang": "en", "value": "heartbeat monitor"}
        ]
    },
    "comments": {
        "added": [
            {"lang": "en", "value": "an heartbeat monitor"}
        ]
    }
}
r = requests.patch(f'{host.address}/qapi/twins/{hb_twin_identity.id}',
                   headers=iotics_headers(
                       agent_identity, user_identity, host.address),
                   data=json.dumps(payload))
print("making the feed resource")
name = 'hb'
payload = {"feedId": {"value": name}, "storeLast": True}
r = requests.post(f'{host.address}/qapi/twins/{hb_twin_identity.id}/feeds',
                  headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)
payload = {
    "comments": {
        "added": [
            {
                "lang": "en",
                "value": "the beat per minutes"
            }
        ]
    },
    "storeLast": True,
    "labels": {
        "added": [
            {
                "lang": "en",
                "value": "bpm"
            }
        ]
    },
    "values": {
        "added": [
            {
                "comment": "bpm",
                "dataType": "decimal",
                "label": "bpm",
                "unit": "http://purl.obolibrary.org/obo/UO_0000148"
            }
        ]
    }
}
r = requests.patch(f'{host.address}/qapi/twins/{hb_twin_identity.id}/feeds/{name}',
                   headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)
print("publishing sample data")
message = json.dumps({"bpm": 70})
message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
payload = {
    "sample": {
        "data": message_b64,
        "mime": "application/json;encoding=base64",
        "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S%zZ')
    }
}
requests.post(f'{host.address}/qapi/twins/{hb_twin_identity.id}/feeds/{name}/shares',
              headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)
print("create another agent")
agent2_key = NamedECDSAKey(seed=seed, purpose="agent", name="hrate_mon_0")
agent2_identity = IdentifiableEntity(named_key=agent2_key, host=host)
agent2_proof = agent2_identity.document_manager.new_proof(did=user_key.id)
agent2_issuer = agent2_identity.document_manager.issuer()
user_identity.document_manager.add_auth_delegation(
    proof=agent2_proof, authorizer_id=agent2_issuer, name="hrate_mon_0_deleg_0")
print("create another twin")
hrate_monitor_twin_key = NamedECDSAKey(
    seed=seed, purpose="twin", name="hrate_monitor_001")
hrate_monitor_twin_identity = IdentifiableEntity(
    named_key=hrate_monitor_twin_key, host=host)
payload = {'twinId': {'value': hrate_monitor_twin_identity.id}}
requests.post(f'{host.address}/qapi/twins', headers=iotics_headers(
    agent_identity, user_identity, host.address), json=payload)
print("create a stomp client")
resp = requests.get(f'{host.address}/index.json').json()
stomp_endpoint = resp["stomp"]
stomp_client = StompWSConnection12(endpoint=stomp_endpoint)
stomp_client.set_ssl(verify=False)
def iotics_stomp_headers(agent_identity: IdentifiableEntity) -> dict:
    return {
        "Iotics-ClientRef": f'd-poc-{shortuuid.random(8)}',
        "Iotics-ClientAppId": agent_identity.document_manager.named_key.id
    }