Labels: bug, documentation, server-proto
Sync Subscription Acknowledgement Proposal
A lightweight acknowledgement system to improve reliability without OPC UA's complexity.
Problem Statement
The current /sync endpoint clears the queue immediately upon retrieval:

    response = sub.pendingUpdates.copy()
    sub.pendingUpdates.clear()  # Data at risk if client crashes before processing

If the client crashes after receiving data but before processing it, that data is lost.
Proposed Solution: Cursor-Based Acknowledgement
Add a sequence number to each update and let the client acknowledge by cursor position.

    Current:  /sync → returns all, clears queue (data at risk)

    Proposed: /sync → returns all with sequence numbers
              /ack?through=456 → clears updates ≤ 456
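
The flow above can be sketched without any web framework. This is a minimal in-memory illustration, not the server implementation: `SubscriptionQueue`, `publish`, `sync`, and `ack` are hypothetical names chosen for the example. It shows that fetching is repeatable until the client acknowledges.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class PendingUpdate:
    sequence: int
    data: Any


@dataclass
class SubscriptionQueue:
    """Hypothetical in-memory stand-in for a subscription's pending queue."""
    pending: List[PendingUpdate] = field(default_factory=list)
    next_sequence: int = 1
    last_acked: int = 0

    def publish(self, data: Any) -> int:
        """Append an update and return the sequence number assigned to it."""
        seq = self.next_sequence
        self.pending.append(PendingUpdate(seq, data))
        self.next_sequence += 1
        return seq

    def sync(self, since: int = 0) -> List[PendingUpdate]:
        """Return updates after `since` without removing them."""
        return [u for u in self.pending if u.sequence > since]

    def ack(self, through: int) -> int:
        """Drop updates with sequence <= `through`; return how many were dropped."""
        if through <= self.last_acked:
            return 0  # stale or duplicate ack: nothing to do
        before = len(self.pending)
        self.pending = [u for u in self.pending if u.sequence > through]
        self.last_acked = through
        return before - len(self.pending)


queue = SubscriptionQueue()
for value in ("a", "b", "c"):
    queue.publish(value)

first = queue.sync()             # client fetches, then "crashes" before acking
retry = queue.sync()             # the same three updates are still available
dropped = queue.ack(through=2)   # client confirms sequences 1 and 2
remaining = queue.sync()         # only sequence 3 is left
```

Because `sync` never mutates the queue, a client that crashes between fetching and processing simply fetches again.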
Data Model Changes
Modified Subscription Model

    # subscriptions.py
    from typing import Any, Callable, List

    from fastapi.responses import StreamingResponse
    from pydantic import BaseModel


    class PendingUpdate(BaseModel):
        sequence: int
        timestamp: str
        data: Any


    class Subscription(BaseModel):
        subscriptionId: int
        created: str
        maxDepth: int = 1
        monitoredItems: List[str] = []
        # New fields
        pendingUpdates: List[PendingUpdate] = []  # Now includes sequence numbers
        nextSequence: int = 1  # Incrementing counter
        lastAckedSequence: int = 0  # Client's confirmed position
        max_queue_size: int = 1000
        max_unacked_age_seconds: int = 300  # Optional: TTL for old unacked data
        is_streaming: bool = False
        handler: Callable | None = None
        event_loop: Any | None = None
        streaming_response: StreamingResponse | None = None

New/Modified Endpoints
1. Modified /sync - Returns data without clearing

    @router.post("/subscriptions/{subscriptionId}/sync")
    async def sync_subscription(
        subscriptionId: int,
        request: Request,
        since: int = Query(default=0, description="Return updates after this sequence")
    ):
        sub = get_subscription(subscriptionId)
        if not sub:
            raise HTTPException(status_code=404, detail="Subscription not found")

        # Filter to updates after the requested sequence
        updates = [u for u in sub.pendingUpdates if u.sequence > since]

        # Return updates with sequence info, but DON'T clear
        return {
            "updates": [
                {
                    "sequence": u.sequence,
                    "timestamp": u.timestamp,
                    "data": u.data
                }
                for u in updates
            ],
            "lastSequence": sub.pendingUpdates[-1].sequence if sub.pendingUpdates else sub.lastAckedSequence,
            "lastAckedSequence": sub.lastAckedSequence
        }

2. New /ack - Acknowledges through a sequence number

    class AckRequest(BaseModel):
        throughSequence: int


    @router.post("/subscriptions/{subscriptionId}/ack")
    async def ack_subscription(subscriptionId: int, body: AckRequest, request: Request):
        sub = get_subscription(subscriptionId)
        if not sub:
            raise HTTPException(status_code=404, detail="Subscription not found")

        # Stale or duplicate ack: nothing to remove
        if body.throughSequence <= sub.lastAckedSequence:
            return {"acknowledged": 0, "lastAckedSequence": sub.lastAckedSequence}

        # Reject acks for sequences that were never issued; accepting them would
        # push lastAckedSequence past real updates and make later acks no-ops
        if body.throughSequence >= sub.nextSequence:
            raise HTTPException(status_code=400, detail="throughSequence has not been issued yet")

        # Remove all updates up through the acknowledged sequence
        before_count = len(sub.pendingUpdates)
        sub.pendingUpdates = [
            u for u in sub.pendingUpdates
            if u.sequence > body.throughSequence
        ]
        sub.lastAckedSequence = body.throughSequence

        return {
            "acknowledged": before_count - len(sub.pendingUpdates),
            "lastAckedSequence": sub.lastAckedSequence,
            "remaining": len(sub.pendingUpdates)
        }

3. Combined sync + ack (optional convenience endpoint)

    class SyncRequest(BaseModel):
        ackThrough: int | None = None  # Optionally ack previous batch
        since: int | None = None  # Optionally filter by sequence


    # Alternative to endpoint 1: it uses the same route, so register only one of the two
    @router.post("/subscriptions/{subscriptionId}/sync")
    async def sync_subscription(subscriptionId: int, body: SyncRequest | None = None):
        sub = get_subscription(subscriptionId)
        if not sub:
            raise HTTPException(status_code=404, detail="Subscription not found")

        # Ack previous batch if provided; never move the ack position backwards
        if body and body.ackThrough is not None and body.ackThrough > sub.lastAckedSequence:
            sub.pendingUpdates = [u for u in sub.pendingUpdates if u.sequence > body.ackThrough]
            sub.lastAckedSequence = body.ackThrough

        # Filter by since if provided
        since = (body.since if body else None) or 0
        updates = [u for u in sub.pendingUpdates if u.sequence > since]

        return {
            "updates": updates,
            "lastSequence": sub.pendingUpdates[-1].sequence if sub.pendingUpdates else sub.lastAckedSequence,
            "lastAckedSequence": sub.lastAckedSequence
        }

Modified Update Handler

    from datetime import datetime


    def handle_data_source_update(instance, value, I3X_DATA_SUBSCRIPTIONS, data_source):
        # NOTE: element_id (the ID of the updated element) is not defined in this
        # excerpt; it is assumed to be derived from `instance` elsewhere.
        for sub in I3X_DATA_SUBSCRIPTIONS:
            if element_id in sub.monitoredItems:
                updateValue = getSubscriptionValue(instance, value, ...)

                if sub.is_streaming and sub.handler:
                    # Streaming mode - unchanged
                    sub.handler(updateValue)
                else:
                    # Sync mode - add sequence number
                    pending = PendingUpdate(
                        sequence=sub.nextSequence,
                        timestamp=datetime.utcnow().isoformat(),
                        data=updateValue
                    )
                    sub.nextSequence += 1

                    # Queue management - drop oldest when full
                    if len(sub.pendingUpdates) >= sub.max_queue_size:
                        sub.pendingUpdates.pop(0)
                        # TODO: Consider logging warning about data loss

                    sub.pendingUpdates.append(pending)

Client Usage Pattern
JavaScript Client Example

    class SyncSubscriptionClient {
      constructor(baseUrl, subscriptionId) {
        this.baseUrl = baseUrl;
        this.subscriptionId = subscriptionId;
        this.lastProcessedSequence = 0;
      }

      async poll() {
        // 1. Fetch updates from last known position
        // (the /sync route is registered with POST, so the method must be set explicitly)
        const response = await fetch(
          `${this.baseUrl}/subscriptions/${this.subscriptionId}/sync?since=${this.lastProcessedSequence}`,
          { method: 'POST' }
        );
        const { updates, lastSequence } = await response.json();
        if (updates.length === 0) return;

        // 2. Process updates
        for (const update of updates) {
          await this.processUpdate(update.data);
          this.lastProcessedSequence = update.sequence;
        }

        // 3. Acknowledge AFTER successful processing
        await fetch(
          `${this.baseUrl}/subscriptions/${this.subscriptionId}/ack`,
          {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ throughSequence: this.lastProcessedSequence })
          }
        );
      }

      async processUpdate(data) {
        // Application-specific processing
      }
    }

    // Usage
    const client = new SyncSubscriptionClient('http://localhost:8000', 1);
    setInterval(() => client.poll(), 1000);

Python Client Example

    import httpx


    class SyncSubscriptionClient:
        def __init__(self, base_url: str, subscription_id: int):
            self.base_url = base_url
            self.subscription_id = subscription_id
            self.last_processed_sequence = 0

        def poll(self):
            # 1. Fetch updates
            response = httpx.post(
                f"{self.base_url}/subscriptions/{self.subscription_id}/sync",
                params={"since": self.last_processed_sequence}
            )
            data = response.json()
            updates = data.get("updates", [])
            if not updates:
                return

            # 2. Process updates
            for update in updates:
                self.process_update(update["data"])
                self.last_processed_sequence = update["sequence"]

            # 3. Acknowledge
            httpx.post(
                f"{self.base_url}/subscriptions/{self.subscription_id}/ack",
                json={"throughSequence": self.last_processed_sequence}
            )

        def process_update(self, data):
            # Application-specific processing
            print(f"Processing: {data}")

Optional Enhancements
1. TTL-based cleanup
Prevent unbounded queue growth for dead clients:

    import logging
    from datetime import datetime, timedelta


    def cleanup_stale_updates(sub: Subscription):
        """Remove updates older than max_unacked_age_seconds."""
        cutoff = datetime.utcnow() - timedelta(seconds=sub.max_unacked_age_seconds)
        before_count = len(sub.pendingUpdates)
        sub.pendingUpdates = [
            u for u in sub.pendingUpdates
            if datetime.fromisoformat(u.timestamp) > cutoff
        ]
        removed = before_count - len(sub.pendingUpdates)
        if removed > 0:
            logging.warning(f"Subscription {sub.subscriptionId}: removed {removed} stale updates")

2. Nack support
Allow the client to report a processing failure and reset its position:

    class NackRequest(BaseModel):
        resetToSequence: int
        reason: str | None = None


    @router.post("/subscriptions/{subscriptionId}/nack")
    async def nack_subscription(subscriptionId: int, body: NackRequest):
        sub = get_subscription(subscriptionId)
        if not sub:
            raise HTTPException(status_code=404, detail="Subscription not found")

        if body.reason:
            logging.warning(f"Subscription {subscriptionId} nack: {body.reason}")

        # Don't actually change lastAckedSequence - just let client re-fetch
        # Client can use `since` parameter to get updates from desired position
        return {
            "reset": True,
            "message": f"Client can re-fetch from sequence {body.resetToSequence} using ?since parameter"
        }

3. Health/status endpoint

    @router.get("/subscriptions/{subscriptionId}/status")
    async def subscription_status(subscriptionId: int):
        sub = get_subscription(subscriptionId)
        if not sub:
            raise HTTPException(status_code=404, detail="Subscription not found")

        return {
            "subscriptionId": sub.subscriptionId,
            "monitoredItemCount": len(sub.monitoredItems),
            "pendingUpdateCount": len(sub.pendingUpdates),
            "nextSequence": sub.nextSequence,
            "lastAckedSequence": sub.lastAckedSequence,
            "unackedCount": len([u for u in sub.pendingUpdates if u.sequence > sub.lastAckedSequence]),
            "isStreaming": sub.is_streaming,
            "oldestPendingTimestamp": sub.pendingUpdates[0].timestamp if sub.pendingUpdates else None
        }

Comparison
| Aspect | Current | Proposed | OPC UA |
|---|---|---|---|
| Sequence tracking | None | Single counter per subscription | Per-message with AvailableSequenceNumbers |
| Acknowledgement | Implicit (sync = ack) | Explicit batch (/ack?through=N) | Explicit per-sequence |
| Retry on failure | No | Yes (data retained until acked) | Yes (server resends) |
| Client crash recovery | Lost data | Resume from lastAckedSequence | Resume from last ack |
| Complexity | Minimal | Low | High |
| Queue overflow | Silent drop | Drop oldest (with warning) | QueueOverflowEvent |
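
The queue-overflow row has a client-side consequence: when the server drops the oldest updates, the client can notice because the sequence numbers it receives are no longer contiguous. A minimal sketch of that check, assuming updates arrive as dictionaries with a `sequence` key as in the /sync response; `find_gaps` is a hypothetical helper, not part of the proposal.

```python
from typing import Any, Dict, List, Tuple


def find_gaps(last_seen: int, updates: List[Dict[str, Any]]) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) ranges of sequence numbers that were skipped."""
    gaps = []
    expected = last_seen + 1
    for update in sorted(updates, key=lambda u: u["sequence"]):
        seq = update["sequence"]
        if seq > expected:
            # Everything between the expected and the received sequence is missing
            gaps.append((expected, seq - 1))
        expected = max(expected, seq + 1)
    return gaps


# The client last processed sequence 4, but the next batch starts at 8
batch = [{"sequence": 8, "data": "x"}, {"sequence": 9, "data": "y"}]
gaps = find_gaps(last_seen=4, updates=batch)
```

A non-empty result tells the client that data was dropped (by overflow or TTL cleanup) and that it may need to re-read current values from the source.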
Migration Path
Phase 1: Add sequence numbers (backward compatible)
- Add `sequence` field to updates
- Existing clients continue to work (ignore sequence)
- New clients can start tracking sequences
Phase 2: Add /ack endpoint
- Existing clients continue to work (never call `/ack`, queue grows)
- Add TTL cleanup to prevent unbounded growth
- New clients use `/ack` for reliability
Phase 3: Deprecate implicit ack (optional)
- Add configuration flag to disable implicit ack on `/sync`
- Gives time for clients to migrate
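
The Phase 3 flag could look roughly like the sketch below. The proposal does not name the flag, so `implicit_ack`, `Sub`, and this standalone `sync` function are assumptions made for illustration only.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Sub:
    """Hypothetical, trimmed-down subscription used only for this sketch."""
    pending: List[Dict[str, Any]] = field(default_factory=list)
    last_acked: int = 0


def sync(sub: Sub, implicit_ack: bool = True) -> List[Dict[str, Any]]:
    """Return pending updates; clear them only in legacy implicit-ack mode."""
    updates = list(sub.pending)
    if implicit_ack and updates:
        # Legacy behavior: retrieval counts as acknowledgement
        sub.last_acked = updates[-1]["sequence"]
        sub.pending.clear()
    return updates


legacy = Sub(pending=[{"sequence": 1, "data": "a"}, {"sequence": 2, "data": "b"}])
legacy_batch = sync(legacy)                       # queue is cleared on read

modern = Sub(pending=[{"sequence": 1, "data": "a"}, {"sequence": 2, "data": "b"}])
modern_batch = sync(modern, implicit_ack=False)   # queue is kept until /ack
```

Defaulting the flag to the legacy behavior keeps existing clients working until they have migrated to explicit acks.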
Summary
This design provides:
- Crash recovery: Client can resume from last acknowledged position
- Explicit confirmation: Data is retained until the client confirms receipt (subject to the queue-size and TTL limits)
- Batch efficiency: One ack covers many messages
- Simplicity: Just an incrementing counter, no complex state machine
- Backward compatible: Old clients still work (just without retry benefits)
The main trade-off versus OPC UA is that the server does not actively track what is "in flight"; it is up to the client to ack. In exchange, per-subscription server state stays small: the pending queue plus two counters (nextSequence and lastAckedSequence).
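
One consequence of client-driven acks is that delivery is at-least-once: if the client processes a batch but crashes before the ack reaches the server, the same updates come back on the next /sync. A minimal sketch of a client that tolerates this by persisting its cursor and skipping sequences it has already handled; `DurableCursor` and `process_batch` are hypothetical names, and the JSON file is just one possible store.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict, List


class DurableCursor:
    """Stores the last processed sequence in a small JSON file."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> int:
        try:
            with open(self.path) as fh:
                return int(json.load(fh)["lastProcessedSequence"])
        except FileNotFoundError:
            return 0  # first run: nothing processed yet

    def save(self, sequence: int) -> None:
        # Write to a temp file and rename so a crash never leaves a half-written cursor
        tmp = self.path + ".tmp"
        with open(tmp, "w") as fh:
            json.dump({"lastProcessedSequence": sequence}, fh)
        os.replace(tmp, self.path)


def process_batch(
    updates: List[Dict[str, Any]],
    cursor: DurableCursor,
    handle: Callable[[Any], None],
) -> int:
    """Process updates in order, skipping any that were already handled."""
    last = cursor.load()
    for update in updates:
        if update["sequence"] <= last:
            continue  # redelivered after a crash or lost ack
        handle(update["data"])
        last = update["sequence"]
        cursor.save(last)
    return last


path = os.path.join(tempfile.mkdtemp(), "cursor.json")
seen: List[Any] = []
batch = [{"sequence": 1, "data": "a"}, {"sequence": 2, "data": "b"}]
process_batch(batch, DurableCursor(path), seen.append)

# Simulate a restart where the ack was lost: the server resends 1-2 plus a new update
redelivered = batch + [{"sequence": 3, "data": "c"}]
last = process_batch(redelivered, DurableCursor(path), seen.append)
```

After the second call the handler has seen each update exactly once, and `last` is the value the client would send as `throughSequence` to /ack.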