Yufan Song 9fbfa0650e
Add websocket runtime and od-client-runtime (#2603)
* add draft code

* add some sandbox draft code

* Export WebSocketBox and fix add_to_env async

* fix

* test execute

* add runtime draft

* add draft od-runtime-client

* refactor useless code

* format

* resume runtime

* resume runtime

* remove background command

* remove uselss action and init function

* add EventStreamRuntime test

* add echo server test

* temporarily build websocket everytimes

* remove websocket sandbox deprecated

* refactor code

* fix bug, add test

* fix bug

* remove test draft code

* refactor code, remove async

* rename file and directory

* add init plugin and runtime tools function

* add docker luanch

* fix plugin initialization

* remove test scropt

* add mock test code

* apply suggestions

---------

Co-authored-by: Boxuan Li <liboxuan@connect.hku.hk>
2024-07-08 17:44:37 +00:00

32 lines
944 B
Python

# echo_server.py, this file is used in development to test the runtime client. It is not used in production.
import asyncio
import websockets
import pexpect
from websockets.exceptions import ConnectionClosed
import json
def is_valid_json(s):
try:
json.loads(s)
except json.JSONDecodeError:
return False
return True
# Function for testing websocket echo
async def echo(websocket, path):
async for message in websocket:
if is_valid_json(message):
event = json.loads(message)
print("Received:", event)
response = json.dumps(event)
await websocket.send(response)
else:
print("Received:", message)
response = f"Echo: {message}"
await websocket.send(response)
start_server = websockets.serve(echo, "0.0.0.0", 8080)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()