Skip to content

Commit 027eb21

Browse files
Merge pull request #23 from anedyaio/development
Bug Fixes and Commands
2 parents 6e9dac2 + 06a04a6 commit 027eb21

8 files changed

Lines changed: 270 additions & 53 deletions

File tree

examples/mqtt.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,42 @@
11
import anedya
22
import time
33

4-
config = anedya.default_config()
5-
config.connection_mode = anedya.ConnectionMode.MQTT
6-
config.set_deviceid("<SET-DEVICE-ID>")
7-
config.set_connection_key("<SET-CONNECTION-KEY>")
4+
client = None
5+
tr = None
86

9-
# Create a client
10-
client = anedya.AnedyaClient(config)
117

12-
time.sleep(1)
13-
# Client is created, now connect with the MQTT server
14-
client.connect()
15-
time.sleep(2)
16-
print(client._mqttclient.is_connected())
8+
def main():
9+
config = anedya.default_config()
10+
config.connection_mode = anedya.ConnectionMode.MQTT
11+
config.set_deviceid("67719273-7cfe-4726-a846-72ca86340916")
12+
config.set_connection_key("7346b841bc8cf7fe39555ae19654612b")
13+
config.set_on_command(callback=on_command_callback)
1714

18-
# Publish the datapoint
19-
data = anedya.DataPoints()
15+
# Create a client
16+
global client
17+
client = anedya.AnedyaClient(config)
2018

21-
dp1 = anedya.FloatData(variable='temperature', timestamp_milli=int(time.time_ns() / 1000000), value=10)
19+
time.sleep(1)
20+
# Client is created, now connect with the MQTT server
21+
client.connect()
22+
time.sleep(2)
23+
print(client._mqttclient.is_connected())
2224

23-
data.append(dp1)
25+
# Publish the datapoint
26+
27+
input("Press Enter to continue after sending command...")
28+
tr.wait_to_complete()
29+
print("Disconnecting")
30+
client.disconnect()
2431

25-
client.submit_data(data)
2632

27-
time.sleep(10)
28-
client.disconnect()
33+
def on_command_callback(cmdinput: anedya.CommandDetails):
34+
print(f"Received command from platform: {cmdinput.command}")
35+
# Change the status of the command
36+
global tr
37+
tr = client.update_command_status(command=cmdinput, status=anedya.CommandStatus.RECEIVED, callback_mode=True)
38+
return
39+
40+
41+
if __name__ == '__main__':
42+
main()

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = anedya-dev-sdk
3-
version = 0.1.6
3+
version = 0.1.7
44
url = https://github.com/anedyaio/anedya-dev-sdk-python
55
author = Anedya Systems
66
author_email = support@anedya.io

src/anedya/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
# src/__init__.py
22

33
from .config import AnedyaConfig, ConnectionMode, default_config
4-
from .models import DataPoints, FloatData
4+
from .models import DataPoints, FloatData, GeoData, CommandDetails
5+
from .client.commandsUpdate import CommandStatus
56
from .anedya import AnedyaClient
67

78
__all__ = ['AnedyaConfig',
89
'ConnectionMode',
910
'default_config',
1011
'FloatData',
1112
'DataPoints',
12-
'AnedyaClient']
13+
'AnedyaClient',
14+
'CommandDetails',
15+
'GeoData',
16+
'CommandStatus']

src/anedya/anedya.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def disconnect(self):
105105
from .client.submitData import submit_data
106106
from .client.submitLogs import submit_logs
107107
from .client.commandsUpdate import update_command_status
108+
from .client.commandsNext import next_command
108109
from .client.timeSync import get_time
109110
from .client.mqttHandlers import _onconnect_handler, _ondisconnect_handler
110111
from .client.callbacks import _error_callback, _response_callback, _command_callback

