Archive - Tutorial 2 - Share and Subscribe to a feed

❗️

Oops

This page has been deprecated. If you have found your way here, please head back to the Welcome page!

Now that you've created a twin in Tutorial 1, you'll want to be able to share and subscribe to feeds. This will introduce you to IOTICS' concept of symmetry and brokered interactions.

By the end of this tutorial you will be able to create a digital twin that’s able to find another digital twin and exchange data with it.

Make sure you have followed the instructions in the prerequisites page and completed Tutorial 1.

You'll need to have the IOTICS STOMP/WebSocket SDK installed on your local system. Then, in your REPL, execute the iotics_tutorial.py file from Tutorial 1:

exec(open("iotics_tutorial.py").read())

Step 1: Sharing data from a twin

Define a function that shares some data:

from random import randrange
from datetime import datetime
import json
import base64
import requests

def getHB(min, max):
    return min + randrange(max - min)


def share(host: str, headers: dict, twin_id: str, feed_id: str):
    message = json.dumps({"bpm": getHB(50, 90)})
    message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
    payload = {
        "sample": {
            "data": message_b64,
            "mime": "application/json;encoding=base64",
            # utcnow() is naive (no offset), so a literal Z marks the time as UTC
            "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        }
    }
    r = requests.post(f'{host}/qapi/twins/{twin_id}/feeds/{feed_id}/shares',
                      headers=headers, json=payload)
    print(f'shared {message}: result code={r.status_code}')

The function can then be used as:

share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: shared {"bpm": 74}: result code=202

share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: shared {"bpm": 70}: result code=202

share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')
#output: shared {"bpm": 58}: result code=202
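
To see what share() puts on the wire, the payload can be built and decoded offline, without contacting the host. The following is a minimal sketch under stated assumptions: build_sample and decode_sample are hypothetical helper names, not part of the IOTICS SDK, and the field names are simply those used by share() above.

```python
import base64
import json
from datetime import datetime, timezone


def build_sample(reading: dict, when: datetime) -> dict:
    """Wrap a reading in the same envelope that share() posts."""
    raw = json.dumps(reading).encode("ascii")
    return {
        "sample": {
            "data": base64.b64encode(raw).decode("ascii"),
            "mime": "application/json;encoding=base64",
            "occurredAt": when.strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
    }


def decode_sample(payload: dict) -> dict:
    """Reverse build_sample: base64-decode the data field and parse the JSON."""
    return json.loads(base64.b64decode(payload["sample"]["data"]))


# A fixed timestamp keeps the example reproducible.
when = datetime(2021, 3, 10, 10, 20, 34, tzinfo=timezone.utc)
payload = build_sample({"bpm": 74}, when)
print(payload["sample"]["occurredAt"])
print(decode_sample(payload))
```

Keeping the encoding step separate from the HTTP call makes it easy to check the payload before it is sent.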

Step 2: Create another twin

This new twin, called the monitor twin, will find the HB device twin and register an interest in its data feed.

Let’s create the agent for this new twin, named agent2. These steps are similar to those explained in the prerequisites page for the first agent. For simplicity we’ll run the new agent on behalf of the same user as the previous agent.

agent2_key=NamedECDSAKey(seed=seed, purpose="agent", name="hrate_mon_0")
agent2_identity=IdentifiableEntity(named_key=agent2_key, host=host)

#output: Identity did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ not found
#output: registered doc for NamedKey[p=agent, id=did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ, n=hrate_mon_0]

agent2_proof=agent2_identity.document_manager.new_proof(did=user_key.id)
agent2_issuer=agent2_identity.document_manager.issuer()
user_identity.document_manager.add_auth_delegation(proof=agent2_proof, authorizer_id=agent2_issuer, name="hrate_mon_0_deleg_0")

#output: registered doc for NamedKey[p=user, id=did:iotics:iotNT6FQyjKgveeFu82gTQDMijMcMktSvBMK, n=fred_at_gmail0]
#output: True

At this point the agent identity has been created and the user has delegated authority to it.

We can now create the monitor twin:

hrate_monitor_twin_key = NamedECDSAKey(seed=seed, purpose="twin", name="hrate_monitor_001")
hrate_monitor_twin_identity=IdentifiableEntity(named_key=hrate_monitor_twin_key, host=host)

#output: Identity did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ not found
#output: registered doc for NamedKey[p=twin, id=did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ, n=hrate_monitor_001]

payload = { 'twinId': { 'value': hrate_monitor_twin_identity.id } }
r = requests.post(f'{host.address}/qapi/twins', headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)
r.status_code

#output: 201

Now the monitor twin exists.

Step 3: Receiving data via IOTICS async API

Currently IOTICS supports an Async API based on STOMP over WebSocket. Details on the STOMP protocol are here.

IOTICS provides an SDK to facilitate accessing the API. The SDK extends stomp.py to provide a websocket transport.

Connect to the IOTICS host

The first thing to do is to get the stomp endpoint from the host:

import requests

resp = requests.get(f'{host.address}/index.json').json()
stomp_endpoint = resp["stomp"]

Next, we create a new STOMP client:

from iotic.web.stomp.client import StompWSConnection12

stomp_client = StompWSConnection12(endpoint=stomp_endpoint)
stomp_client.set_ssl(verify=False)

Setting a listener to receive data

We set a listener on the client to receive data over the STOMP connection. For simplicity and illustration purposes, we use a predefined PrintingListener that prints details of comms with the server:

from stomp.listener import PrintingListener

stomp_client.set_listener("log_listener", PrintingListener())

It is possible to set multiple listeners by invoking the set_listener API multiple times and by specifying different names for each listener.

A variety of listeners come out of the box (see here for details). You can also implement your own listener by extending the ConnectionListener class.
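
As an illustration of a custom listener, the sketch below queues incoming messages so that another thread can consume them. It is not taken from the IOTICS SDK: QueueingListener is a hypothetical name, and the class is kept free of imports from stomp.py so that it runs on its own. In a real client it would extend ConnectionListener. The (headers, body) callback signature is an assumption that matches the PrintingListener output shown in this tutorial; other stomp.py versions may pass a single frame object instead.

```python
import queue


class QueueingListener:
    """Collects incoming STOMP messages so another thread can consume them."""

    def __init__(self):
        self.messages = queue.Queue()

    def on_message(self, headers, body):
        # Assumed to be called by the connection for every MESSAGE frame.
        self.messages.put((headers, body))

    def on_error(self, headers, body):
        print(f"STOMP error: {body}")


# Simulate the library delivering one message, then read it back.
listener = QueueingListener()
listener.on_message({"subscription": "1"}, '{"feedData": {}}')
headers, body = listener.messages.get(timeout=1)
print(headers["subscription"], body)
```

Such a listener would be registered in the same way as the PrintingListener, for example with stomp_client.set_listener("queue_listener", listener).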

Connect to the server

We then need to define an authentication token and pass it to the .connect() API in order to connect to the server.

token = agent2_identity.document_manager.new_token(principal_did=user_key.id, duration=3600, audience=host.address)
stomp_client.connect(wait=True, passcode=token)

#output: on_connecting node01.demo02.space.iotics.com 8021
#output: on_send STOMP {'accept-version': '1.2', 'host': 'node01.demo02.space.iotics.com', 'passcode': 'eyJ0eX...3mym7Uzg'} 
#output: on_connected {'server': 'Iotic Stomp server dev', 'heart-beat': '1000,1000', 'session': '59355716-6dce-444a-b54e-4658afdf4198', 'version': '1.2'}

🚧

Note

The STOMP client connection will be automatically disconnected by the server after the token has expired. In this case it is OK to specify a large duration: the token is used and transmitted ONCE, so the chances of it being stolen are minimal compared with the REST API, where the token is passed with every call.

The output produced by the PrintingListener shows that the client sends a STOMP frame to connect to the server. The on_connected callback prints out the response from the server.

Subscribe to a feed via STOMP

Before we continue we need to define a simple utility function that builds the headers required to be passed to each interaction with the server.

def iotics_stomp_headers(agent_identity: IdentifiableEntity) -> dict:
    return {
        "Iotics-ClientRef": f'd-poc-{shortuuid.random(8)}',
        "Iotics-ClientAppId": agent_identity.document_manager.named_key.id
    }

We are now ready to subscribe to a feed. The feed URI has the following structure:

my_twin=hrate_monitor_twin_identity.id
your_twin=hb_twin_identity.id
interested_feed_id = "hb"

feed_path = f"/qapi/twins/{my_twin}/interests/twins/{your_twin}/feeds/{interested_feed_id}"

The structure above shows how the IOTICS API implements symmetry: the SUBSCRIBE below can be read as my_twin interacting with your_twin by registering an interest in interested_feed_id.

stomp_client.subscribe(destination=feed_path, id=1, headers=iotics_stomp_headers(agent2_identity))

#output: on_send SUBSCRIBE { 'id': 1, 'ack': 'auto', 'Iotics-ClientRef': 'd-poc-Hi57DrcQ', 'Iotics-ClientAppId': 'did\\ciotics\\ciotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ', 'destination': '/qapi/twins/did\\ciotics\\ciotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ/interests/twins/did\\ciotics\\ciotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb'}
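
The \c sequences in the output above are the STOMP 1.2 escape for a colon in header values; the client library applies it automatically, so the twin IDs can be passed unescaped. The sketch below only illustrates the escaping rules of the STOMP 1.2 specification. escape_header and unescape_header are hypothetical helpers, not functions of the SDK or of stomp.py.

```python
# Escape sequences defined for header values by the STOMP 1.2 specification.
_ESCAPES = {"\\": "\\\\", "\r": "\\r", "\n": "\\n", ":": "\\c"}
_UNESCAPES = {"\\\\": "\\", "\\r": "\r", "\\n": "\n", "\\c": ":"}


def escape_header(value: str) -> str:
    """Escape a header value one character at a time."""
    return "".join(_ESCAPES.get(ch, ch) for ch in value)


def unescape_header(value: str) -> str:
    """Reverse escape_header by scanning for two-character escape sequences."""
    out = []
    i = 0
    while i < len(value):
        pair = value[i:i + 2]
        if pair in _UNESCAPES:
            out.append(_UNESCAPES[pair])
            i += 2
        else:
            out.append(value[i])
            i += 1
    return "".join(out)


twin_did = "did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ"
escaped = escape_header(twin_did)
print(escaped)
print(unescape_header(escaped))
```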

The client has now sent a subscription request with id=1. The subscription ID can then be used to reconcile the messages in the on_message() listener callback.
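
One possible way to reconcile messages with subscriptions is to route each message to a handler chosen by the subscription header. The following is only a sketch: SubscriptionRouter is a hypothetical class, and the (headers, body) callback signature is assumed from the PrintingListener output shown in this tutorial.

```python
from typing import Callable, Dict


class SubscriptionRouter:
    """Dispatches message bodies to a handler chosen by subscription ID."""

    def __init__(self):
        self._handlers: Dict[str, Callable[[str], None]] = {}

    def register(self, subscription_id, handler: Callable[[str], None]) -> None:
        # Header values arrive as strings, so normalise the ID to str.
        self._handlers[str(subscription_id)] = handler

    def on_message(self, headers: dict, body: str) -> None:
        handler = self._handlers.get(headers.get("subscription"))
        if handler is not None:
            handler(body)


received = []
router = SubscriptionRouter()
router.register(1, received.append)
router.on_message({"subscription": "1"}, '{"bpm": 78}')  # routed to the handler
router.on_message({"subscription": "2"}, '{"bpm": 60}')  # no handler, ignored
print(received)
```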

Share and receive data

If we now share data using the share() function defined in Step 1:

share(host=host.address, headers=iotics_headers(agent_identity, user_identity, host.address), twin_id=hb_twin_identity.id, feed_id='hb')

#output: shared {"bpm": 78}: result code=202

We’ll observe the following output from the PrintingListener:

#output: on_message {'content-length': '468', 
            'Iotics-ClientRef': 'd-poc-Hi57DrcQ', 
            'Iotics-ClientAppId': 'did:iotics:iotGfE7ftmw8HF5M1ovad8Ttfqer8Akw82pZ', 
            'Iotics-TransactionRef': 'cOKjX0UnQEiaeO4e0aCzvg,MrHsbcU3bs9WfzA8CQcjPR', 
            'ack': 'auto', 
            'destination': '/qapi/twins/did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ/interests/twins/did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb', 
            'message-id': 'e763fcc4-79ed-4486-ae4a-e660ea821f52', 
            'content-type': 'application/json', 
            'id': '1', 'subscription': '1'} {
  "interest": {
    "followerTwinId": {
      "value": "did:iotics:iotQCBjqtG5K22iT4XTRkW8pT7JUhDJoU8TZ"
    },
    "followedFeed": {
      "feed": {
        "id": {
          "value": "hb"
        },
        "twinId": {
          "value": "did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf"
        }
      }
    }
  },
  "feedData": {
    "occurredAt": "2021-03-10T10:20:34Z",
    "mime": "application/json;encoding\u003dbase64",
    "data": "eyJicG0iOiA3OH0="
  }
}

One may process the data for the subscription id=1 by parsing the received body as a JSON string (as per the content-type header) and then reading the feedData attribute, whose base64-encoded data field matches the data being shared.

base64.b64decode("eyJicG0iOiA3OH0=")
#output: b'{"bpm": 78}'
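
Putting the parsing steps together, a helper along these lines could turn a received body into a plain Python value. It is a minimal sketch: extract_feed_data is a hypothetical name, the field names are those of the message shown above, and the feed data is assumed to be base64-encoded JSON as the mime field indicates.

```python
import base64
import json


def extract_feed_data(body: str) -> dict:
    """Parse a received message body and decode its base64 feed data."""
    message = json.loads(body)
    feed = message["interest"]["followedFeed"]["feed"]
    feed_data = message["feedData"]
    decoded = base64.b64decode(feed_data["data"]).decode("ascii")
    return {
        "twin": feed["twinId"]["value"],
        "feed": feed["id"]["value"],
        "occurredAt": feed_data["occurredAt"],
        "value": json.loads(decoded),
    }


# Trimmed copy of the message body shown above.
body = json.dumps({
    "interest": {
        "followedFeed": {
            "feed": {
                "id": {"value": "hb"},
                "twinId": {"value": "did:iotics:iotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf"},
            }
        }
    },
    "feedData": {
        "occurredAt": "2021-03-10T10:20:34Z",
        "mime": "application/json;encoding=base64",
        "data": "eyJicG0iOiA3OH0=",
    },
})
result = extract_feed_data(body)
print(result["feed"], result["value"])
```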

Disconnect the client

To disconnect the client:

stomp_client.disconnect()

Sharing via STOMP

It is also possible to share data via STOMP, which is more efficient than sharing via REST as described above.

message = json.dumps({"bpm": getHB(50, 90)})
message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
payload = {
    "sample": {
        "data": message_b64,
        "mime": "application/json;encoding=base64",
        # utcnow() is naive (no offset), so a literal Z marks the time as UTC
        "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    }
}
    
feed_id='hb'
feed_path=f'/qapi/twins/{hb_twin_identity.id}/feeds/{feed_id}'

stomp_client = StompWSConnection12(endpoint=stomp_endpoint)
stomp_client.set_ssl(verify=False)
token = agent_identity.document_manager.new_token(principal_did=user_key.id, duration=3600, audience=host.address)
stomp_client.connect(wait=True, passcode=token)

stomp_client.send(feed_path, headers=iotics_stomp_headers(agent_identity), body=json.dumps(payload))

#output: on_send SEND {'Iotics-ClientRef': 'd-poc-hLdyYNyi', 'Iotics-ClientAppId': 'did\\ciotics\\ciotEn6MyiLSQFw1Ej8CT13L8D1cMyTury39d', 'destination': '/qapi/twins/did\\ciotics\\ciotACEiy53BLXeRyTmTbu3VL4pNF6VYAfyWf/feeds/hb', 'content-length': 122} {"sample": {"data": "eyJicG0iOiA3OX0=", "mime": "application/json;encoding=base64", "occurredAt": "2021-03-10T12:49:10Z"}}

stomp_client.disconnect()

The crucial line of code used to share is:

stomp_client.send(feed_path, headers=iotics_stomp_headers(agent_identity), body=json.dumps(payload))

As STOMP is a text-based protocol, the payload is expected to be passed as a JSON string.

Bring it all together

Copy this content into iotics_tutorial.py to be set up for the next tutorial:

from random import randrange
from datetime import datetime
import shortuuid
import json
import base64
import requests
from iotic.lib.identity import Identifier
from iotics_id import Host, NamedECDSAKey, DocumentManager, IdentifiableEntity
from iotic.web.stomp.client import StompWSConnection12

from stomp.listener import PrintingListener

host = Host("https://api01.demo02.space.iotics.com")

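seed = None
try:
    # reuse the seed saved by a previous run
    with open('.seed', 'r') as f:
        seed = f.read()
except FileNotFoundError:
    # no seed file yet: generate one and save it for later runs
    seed = Identifier.new_seed(256)
    with open('.seed', 'w') as f:
        f.write(seed)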


def getHB(min, max):
    return min + randrange(max - min)


def share(host: str, headers: dict, twin_id: str, feed_id: str):
    message = json.dumps({"bpm": getHB(50, 90)})
    message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
    payload = {
        "sample": {
            "data": message_b64,
            "mime": "application/json;encoding=base64",
            # utcnow() is naive (no offset), so a literal Z marks the time as UTC
            "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        }
    }
    r = requests.post(f'{host}/qapi/twins/{twin_id}/feeds/{feed_id}/shares',
                      headers=headers, json=payload)
    print(f'shared {message}: result code={r.status_code}')


def iotics_headers(agent_identity: IdentifiableEntity, user_identity: IdentifiableEntity, host: str, duration: int = 120) -> dict:
    user_did = user_identity.document_manager.named_key.id
    token = agent_identity.document_manager.new_token(
        principal_did=user_did, duration=duration, audience=host)
    headers = {
        "accept": "application/json",
        "Iotics-ClientRef": f'd-poc-{shortuuid.random(8)}',
        "Iotics-ClientAppId": agent_identity.document_manager.named_key.id,
        "Authorization": f'Bearer {token}',
        "Content-Type": "application/json"
    }
    return headers


print("creating/retrieving user and agent identity + delegation")
user_key = NamedECDSAKey(seed=seed, purpose="user", name="fred_at_gmail-0")
user_identity = IdentifiableEntity(named_key=user_key, host=host)

agent_key = NamedECDSAKey(seed=seed, purpose="agent", name="hb-mon-0")
agent_identity = IdentifiableEntity(named_key=agent_key, host=host)
agent_proof = agent_identity.document_manager.new_proof(did=user_key.id)

agent_issuer = agent_identity.document_manager.issuer()

user_identity.document_manager.add_auth_delegation(
    proof=agent_proof, authorizer_id=agent_issuer, name="agent_deleg_0")

print("creating twin identity")
hb_twin_key = NamedECDSAKey(seed=seed, purpose="twin", name="hb_device_001")
hb_twin_identity = IdentifiableEntity(named_key=hb_twin_key, host=host)
agent_proof = agent_identity.document_manager.new_proof(
    did=hb_twin_identity.id)
agent_issuer = agent_identity.document_manager.issuer()
deleg_name = f'{agent_identity.document_manager.named_key.name}_0'
print("delegating agent to control twin")
hb_twin_identity.document_manager.add_control_delegation(
    proof=agent_proof, controller_id=agent_issuer, name=deleg_name)

print("making the twin resource")
payload = {'twinId': {'value': hb_twin_identity.id}}
r = requests.post(f'{host.address}/qapi/twins', headers=iotics_headers(
    agent_identity, user_identity, host.address), json=payload)

payload = {
    "tags": {
        "added": ["cat_health", "monitor"]
    },
    "labels": {
        "added": [
            {"lang": "en", "value": "heartbeat monitor"}
        ]
    },
    "comments": {
        "added": [
            {"lang": "en", "value": "a heartbeat monitor"}
        ]
    }
}

r = requests.patch(f'{host.address}/qapi/twins/{hb_twin_identity.id}',
                   headers=iotics_headers(
                       agent_identity, user_identity, host.address),
                   data=json.dumps(payload))

print("making the feed resource")
name = 'hb'
payload = {"feedId": {"value": name}, "storeLast": True}
r = requests.post(f'{host.address}/qapi/twins/{hb_twin_identity.id}/feeds',
                  headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)

payload = {
    "comments": {
        "added": [
          {
              "lang": "en",
              "value": "beats per minute"
          }
        ]
    },
    "storeLast": True,
    "labels": {
        "added": [
            {
                "lang": "en",
                "value": "bpm"
            }
        ]
    },
    "values": {
        "added": [
            {
                "comment": "bpm",
                "dataType": "decimal",
                "label": "bpm",
                "unit": "http://purl.obolibrary.org/obo/UO_0000148"
            }
        ]
    }
}
r = requests.patch(f'{host.address}/qapi/twins/{hb_twin_identity.id}/feeds/{name}',
                   headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)

print("publishing sample data")
message = json.dumps({"bpm": 70})
message_b64 = base64.b64encode(message.encode('ascii')).decode('ascii')
payload = {
    "sample": {
        "data": message_b64,
        "mime": "application/json;encoding=base64",
        # utcnow() is naive (no offset), so a literal Z marks the time as UTC
        "occurredAt": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    }
}

requests.post(f'{host.address}/qapi/twins/{hb_twin_identity.id}/feeds/{name}/shares',
              headers=iotics_headers(agent_identity, user_identity, host.address), json=payload)

print("create another agent")

agent2_key = NamedECDSAKey(seed=seed, purpose="agent", name="hrate_mon_0")
agent2_identity = IdentifiableEntity(named_key=agent2_key, host=host)

agent2_proof = agent2_identity.document_manager.new_proof(did=user_key.id)
agent2_issuer = agent2_identity.document_manager.issuer()
user_identity.document_manager.add_auth_delegation(
    proof=agent2_proof, authorizer_id=agent2_issuer, name="hrate_mon_0_deleg_0")

print("create another twin")
hrate_monitor_twin_key = NamedECDSAKey(
    seed=seed, purpose="twin", name="hrate_monitor_001")
hrate_monitor_twin_identity = IdentifiableEntity(
    named_key=hrate_monitor_twin_key, host=host)

payload = {'twinId': {'value': hrate_monitor_twin_identity.id}}
requests.post(f'{host.address}/qapi/twins', headers=iotics_headers(
    agent_identity, user_identity, host.address), json=payload)

print("create a stomp client")

resp = requests.get(f'{host.address}/index.json').json()
stomp_endpoint = resp["stomp"]
stomp_client = StompWSConnection12(endpoint=stomp_endpoint)
stomp_client.set_ssl(verify=False)


def iotics_stomp_headers(agent_identity: IdentifiableEntity) -> dict:
    return {
        "Iotics-ClientRef": f'd-poc-{shortuuid.random(8)}',
        "Iotics-ClientAppId": agent_identity.document_manager.named_key.id
    }