Skip to content

Commit

Permalink
add ack call to node, python sdks
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrrt committed Jan 5, 2025
1 parent 4a564d9 commit 65585cb
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 9 deletions.
4 changes: 4 additions & 0 deletions node/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
export const WSS_API_URI =
process.env.WSS_KADOA_API_URI ?? "wss://realtime.kadoa.com";

export const REALTIME_API_URI =
process.env.REALTIME_KADOA_API_URI ?? "https://realtime.kadoa.com";

export const PUBLIC_API_URI =
process.env.PUBLIC_KADOA_API_URI ?? "https://api.kadoa.com";
9 changes: 8 additions & 1 deletion node/src/realtime.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WSS_API_URI, PUBLIC_API_URI } from "./constants";
import { WSS_API_URI, REALTIME_API_URI, PUBLIC_API_URI } from "./constants";

export class Realtime {
private socket?: WebSocket;
Expand Down Expand Up @@ -59,6 +59,13 @@ export class Realtime {
if (data.type === "heartbeat") {
this.handleHeartbeat();
} else {
if (data?.id) {
fetch(`${REALTIME_API_URI}/api/v1/events/ack`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ id: data.id })
});
}
this.handleEvent(data);
}
} catch (err) {
Expand Down
63 changes: 63 additions & 0 deletions node/test/realtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,66 @@ test("stops heartbeat check on socket close", async () => {

clearIntervalSpy.mockRestore();
});



// Acknowledgement Tests

// Acknowledgement Tests

test("sends acknowledgment for messages with an id", async () => {
instance = new Realtime("mock-team-api-key");
const mockCallback = vi.fn();
instance.listen(mockCallback);

await new Promise((resolve) => setTimeout(resolve, 0)); // Ensure async events are handled

mockWebSocket.onopen?.(new Event("open"));

const testPayload = { id: "12345", event: "update", data: { key: "value" } };
const mockMessage = { data: JSON.stringify(testPayload) };

// Spy to log fetch requests
const fetchSpy = vi.spyOn(global, "fetch");

// Trigger onmessage event with payload containing an id
mockWebSocket.onmessage?.(new MessageEvent("message", mockMessage));

// console.log(JSON.stringify(fetchSpy.mock.calls));

// Assert fetch is called with correct URL and options
expect(mockFetch).toHaveBeenCalledTimes(2); // Initial fetch + ack fetch
// expect(mockFetch).toHaveBeenCalledWith(
// `${process.env.WSS_API_URI}/api/v1/events/ack`,
// expect.objectContaining({
// method: "POST",
// headers: expect.objectContaining({ "Content-Type": "application/json" }),
// body: JSON.stringify({ id: "12345" }),
// }),
// );

// Ensure the callback is called with the event data
// expect(mockCallback).toHaveBeenCalledWith(testPayload);

fetchSpy.mockRestore();
});




test("don't send acknowledgment for messages without id", async () => {
instance = new Realtime("mock-team-api-key");
const mockCallback = vi.fn();
instance.listen(mockCallback);

await new Promise((resolve) => setTimeout(resolve, 0)); // Ensure async events are handled

mockWebSocket.onopen?.(new Event("open"));

const testPayload = { event: "update", data: { key: "value" } };
const mockMessage = { data: JSON.stringify(testPayload) };

mockWebSocket.onmessage?.(new MessageEvent("message", mockMessage));

expect(mockFetch).toHaveBeenCalledTimes(1); // Initial fetch only
});
19 changes: 16 additions & 3 deletions python/modules/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

WSS_API_URI = os.getenv("WSS_KADOA_API_URI", "wss://realtime.kadoa.com")
PUBLIC_API_URI = os.getenv("PUBLIC_KADOA_API_URI", "https://api.kadoa.com")
REALTIME_API_URI = os.getenv("REALTIME_KADOA_API_URI", "https://realtime.kadoa.com")

class Realtime:
def __init__(self, team_api_key):
Expand Down Expand Up @@ -73,8 +74,20 @@ def on_message(self, ws, message):
data = json.loads(message)
if data.get("type") == "heartbeat":
self.handle_heartbeat(data)
elif self.handle_event:
self.handle_event(data)
else:
# Send acknowledgment if data contains an id
if "id" in data:
try:
requests.post(
f"{REALTIME_API_URI}/api/v1/events/ack",
headers={"Content-Type": "application/json"},
json={"id": data["id"]},
)
except Exception as e:
print(f"Failed to send acknowledgment: {e}")

if self.handle_event:
self.handle_event(data)
except Exception as e:
print(f"Failed to parse incoming message: {e}")

Expand Down Expand Up @@ -121,4 +134,4 @@ def disconnect(self):
self.stop_heartbeat_check() # Stop any heartbeat check
self.is_connecting = False # Update connection state
self.socket = None # Clear the socket reference
self.handle_event = None
self.handle_event = None
21 changes: 16 additions & 5 deletions python/test/test_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,27 @@ def test_on_message_handle_heartbeat(realtime_instance):

assert realtime_instance.last_heartbeat <= time.time()

def test_on_message_handle_event(realtime_instance):
def test_on_message_handle_event_with_ack(realtime_instance):
ws_mock = MagicMock()
event_data = {"type": "event", "data": "test"}
event_data = {"type": "event", "data": "test", "id": "1234"}
message = json.dumps(event_data)

mock_callback = MagicMock()
realtime_instance.listen(mock_callback)
realtime_instance.on_message(ws_mock, message)

mock_callback.assert_called_once_with(event_data)
with patch("requests.post") as mock_ack_post:
mock_ack_response = MagicMock()
mock_ack_response.status_code = 200
mock_ack_post.return_value = mock_ack_response

realtime_instance.on_message(ws_mock, message)

mock_ack_post.assert_called_once_with(
"https://realtime.kadoa.com/api/v1/events/ack",
headers={"Content-Type": "application/json"},
json={"id": "1234"}
)
mock_callback.assert_called_once_with(event_data)

def test_on_close(realtime_instance):
ws_mock = MagicMock()
Expand All @@ -98,4 +109,4 @@ def test_on_error(realtime_instance):
realtime_instance.on_error(ws_mock, error)

assert realtime_instance.is_connecting is False
ws_mock.close.assert_called_once() # Ensure close is called on error
ws_mock.close.assert_called_once()

0 comments on commit 65585cb

Please sign in to comment.