-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrpc_handler.py
More file actions
35 lines (26 loc) · 866 Bytes
/
rpc_handler.py
File metadata and controls
35 lines (26 loc) · 866 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import ujson
from jsonrpcserver import Error
from jsonrpcserver import Success
from jsonrpcserver import async_dispatch
from ..shared import Command
from .handler import Handler
class JsonRpcHandler(Handler):
    """Handler that serves the ``eval`` and ``preview`` commands via JSON-RPC.

    On construction a method table is built that maps each command name to
    the coroutine function returned by ``jsonrpc_handler`` for this instance.
    """

    def __init__(self):
        # NOTE(review): the base-class initializer is not invoked here --
        # confirm that Handler needs no initialization of its own.
        method_table = {}
        for command in ("eval", "preview"):
            method_table[command] = jsonrpc_handler(self, command)
        self._methods = method_table

    async def dispatch(self, req: str) -> str:
        """Run one raw JSON-RPC request through ``async_dispatch``.

        Args:
            req: The request text, handed to ``async_dispatch`` unchanged.

        Returns:
            Whatever ``async_dispatch`` produces for the request, using the
            method table built in ``__init__`` and ``ujson`` for both
            serialization and deserialization.
        """
        response = await async_dispatch(
            req,
            methods=self._methods,
            serializer=ujson.dumps,
            deserializer=ujson.loads,
        )
        return response
def jsonrpc_handler(handler: Handler, name: Command):
    """Create the JSON-RPC method coroutine for the command *name*.

    Args:
        handler: Object whose awaitable ``handle(name, payload)`` does the work.
        name: Command name forwarded as the first argument to ``handle``.

    Returns:
        A coroutine function taking the request params as ``req``. It awaits
        ``handler.handle(name, {"params": req})`` and returns
        ``Success(result)``; if any ``Exception`` is raised along the way it
        returns ``Error(0, str(exc), exc)`` instead.
    """

    async def wrapped(req: dict):
        payload = {"params": req}
        try:
            outcome = await handler.handle(name, payload)
            # Built inside the try so a failure while constructing the
            # success result is also reported as an Error.
            reply = Success(outcome)
        except Exception as exc:
            # NOTE(review): the exception instance itself is passed as the
            # error data -- confirm the configured serializer can encode it.
            reply = Error(0, str(exc), exc)
        return reply

    return wrapped