feat add remote hand tracking backend

This commit is contained in:
Tom Boullay
2026-04-27 15:49:02 +02:00
parent b523e98bea
commit fa8bc229c3
10 changed files with 595 additions and 3 deletions
+122
View File
@@ -0,0 +1,122 @@
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from backend.connection_manager import ClientConnection, ConnectionManager
from backend.hand_tracker import HandTracker, now_ms
MAX_FRAME_BYTES = 220_000
MIN_FRAME_INTERVAL_SECONDS = 0.08
manager = ConnectionManager()
tracker: HandTracker | None = None
detection_lock = asyncio.Lock()
@asynccontextmanager
async def lifespan(app: FastAPI):
global tracker
tracker = HandTracker(max_hands=2)
yield
if tracker:
tracker.close()
app = FastAPI(title="La-Fabrik Hand Tracking", lifespan=lifespan)
@app.get("/health")
async def health() -> JSONResponse:
return JSONResponse(
{
"status": "ok",
"connections": manager.count,
},
)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
connection = await manager.connect(websocket)
await manager.send(connection, status_payload("connected"))
try:
while True:
message = await websocket.receive_json()
response = await handle_message(connection, message)
await manager.send(connection, response)
except WebSocketDisconnect:
manager.disconnect(connection)
except Exception as error:
await manager.send(connection, error_payload(str(error)))
manager.disconnect(connection)
async def handle_message(
connection: ClientConnection,
message: dict[str, Any],
) -> dict[str, Any]:
if message.get("type") != "frame":
return error_payload("Unsupported message type")
current_time = asyncio.get_running_loop().time()
if current_time - connection.last_frame_at < MIN_FRAME_INTERVAL_SECONDS:
return status_payload("rate_limited")
if connection.is_processing:
return status_payload("busy")
image = message.get("image")
if not isinstance(image, str):
return error_payload("Missing image payload")
if len(image) > MAX_FRAME_BYTES:
return error_payload("Frame payload too large")
if tracker is None:
return error_payload("Hand tracker is not ready")
if detection_lock.locked():
return status_payload("busy")
connection.last_frame_at = current_time
connection.is_processing = True
try:
async with detection_lock:
hands = await asyncio.to_thread(tracker.detect_from_base64_jpeg, image)
return {
"type": "hands",
"timestamp": now_ms(),
"hands": [hand.to_payload() for hand in hands],
}
finally:
connection.is_processing = False
def status_payload(status: str) -> dict[str, str | int]:
return {
"type": "status",
"timestamp": now_ms(),
"status": status,
}
def error_payload(message: str) -> dict[str, str | int | list[Any]]:
return {
"type": "error",
"timestamp": now_ms(),
"hands": [],
"message": message,
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)