Skip to content

Commit 1bd1814

Browse files
committed
Support streaming tail workers over rpc
As it stands workers tailed by streaming tail workers have to know they are tailing to a streaming tail worker. This is a limitation that should be refactored away in the future but for the moment we can't depend on hasHandler to infer this over rpc.
1 parent 2faf29d commit 1bd1814

File tree

5 files changed

+32
-26
lines changed

5 files changed

+32
-26
lines changed

samples/tail-workers/config.capnp

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const helloWorld :Workerd.Worker = (
1717
],
1818
compatibilityDate = "2024-10-14",
1919
tails = ["log"],
20+
streamingTails = ["log"],
2021
);
2122

2223
const logWorker :Workerd.Worker = (

src/workerd/api/tail-worker-test-invalid.wd-test

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const unitTests :Workerd.Config = (
1313
],
1414
compatibilityDate = "2023-08-01",
1515
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled"],
16-
tails = ["log"],
16+
streamingTails = ["log"],
1717
),
1818
),
1919
# tail worker with tests
@@ -30,4 +30,4 @@ const unitTests :Workerd.Config = (
3030
autogates = [
3131
"workerd-autogate-streaming-tail-workers",
3232
],
33-
);
33+
);

src/workerd/api/tail-worker-test.wd-test

+7-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ const unitTests :Workerd.Config = (
1414
],
1515
compatibilityDate = "2023-08-01",
1616
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled"],
17-
tails = ["log"],
17+
streamingTails = ["log"],
1818
),
1919
),
2020
( name = "queue-test",
@@ -28,7 +28,7 @@ const unitTests :Workerd.Config = (
2828
],
2929
compatibilityDate = "2023-07-24",
3030
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
31-
tails = ["log"],
31+
streamingTails = ["log"],
3232
)
3333
),
3434
(name = "alarms", worker = .alarmsWorker),
@@ -66,7 +66,8 @@ const alarmsWorker :Workerd.Worker = (
6666
(name = "ns", durableObjectNamespace = "DurableObjectExample"),
6767
],
6868
# tailed by the main tail worker and the dummy legacy tail worker, to get traces for it too.
69-
tails = ["log", "legacy"],
69+
tails = ["legacy"],
70+
streamingTails = ["log"],
7071
);
7172

7273
const hiberWorker :Workerd.Worker = (
@@ -86,7 +87,7 @@ const hiberWorker :Workerd.Worker = (
8687
bindings = [
8788
(name = "ns", durableObjectNamespace = "DurableObjectExample"),
8889
],
89-
tails = ["log"],
90+
streamingTails = ["log"],
9091
);
9192

9293
const logWorker :Workerd.Worker = (
@@ -111,5 +112,5 @@ const logLegacy :Workerd.Worker = (
111112
(name = "worker", esModule = embed "tail-worker-test-dummy.js")
112113
],
113114
compatibilityDate = "2024-10-14",
114-
tails = ["log"],
115-
);
115+
streamingTails = ["log"],
116+
);

src/workerd/server/server.c++

+18-18
Original file line numberDiff line numberDiff line change
@@ -1631,6 +1631,7 @@ class Server::WorkerService final: public Service,
16311631
kj::Maybe<kj::Own<SqliteDatabase::Vfs>> actorStorage;
16321632
AlarmScheduler& alarmScheduler;
16331633
kj::Array<kj::Own<Service>> tails;
1634+
kj::Array<kj::Own<Service>> streamingTails;
16341635
};
16351636
using LinkCallback = kj::Function<LinkedIoChannels(WorkerService&)>;
16361637
using AbortActorsCallback = kj::Function<void()>;
@@ -1760,25 +1761,20 @@ class Server::WorkerService final: public Service,
17601761

17611762
kj::Array<kj::Own<WorkerInterface>> legacyTailWorkers = nullptr;
17621763
kj::Array<kj::Own<WorkerInterface>> streamingTailWorkers = nullptr;
1763-
// If streaming tail workers is enabled, then we will initialize two lists:
1764-
// one with services that only export the tail or trace handler (legacy tail
1765-
// workers) and one that exports the tailStream handler. We'll check tailStreams
1766-
// first.
1764+
legacyTailWorkers = KJ_MAP(service, channels.tails) -> kj::Own<WorkerInterface> {
1765+
// Caution here... if the tail worker ends up have a circular dependency
1766+
// on the worker we'll end up with an infinite loop trying to initialize.
1767+
// We can test this directly but it's more difficult to test indirect
1768+
// loops (dependency of dependency, etc). Here we're just going to keep
1769+
// it simple and just check the direct dependency.
1770+
// If service refers to an EntrypointService, we need to compare with the underlying
1771+
// WorkerService to match this.
1772+
KJ_ASSERT(service->service() != this, "A worker currently cannot log to itself");
1773+
return service->startRequest({});
1774+
};
1775+
17671776
if (util::Autogate::isEnabled(util::AutogateKey::STREAMING_TAIL_WORKERS)) {
1768-
kj::Vector<kj::Own<WorkerInterface>> legacyList;
1769-
kj::Vector<kj::Own<WorkerInterface>> streamingList;
1770-
for (auto& service: channels.tails) {
1771-
KJ_ASSERT(service->service() != this, "A worker currently cannot log to itself");
1772-
if (service->hasHandler("tailStream"_kj)) {
1773-
streamingList.add(service->startRequest({}));
1774-
} else if (service->hasHandler("tail") || service->hasHandler("trace")) {
1775-
legacyList.add(service->startRequest({}));
1776-
}
1777-
}
1778-
legacyTailWorkers = legacyList.releaseAsArray();
1779-
streamingTailWorkers = streamingList.releaseAsArray();
1780-
} else {
1781-
legacyTailWorkers = KJ_MAP(service, channels.tails) -> kj::Own<WorkerInterface> {
1777+
streamingTailWorkers = KJ_MAP(service, channels.streamingTails) -> kj::Own<WorkerInterface> {
17821778
// Caution here... if the tail worker ends up have a circular dependency
17831779
// on the worker we'll end up with an infinite loop trying to initialize.
17841780
// We can test this directly but it's more difficult to test indirect
@@ -3475,6 +3471,10 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name,
34753471
return lookupService(tail, kj::str("Worker \"", name, "\"'s tails"));
34763472
};
34773473

3474+
result.streamingTails = KJ_MAP(streamingTails, conf.getStreamingTails()) {
3475+
return lookupService(streamingTails, kj::str("Worker \"", name, "\"'s streaming tails"));
3476+
};
3477+
34783478
return result;
34793479
};
34803480

src/workerd/server/workerd.capnp

+4
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,10 @@ struct Worker {
643643
tails @14 :List(ServiceDesignator);
644644
# List of tail worker services that should receive tail events for this worker.
645645
# See: https://developers.cloudflare.com/workers/observability/logs/tail-workers/
646+
647+
streamingTails @15 :List(ServiceDesignator);
648+
# List of streaming tail worker services that should receive tail events for this worker.
649+
# NOTE: This will be deleted in a future refactor, do not depend on this.
646650
}
647651

648652
struct ExternalServer {

0 commit comments

Comments
 (0)