src/anedya/client/commandsNext.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
from ..models import CommandDetails, AnedyaEncoder
2+
from ..config import ConnectionMode
3+
from ..errors import AnedyaTxFailure, AnedyaInvalidConfig
4+
import uuid
5+
import json
6+
import base64
7+
import random
8+
import string
9+
import datetime
10+
11+
12+
def next_command(self, timeout: float | None = None) -> tuple[CommandDetails, bool]:
13+
"""
14+
Get the next command from the server.
15+
Returns:
16+
CommandDetails: The next command.
17+
18+
Raises:
19+
AnedyaTxFailure: If the transaction fails.
20+
"""
21+
if self._config is None:
22+
raise AnedyaInvalidConfig('Configuration not provided')
23+
if self._config.connection_mode == ConnectionMode.HTTP:
24+
return _next_command_http(self, timeout=timeout)
25+
elif self._config.connection_mode == ConnectionMode.MQTT:
26+
return _next_command_mqtt(self, timeout=timeout)
27+
else:
28+
raise AnedyaInvalidConfig('Invalid connection mode')
29+
30+
31+
def _next_command_http(self, timeout: float | None = None) -> tuple[CommandDetails, bool]:
32+
if self._config._testmode:
33+
url = "https://device.stageapi.anedya.io/v1/submitData"
34+
else:
35+
url = self._baseurl + "/v1/submitData"
36+
d = _next_command_req("req_" + ''.join(random.choices(string.ascii_letters + string.digits, k=16)))
37+
r = self._httpsession.post(url, data=d.encodeJSON(), timeout=timeout)
38+
# print(r.json())
39+
try:
40+
jsonResponse = r.json()
41+
payload = json.loads(jsonResponse)
42+
if payload['success'] is not True:
43+
raise AnedyaTxFailure(payload['error'], payload['errCode'])
44+
# Payload has success now create a CommandDetails object and return
45+
command = CommandDetails()
46+
command.id = uuid.UUID(payload["commandId"])
47+
command.command = payload["command"]
48+
command.status = payload["status"]
49+
command.type = payload["datatype"]
50+
if self.type == "string":
51+
self.data = payload["data"]
52+
elif self.type == "binary":
53+
base64_bytes = payload["data"].encode("ascii")
54+
data_bytes = base64.b64decode(base64_bytes)
55+
self.data = data_bytes
56+
command.updated = datetime.datetime.fromtimestamp(payload["updated"] / 1000)
57+
command.issued = datetime.datetime.fromtimestamp(payload["issued"] / 1000)
58+
except ValueError:
59+
raise AnedyaTxFailure(message="Invalid JSON response")
60+
return command, payload['nextavailable']
61+
62+
63+
def _next_command_mqtt(self, timeout: float | None = None) -> tuple[CommandDetails, bool]:
64+
# Create and register a transaction
65+
tr = self._transactions.create_transaction()
66+
# Encode the payload
67+
d = _next_command_req(tr.get_id())
68+
payload = d.encodeJSON()
69+
# Publish the message
70+
print(payload)
71+
topic_prefix = "$anedya/device/" + str(self._config._deviceID)
72+
print(topic_prefix + "/commands/updateStatus/json")
73+
msginfo = self._mqttclient.publish(topic=topic_prefix + "/commands/updateStatus/json",
74+
payload=payload, qos=1)
75+
try:
76+
msginfo.wait_for_publish(timeout=timeout)
77+
except ValueError:
78+
raise AnedyaTxFailure(message="Publish queue full")
79+
except RuntimeError as err:
80+
raise AnedyaTxFailure(message=str(err))
81+
except Exception as err:
82+
raise AnedyaTxFailure(message=str(err))
83+
# Wait for transaction to complete
84+
tr.wait_to_complete()
85+
# Transaction completed
86+
# Get the data from the transaction
87+
data = tr.get_data()
88+
# Clear transaction
89+
self._transactions.clear_transaction(tr)
90+
# Check if transaction is successful or not
91+
if data['success'] is not True:
92+
raise AnedyaTxFailure(data['error'], data['errCode'])
93+
try:
94+
if data['available'] is not True:
95+
return None, None
96+
command = CommandDetails()
97+
command.id = uuid.UUID(payload["commandId"])
98+
command.command = payload["command"]
99+
command.status = payload["status"]
100+
command.type = payload["datatype"]
101+
if self.type == "string":
102+
self.data = payload["data"]
103+
elif self.type == "binary":
104+
base64_bytes = payload["data"].encode("ascii")
105+
data_bytes = base64.b64decode(base64_bytes)
106+
self.data = data_bytes
107+
command.updated = datetime.datetime.fromtimestamp(payload["updated"] / 1000)
108+
command.issued = datetime.datetime.fromtimestamp(payload["issued"] / 1000)
109+
except ValueError:
110+
raise AnedyaTxFailure(message="Invalid JSON response")
111+
return command, payload['nextavailable']
112+
113+
114+
class _next_command_req:
115+
def __init__(self, req_id: str):
116+
self.reqID = req_id
117+
return
118+
119+
def toJSON(self):
120+
dict = {
121+
"reqId": self.reqID,
122+
}
123+
return dict
124+
125+
def encodeJSON(self):
126+
data = json.dumps(self, cls=AnedyaEncoder)
127+
return data

src/anedya/client/commandsUpdate.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,45 @@
33
import random
44
import base64
55
from ..models import CommandDetails, AnedyaEncoder
6-
from enum import Enum
6+
from ..transaction import Transaction
77
from ..errors import AnedyaInvalidConfig, AnedyaInvalidType, AnedyaTxFailure
88
from ..config import ConnectionMode
9+
from ..models import CommandStatus
910

1011

