
Commit 3c24817

feat(caddy): persist replay cache across config reloads (#212)
* Create a new config format so we can expand listener configuration for proxy protocol.
* Remove unused `fakeAddr`.
* Split `startPort` up between TCP and UDP.
* Use listeners to configure TCP and/or UDP services as needed.
* Remove commented out line.
* Use `ElementsMatch` to compare the services irrespective of element ordering.
* Do not ignore the `keys` field if `services` is used as well.
* Add some more tests for failure scenarios and empty files.
* Remove unused `GetPort()`.
* Move `ResolveAddr` to config.go.
* Remove use of `net.Addr` type.
* Pull listener creation into its own function.
* Move listener validation/creation to `config.go`.
* Use a custom type for listener type.
* Fix accept handler.
* Add doc comment.
* Fix tests still supplying the port.
* Move old config parsing to `loadConfig`.
* Lowercase `readConfig`.
* Use `Config` suffix for config types.
* Remove the IP version specifiers from the `newListener` config handling.
* refactor: remove use of port in proving metric
* Fix tests.
* Add a TODO comment to allow short-form direct listener config.
* Make legacy key config name consistent with type.
* Move config validation out of the `loadConfig` function.
* Remove unused port from bad merge.
* Add comment describing keys.
* Move validation of listeners to config's `Validate()` function.
* Introduce a `NetworkAdd` to centralize parsing and creation of listeners.
* Use `net.ListenConfig` to listen.
* Simplify how we create new listeners. This does not yet deal with reused sockets.
* Do not use `io.Closer`.
* Use an inline error check.
* Use shared listeners and packet connections. This allows us to reload a config while the existing one is still running. They share the same underlying listener, which is actually closed when the last user closes it.
* Close existing listeners once the new ones are serving.
* Elevate failure to stop listeners to `ERROR` level.
* Be more lenient in config validation to allow empty listeners or keys.
* Ensure the address is an IP address.
* Use `yaml.v3`.
* Move file reading back to `main.go`.
* Do not embed the `net.Listener` type.
* Use a `Service` object to abstract away some of the complex logic of managing listeners.
* Fix how we deal with legacy services.
* Remove commented out lines.
* Use `tcp` and `udp` types for direct listeners.
* Use a `ListenerManager` instead of globals to manage listener state.
* Add validation check that no two services have the same listener.
* Use channels to notify shared listeners they need to stop accepting.
* Pass TCP timeout to service.
* Move goroutine call up.
* Allow inserting single elements directly into the cipher list.
* Add the concept of a listener set to track existing listeners and close them all.
* Refactor how we create listeners. We introduce shared listeners that allow us to keep an old config running while we set up a new config. This is done by keeping track of the usage of the listeners and only closing them when the last user is done with the shared listener. (See the sketch after this list.)
* Update comments.
* `go mod tidy`.
* refactor: don't link the TCP handler to a specific listener
* Protect new cipher handling methods with mutex.
* Move `listeners.go` under `/service`.
* Use callback instead of passing in key and manager.
* Move config start into a goroutine for easier cleanup.
* Make a `StreamListener` type.
* Rename `closeFunc` to `onCloseFunc`.
* Rename `globalListener`.
* Don't track usage in the shared listeners.
* Add `getAddr()` to avoid some duplicate code.
* Move listener set creation out of the inner function.
* Remove `PushBack()` from `CipherList`.
* Move listener set to `main.go`.
* Close the accept channel with an atomic value.
* Update comment.
* Address review comments.
* Close before deleting key.
* `server.Stop()` does not return a value.
* Add a comment for `StreamListener`.
* Do not delete the listener from the manager until the last user has closed it.
* Consolidate usage counting inside a `listenAddress` type.
* Remove `atomic.Value`.
* Add some missing comments.
* Address review comments.
* Add type guard for `sharedListener`.
* Stop the existing config in a goroutine.
* Add a TODO to wait for all handlers to be stopped.
* Run `stopConfig` in a goroutine in `Stop()` as well.
* Create a `TCPListener` that implements a `StreamListener`.
* Track close functions instead of the entire listener, which is not needed.
* Delegate usage tracking to a reference counter.
* Remove the `Get()` method from `refCount`.
* Return immediately.
* Rename `shared` to `virtual` as they are not actually shared.
* Simplify `listenAddr`.
* Fix use of the ref count.
* Add simple test case for early closing of stream listener.
* Add tests for creating stream listeners.
* Create handlers on demand.
* Refactor create methods.
* Address review comments.
* Use a mutex to ensure another user doesn't acquire a new closer while we're closing it.
* Move mutex up.
* Manage the ref counting next to the listener creation.
* Do the lazy initialization inside an anonymous function.
* Fix concurrent access to `acceptCh` and `closeCh`.
* Use `/` in key instead of `-`.
* Return error from stopping listeners.
* Use channels to ensure `virtualPacketConn`s get closed.
* Add more test cases for packet listeners.
* Only log errors from stopping old configs.
* Remove the `closed` field from the virtual listeners.
* Remove the `RefCount`.
* Implement channel-based packet read for virtual connections.
* Use a done channel.
* Set listeners and `onCloseFunc`'s to nil when closing.
* Set `onCloseFunc`'s to nil when closing.
* Fix race condition.
* Add some benchmarks for listener manager.
* Add structured logging with `slog`.
* Structure forgotten log.
* Another forgotten log.
* Move IPInfo logic from TCP and UDP handling into the metrics collector.
* Refactor metrics into separate collectors.
* Rename some types to remove `Collector` suffix.
* Use an LRU cache to manage the ipInfos for Prometheus metrics.
* Use `nil` instead of `context.TODO()`.
* Use `LogAttrs` for `debug...()` log functions.
* Update logging in `metrics.go`.
* Fix another race condition.
* Revert renaming.
* Replace LRU cache with a simpler map that expires unused items.
* Move `SetBuildInfo()` call up.
* refactor: change `outlineMetrics` to implement the `prometheus.Collector` interface
* Address review comments.
* Refactor collectors so the connections/associations keep track of the connection metrics.
* Address review comments.
* Make metrics interfaces for bytes consistently use `int64`.
* Add license header.
* Support multi-module workspaces so we can develop Caddy and ss-server at the same time.
* Rename `Collector` to `Metrics`.
* Move service creation into the service package so it can be re-used by Caddy.
* Ignore custom Caddy binary.
* refactor: create re-usable service that can be re-used by Caddy
* Remove need to return errors in opt functions.
* Move the service into `shadowsocks.go`.
* Add Caddy module with app and handler.
* Refactor metrics to not share with Caddy.
* Set Prometheus metrics handler.
* Catch already registered collectors instead of using `once.Sync`.
* refactor: pass in logger to service so caller can control logs
* Fix test.
* Add `--watch` flag to README.
* Remove changes moved to another PR.
* Remove arguments from `Logger()`.
* Use `slog` instead of `zap`.
* Log error in `Provision()` instead of `defineMetrics()`.
* Do not panic on bad metrics registrations.
* Check if the cast to `OutlineApp` is ok.
* Remove `version` from the config.
* Use `outline_` prefix for Caddy metrics.
* Remove unused `NatTimeoutSec` config option.
* Move initialization of handlers to the constructor.
* Pass a `list.List` instead of a `CipherList`.
* Rename `SSServer` to `OutlineServer`.
* refactor: make connection metrics optional
* Make setting the logger a setter function.
* Revert "Pass a `list.List` instead of a `CipherList`." This reverts commit 1259af8.
* Create noop metrics if nil.
* Revert some more changes.
* Use a noop metrics struct if no metrics provided.
* Add noop implementation for `ShadowsocksConnMetrics`.
* Move logger arg.
* Resolve nil metrics.
* Set logger explicitly to `noopLogger` in service creation.
* Address review comments.
* Set `noopLogger` in `NewShadowsocksStreamAuthenticator()` if nil.
* Fix logger reference.
* Add TODO comment to persist replay cache.
* Remove use of zap.
* feat: persist replay cache across config reloads
* Update comment.
* Fix bad merge and don't use a global.
* Address review comments.
* Update tests.
* Add more context to error message.
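
The shared-listener idea described in the list above (keep the old config serving while the new one starts, and close the real socket only when the last user is done with it) can be sketched roughly as follows. This is a minimal, hypothetical Go illustration, not the actual code under /service: the sharedListener and virtualListener names and the counting details are simplified assumptions, and Accept forwarding is omitted.

package main

import (
	"fmt"
	"net"
	"sync"
)

// sharedListener hands out virtual handles that all wrap one underlying
// net.Listener. The real listener is closed only when the last handle is
// closed, so an old config can keep serving while a new one starts on the
// same address. (Accept forwarding is omitted for brevity.)
type sharedListener struct {
	mu       sync.Mutex
	users    int
	listener net.Listener
}

func (s *sharedListener) acquire() *virtualListener {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.users++
	return &virtualListener{parent: s}
}

// virtualListener is one user's handle on the shared listener.
type virtualListener struct {
	parent *sharedListener
	once   sync.Once
}

// Close decrements the user count and closes the real listener when the
// count reaches zero.
func (v *virtualListener) Close() error {
	var err error
	v.once.Do(func() {
		v.parent.mu.Lock()
		defer v.parent.mu.Unlock()
		v.parent.users--
		if v.parent.users == 0 {
			err = v.parent.listener.Close()
		}
	})
	return err
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	shared := &sharedListener{listener: ln}

	oldCfg := shared.acquire() // handle held by the running config
	newCfg := shared.acquire() // new config starts on the same address

	oldCfg.Close() // old config stops; the socket stays open for the new config
	newCfg.Close() // last user closes; the real socket is released
	fmt.Println("listener released")
}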
1 parent 3dfecd8 commit 3c24817

File tree

4 files changed: +106 -6 lines


caddy/app.go

Lines changed: 11 additions & 4 deletions
@@ -19,6 +19,7 @@ package caddy
 
 import (
 	"errors"
+	"fmt"
 	"log/slog"
 
 	outline_prometheus "github.com/Jigsaw-Code/outline-ss-server/prometheus"
@@ -30,9 +31,14 @@ import (
 const outlineModuleName = "outline"
 
 func init() {
+	replayCache := outline.NewReplayCache(0)
 	caddy.RegisterModule(ModuleRegistration{
-		ID:  outlineModuleName,
-		New: func() caddy.Module { return new(OutlineApp) },
+		ID: outlineModuleName,
+		New: func() caddy.Module {
+			app := new(OutlineApp)
+			app.ReplayCache = replayCache
+			return app
+		},
 	})
 }
 
@@ -65,8 +71,9 @@ func (app *OutlineApp) Provision(ctx caddy.Context) error {
 	app.logger.Info("provisioning app instance")
 
 	if app.ShadowsocksConfig != nil {
-		// TODO: Persist replay cache across config reloads.
-		app.ReplayCache = outline.NewReplayCache(app.ShadowsocksConfig.ReplayHistory)
+		if err := app.ReplayCache.Resize(app.ShadowsocksConfig.ReplayHistory); err != nil {
+			return fmt.Errorf("failed to configure replay history with capacity %d: %v", app.ShadowsocksConfig.ReplayHistory, err)
+		}
 	}
 
 	if err := app.defineMetrics(); err != nil {
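
Why this persists across reloads: the ReplayCache is now created once when the module is registered, and the `New` closure captures it, so every `OutlineApp` instance that Caddy constructs on a config reload receives the same cache and merely resizes it in `Provision`. A tiny standalone sketch of that closure-capture pattern (hypothetical names, not the Caddy module API):

package main

import "fmt"

type replayCache struct{ capacity int }

func (c *replayCache) Resize(capacity int) { c.capacity = capacity }

type app struct{ cache *replayCache }

// newFactory creates the cache once; the returned constructor closes over it,
// so every app instance it builds shares the same cache.
func newFactory() func() *app {
	cache := &replayCache{}
	return func() *app { return &app{cache: cache} }
}

func main() {
	newApp := newFactory()

	first := newApp() // initial config load
	first.cache.Resize(10000)

	second := newApp()                 // simulated config reload: fresh app, same cache
	fmt.Println(second.cache.capacity) // prints 10000: the state survived the reload
}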

caddy/shadowsocks_handler.go

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ const ssModuleName = "layer4.handlers.shadowsocks"
 
 func init() {
 	caddy.RegisterModule(ModuleRegistration{
-		ID: ssModuleName,
+		ID:  ssModuleName,
 		New: func() caddy.Module { return new(ShadowsocksHandler) },
 	})
 }

service/replay.go

Lines changed: 16 additions & 1 deletion
@@ -16,6 +16,7 @@ package service
 
 import (
 	"encoding/binary"
+	"errors"
 	"sync"
 )
 
@@ -92,11 +93,25 @@ func (c *ReplayCache) Add(id string, salt []byte) bool {
 		return false
 	}
 	_, inArchive := c.archive[hash]
-	if len(c.active) == c.capacity {
+	if len(c.active) >= c.capacity {
 		// Discard the archive and move active to archive.
 		c.archive = c.active
 		c.active = make(map[uint32]empty, c.capacity)
 	}
 	c.active[hash] = empty{}
 	return !inArchive
 }
+
+// Resize adjusts the capacity of the ReplayCache.
+func (c *ReplayCache) Resize(capacity int) error {
+	if capacity > MaxCapacity {
+		return errors.New("ReplayCache capacity would result in too many false positives")
+	}
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.capacity = capacity
+	// NOTE: The active handshakes and archive lists are not explicitly shrunk.
+	// Their sizes will naturally adjust as new handshakes are added and the cache
+	// adheres to the updated capacity.
+	return nil
+}
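
A rough usage sketch of the resizable cache, assuming the `service` package import path and the `NewReplayCache`/`Add`/`Resize` signatures shown above (the key ID and salt values are arbitrary, and error handling is kept minimal):

package main

import (
	"fmt"

	"github.com/Jigsaw-Code/outline-ss-server/service"
)

func main() {
	// Size the cache for the configured replay history.
	cache := service.NewReplayCache(10000)

	salt := []byte("0123456789abcdef0123456789abcdef")
	fmt.Println(cache.Add("key-1", salt)) // true: first sighting of this salt
	fmt.Println(cache.Add("key-1", salt)) // false: replayed salt is rejected

	// On a config reload, keep the same cache and only adjust its capacity,
	// so salts seen before the reload are still detected.
	if err := cache.Resize(5000); err != nil {
		fmt.Println("resize failed:", err)
	}
	fmt.Println(cache.Add("key-1", salt)) // still false after resizing
}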

service/replay_test.go

Lines changed: 78 additions & 0 deletions
@@ -17,6 +17,9 @@ package service
 import (
 	"encoding/binary"
 	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 const keyID = "the key"
@@ -91,6 +94,81 @@ func TestReplayCache_Archive(t *testing.T) {
 	}
 }
 
+func TestReplayCache_Resize(t *testing.T) {
+	t.Run("Smaller resizes active and archive maps", func(t *testing.T) {
+		salts := makeSalts(10)
+		cache := NewReplayCache(5)
+		for _, s := range salts {
+			cache.Add(keyID, s)
+		}
+
+		err := cache.Resize(3)
+
+		require.NoError(t, err)
+		assert.Equal(t, cache.capacity, 3, "Expected capacity to be updated")
+
+		// Adding a new salt should trigger a shrinking of the active map as it hits the new
+		// capacity immediately.
+		cache.Add(keyID, salts[0])
+		assert.Len(t, cache.active, 1, "Expected active handshakes length to have shrunk")
+		assert.Len(t, cache.archive, 5, "Expected archive handshakes length to not have shrunk")
+
+		// Adding more new salts should eventually trigger a shrinking of the archive map as well,
+		// when the shrunken active map gets moved to the archive.
+		for _, s := range salts {
+			cache.Add(keyID, s)
+		}
+		assert.Len(t, cache.archive, 3, "Expected archive handshakes length to have shrunk")
+	})
+
+	t.Run("Larger resizes active and archive maps", func(t *testing.T) {
+		salts := makeSalts(10)
+		cache := NewReplayCache(5)
+		for _, s := range salts {
+			cache.Add(keyID, s)
+		}
+
+		err := cache.Resize(10)
+
+		require.NoError(t, err)
+		assert.Equal(t, cache.capacity, 10, "Expected capacity to be updated")
+		assert.Len(t, cache.active, 5, "Expected active handshakes length not to have changed")
+		assert.Len(t, cache.archive, 5, "Expected archive handshakes length not to have changed")
+	})
+
+	t.Run("Still detect salts", func(t *testing.T) {
+		salts := makeSalts(10)
+		cache := NewReplayCache(5)
+		for _, s := range salts {
+			cache.Add(keyID, s)
+		}
+
+		cache.Resize(10)
+
+		for _, s := range salts {
+			if cache.Add(keyID, s) {
+				t.Error("Should still be able to detect the salts after resizing")
+			}
+		}
+
+		cache.Resize(3)
+
+		for _, s := range salts {
+			if cache.Add(keyID, s) {
+				t.Error("Should still be able to detect the salts after resizing")
+			}
+		}
+	})
+
+	t.Run("Exceeding maximum capacity", func(t *testing.T) {
+		cache := &ReplayCache{}
+
+		err := cache.Resize(MaxCapacity + 1)
+
+		require.Error(t, err)
+	})
+}
+
 // Benchmark to determine the memory usage of ReplayCache.
 // Note that NewReplayCache only allocates the active set,
 // so the eventual memory usage will be roughly double.
