Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,20 @@ <h3 class="endpoint">/stream/:namespace/*key</h3>
<pre class="success">
<a href="https://abacus.jasoncameron.dev/stream/mysite.com/visits" target="_blank">GET /stream/mysite.com/visits</a>
⇒ data: {"value": 36}
</pre>

<h3 class="endpoint">/stream/hit/:namespace/*key</h3>
<p>
Increments and streams a counter using
<a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Receiving_events_from_the_server" target="_blank">
Server-Sent Events (SSE)
</a>.
<br>
Optionally specify a namespace.</p>

<pre class="success">
<a href="https://abacus.jasoncameron.dev/stream/hit/mysite.com/visits" target="_blank">GET /stream/hit/mysite.com/visits</a>
⇒ data: {"value": 1}
</pre>

<h3 id="create" class="endpoint">/create/:namespace/*key</h3>
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func CreateRouter() *gin.Engine {
route.GET("/hit/:namespace/:key/shield", HitShieldView)
route.GET("/hit/:namespace/:key", HitView)
route.GET("/stream/:namespace/*key", middleware.SSEMiddleware(), StreamValueView)
route.GET("/stream/hit/:namespace/*key", middleware.SSEMiddleware(), StreamHitView)

route.POST("/create/:namespace/*key", CreateView)
route.GET("/create/:namespace/*key", CreateView)
Expand Down
125 changes: 125 additions & 0 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,131 @@ func StreamValueView(c *gin.Context) {
})
}

func StreamHitView(c *gin.Context) {
namespace, key := utils.GetNamespaceKey(c)
if namespace == "" || key == "" {
return
}
dbKey := utils.CreateKey(c, namespace, key, false)
if dbKey == "" { // error is handled in CreateKey
return
}
// Get data from Redis
val, err := Client.Incr(context.Background(), dbKey).Result()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get data. Try again later."})
return
}
// check if val is is greater than the max value of an int
if val > math.MaxInt {
c.JSON(http.StatusBadRequest, gin.H{"error": "Value is too large. Max value is " + strconv.Itoa(math.
MaxInt), "message": "If you are seeing this error and have a legitimate use case, please contact me @ abacus@jasoncameron.dev"})
return
}

go func() {
utils.SetStream(dbKey, int(val)) // #nosec G115 -- This is safe as we perform a check (
// see above) to ensure val is within the range of an int.
Client.Expire(context.Background(), dbKey, utils.BaseTTLPeriod)
}()

// Set SSE headers
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")

// Initialize client channel with a buffer to prevent blocking
clientChan := make(chan int, 10)

// Create a context that's canceled when the client disconnects
ctx := c.Request.Context()

// Add this client to the event server for this specific key
utils.ValueEventServer.NewClients <- utils.KeyClientPair{
Key: dbKey,
Client: clientChan,
}

// Track if cleanup has been done
var cleanupDone bool
var cleanupMutex sync.Mutex

// Ensure client is always removed when handler exits
defer func() {
cleanupMutex.Lock()
if !cleanupDone {
cleanupDone = true
cleanupMutex.Unlock()

// Signal the event server to remove this client
select {
case utils.ValueEventServer.ClosedClients <- utils.KeyClientPair{Key: dbKey, Client: clientChan}:
// Successfully sent cleanup signal
case <-time.After(500 * time.Millisecond):
// Timed out waiting to send cleanup signal
log.Printf("Warning: Timed out sending cleanup signal for %s", dbKey)
}
} else {
cleanupMutex.Unlock()
}
}()

// Monitor for client disconnection in a separate goroutine
go func() {
<-ctx.Done() // Wait for context cancellation (client disconnected)

cleanupMutex.Lock()
if !cleanupDone {
cleanupDone = true
cleanupMutex.Unlock()

log.Printf("Client disconnected for key %s, cleaning up", dbKey)

// Signal the event server to remove this client
select {
case utils.ValueEventServer.ClosedClients <- utils.KeyClientPair{Key: dbKey, Client: clientChan}:
// Successfully sent cleanup signal
case <-time.After(500 * time.Millisecond):
// Timed out waiting to send cleanup signal
log.Printf("Warning: Timed out sending cleanup signal for %s after disconnect", dbKey)
}
} else {
cleanupMutex.Unlock()
}
}()

// Send initial value
if count := val; err == nil {
// Keep your exact format
_, err := c.Writer.WriteString(fmt.Sprintf("data: {\"value\":%d}\n\n", count))
if err != nil {
log.Printf("Error writing to client: %v", err)
return
}
c.Writer.Flush()
}

// Stream updates
c.Stream(func(w io.Writer) bool {
select {
case <-ctx.Done():
return false
case count, ok := <-clientChan:
if !ok {
return false
}
// Keep your exact format
_, err := c.Writer.WriteString(fmt.Sprintf("data: {\"value\":%d}\n\n", count))
if err != nil {
log.Printf("Error writing to client: %v", err)
return false
}
c.Writer.Flush()
return true
}
})
}

func HitView(c *gin.Context) {
namespace, key := utils.GetNamespaceKey(c)
if namespace == "" || key == "" {
Expand Down
Loading
Loading