11-
class CommandStatus(Enum):
12-
PENDING = "pending"
13-
RECEIVED = "received"
14-
PROCESSING = "processing"
15-
SUCCESS = "success"
16-
FAILURE = "failure"
17-
INVALIDATED = "invalidated"
18-
19-
20-
def update_command_status(self, command: CommandDetails, status: CommandStatus, ackdata: str | bytes | None = None, acktype: str = "string", timeout: float | None = None) -> None:
12+
def update_command_status(self, command: CommandDetails, status: CommandStatus, ackdata: str | bytes | None = None, acktype: str = "string", timeout: float | None = None, callback_mode: bool = False) -> None | Transaction:
2113
"""
22-
Update status of a command
14+
Update status of a command. Not thread safe.
2315
2416
Args:
2517
command (CommandDetails): Command object of which status needs to be updated
2618
status (CommandStatus): New status of the command
2719
ackdata (str | bytes | None, optional): Data to be submitted along with acknowledgement. Maximum 1 kB of data is allowed. Defaults to None.
2820
acktype (str, optional): Specify the type of data submitted. Defaults to "string".
2921
timeout (float | None, optional): Time out in seconds for the request. In production setup it is advisable to use a timeout or else your program can get stuck indefinitely. Defaults to None.
22+
callback_mode (bool, optional): When using MQTT connection, it is not possible to publish the message from inside a callback. In such scenario, you can set callback_mode to True. Instead of publishing request right away, the function
23+
schedules the message to be published after the callback is completed. Also function returns a transaction object. Which can be used to check whether transaction has finished or not and what is the status of transaction.
24+
Use this method only when only single transaction is happening at a time. In the case where multiple transactions happening simultaneously, we suggest avoiding call to this function from within callback.Defaults to False.
3025
3126
Raises:
3227
AnedyaInvalidConfig: Invalid configuration
3328
AnedyaInvalidType: Invalid datatype is specified
3429
AnedyaTxFailure: Transaction failure
30+
31+
Returns:
32+
None | Transaction: Returns transaction object if callback_mode is True. returns None otherwise
3533
"""
3634
if self._config is None:
3735
raise AnedyaInvalidConfig('Configuration not provided')
3836
if self._config.connection_mode == ConnectionMode.HTTP:
3937
return _update_command_status_http(self, command=command, status=status, timeout=timeout)
4038
elif self._config.connection_mode == ConnectionMode.MQTT:
41-
return _update_command_status_mqtt(self, command=command, status=status, timeout=timeout)
39+
return _update_command_status_mqtt(self, command=command, status=status, timeout=timeout, callback_mode=callback_mode)
4240
else:
4341
raise AnedyaInvalidConfig('Invalid connection mode')
4442

4543

46-
def _update_command_status_http(self, command: CommandDetails, status: CommandStatus, ackdata: str | tuple | None = None, acktype: str = "string", timeout: float | None = None) -> None:
44+
def _update_command_status_http(self, command: CommandDetails, status: CommandStatus, ackdata: str | bytes | None = None, acktype: str = "string", timeout: float | None = None) -> None:
4745
if self._config._testmode:
4846
url = "https://device.stageapi.anedya.io/v1/submitData"
4947
else:
@@ -61,24 +59,29 @@ def _update_command_status_http(self, command: CommandDetails, status: CommandSt
6159
return
6260

6361

64-
def _update_command_status_mqtt(self, command: CommandDetails, status: CommandStatus, ackdata: str | tuple | None = None, acktype: str = "string", timeout: float | None = None) -> None:
62+
def _update_command_status_mqtt(self, command: CommandDetails, status: CommandStatus, ackdata: str | bytes | None = None, acktype: str = "string", timeout: float | None = None, callback_mode: bool = False) -> None:
6563
# Create and register a transaction
6664
tr = self._transactions.create_transaction()
6765
# Encode the payload
6866
d = _UpdateCommandStatusReq(tr.get_id(), command=command, status=status, ackdata=ackdata, acktype=acktype)
6967
payload = d.encodeJSON()
7068
# Publish the message
71-
# print(payload)
69+
print(payload)
7270
topic_prefix = "$anedya/device/" + str(self._config._deviceID)
73-
# print(topic_prefix + "/submitData/json")
71+
print(topic_prefix + "/commands/updateStatus/json")
7472
msginfo = self._mqttclient.publish(topic=topic_prefix + "/commands/updateStatus/json",
7573
payload=payload, qos=1)
74+
if callback_mode:
75+
# Can not block in callback mode
76+
return tr
7677
try:
7778
msginfo.wait_for_publish(timeout=timeout)
7879
except ValueError:
7980
raise AnedyaTxFailure(message="Publish queue full")
8081
except RuntimeError as err:
8182
raise AnedyaTxFailure(message=str(err))
83+
except Exception as err:
84+
raise AnedyaTxFailure(message=str(err))
8285
# Wait for transaction to complete
8386
tr.wait_to_complete()
8487
# Transaction completed
@@ -110,6 +113,9 @@ def __init__(self, reqId: str, command: CommandDetails, status: CommandStatus, a
110113
self.acktype = "binary"
111114
else:
112115
raise AnedyaInvalidType('Invalid acktype')
116+
if ackdata is None:
117+
self.ackdata = ""
118+
self.acktype = "string"
113119

114120
def toJSON(self):
115121
dict = {

src/anedya/client/mqttHandlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def _onconnect_handler(self, client, userdata, flags, reason_code, properties):
2121
self._mqttclient.subscribe(
2222
topic=topic_prefix + "/response", qos=0)
2323
self._mqttclient.subscribe(
24-
topic=topic_prefix + "/command", qos=0)
24+
topic=topic_prefix + "/commands", qos=0)
2525
# Define all Callbacks for error and response
2626
# Callback for errors
2727
print("Adding Callbacks")

0 commit comments

Comments
 (0)