Skip to content

Commit 42dc877

Browse files
authored
Merge pull request #11 from invertase/7-pubsub
feat: pubsub module
2 parents a66fbe1 + f9e20a6 commit 42dc877

File tree

3 files changed

+206
-3
lines changed

3 files changed

+206
-3
lines changed

example/functions/main.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
Example Firebase Functions written in Python
33
"""
4-
from firebase_functions import db, options, https, params
4+
from firebase_functions import db, options, https, params, pubsub
55
from firebase_admin import initialize_app
66

77
initialize_app()
@@ -52,3 +52,10 @@ def on_call_example(req: https.CallableRequest):
5252
"This is some details of the test",
5353
)
5454
return "Hello from https on call function example"
55+
56+
57+
@pubsub.on_message_published(
58+
topic="hello",)
59+
def on_message_published_example(
60+
event: pubsub.CloudEvent[pubsub.MessagePublishedData]) -> None:
61+
print("Hello from pubsub event:", event)

src/firebase_functions/options.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
15-
1614
"""
1715
Module for options that can be used to configure Firebase Cloud Functions
1816
deployments.
@@ -294,6 +292,27 @@ class PubSubOptions(RuntimeOptions):
294292
The Pub/Sub topic to watch for message events.
295293
"""
296294

295+
def _endpoint(
296+
self,
297+
**kwargs,
298+
) -> _manifest.ManifestEndpoint:
299+
event_filters: _typing.Any = {
300+
"topic": self.topic,
301+
}
302+
event_trigger = _manifest.EventTrigger(
303+
eventType="google.cloud.pubsub.topic.v1.messagePublished",
304+
retry=False,
305+
eventFilters=event_filters,
306+
)
307+
308+
kwargs_merged = {
309+
**_dataclasses.asdict(super()._endpoint(**kwargs)),
310+
"eventTrigger":
311+
event_trigger,
312+
}
313+
return _manifest.ManifestEndpoint(
314+
**_typing.cast(_typing.Dict, kwargs_merged))
315+
297316

298317
@_dataclasses.dataclass(frozen=True, kw_only=True)
299318
class DatabaseOptions(RuntimeOptions):

src/firebase_functions/pubsub.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
# Copyright 2022 Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""
15+
Cloud functions to handle events from Google Cloud Pub/Sub.
16+
"""
17+
# pylint: disable=protected-access
18+
import dataclasses as _dataclasses
19+
import datetime as _dt
20+
import functools as _functools
21+
import typing as _typing
22+
import json as _json
23+
import base64 as _base64
24+
import cloudevents.http as _ce
25+
26+
import firebase_functions.options as _options
27+
import firebase_functions.private.util as _util
28+
from firebase_functions.core import CloudEvent, T
29+
30+
31+
@_dataclasses.dataclass(frozen=True)
32+
class Message(_typing.Generic[T]):
33+
"""
34+
Interface representing a Google Cloud Pub/Sub message.
35+
"""
36+
37+
message_id: str
38+
"""
39+
Autogenerated ID that uniquely identifies this message.
40+
"""
41+
42+
publish_time: str
43+
"""
44+
Time the message was published.
45+
"""
46+
47+
attributes: dict[str, str]
48+
"""
49+
User-defined attributes published with the message, if any.
50+
"""
51+
52+
data: str
53+
"""
54+
The data payload of this message object as a base64-encoded string.
55+
"""
56+
57+
ordering_key: str
58+
"""
59+
User-defined key used to ensure ordering amongst messages with the same key.
60+
"""
61+
62+
@property
63+
def json(self) -> _typing.Optional[T]:
64+
try:
65+
if self.data is not None:
66+
return _json.loads(_base64.b64decode(self.data).decode("utf-8"))
67+
else:
68+
return None
69+
except Exception as error:
70+
raise Exception(
71+
f"Unable to parse Pub/Sub message data as JSON: {error}"
72+
) from error
73+
74+
75+
@_dataclasses.dataclass(frozen=True)
76+
class MessagePublishedData(_typing.Generic[T]):
77+
"""
78+
The interface published in a Pub/Sub publish subscription.
79+
80+
'T' Type representing `Message.data`'s JSON format.
81+
"""
82+
message: Message[T]
83+
"""
84+
Google Cloud Pub/Sub message.
85+
"""
86+
87+
subscription: str
88+
"""
89+
A subscription resource.
90+
"""
91+
92+
93+
_E1 = CloudEvent[MessagePublishedData[T]]
94+
_C1 = _typing.Callable[[_E1], None]
95+
96+
97+
def _message_handler(
98+
func: _C1,
99+
raw: _ce.CloudEvent,
100+
) -> None:
101+
event_attributes = raw._get_attributes()
102+
event_data = raw._get_data()
103+
event_dict = {"data": event_data, **event_attributes}
104+
data = event_dict["data"]
105+
message_dict = data["message"]
106+
107+
time = _dt.datetime.strptime(
108+
event_dict["time"],
109+
"%Y-%m-%dT%H:%M:%S.%f%z",
110+
)
111+
112+
publish_time = _dt.datetime.strptime(
113+
message_dict["publish_time"],
114+
"%Y-%m-%dT%H:%M:%S.%f%z",
115+
)
116+
117+
# Convert the UTC string into a datetime object
118+
event_dict["time"] = time
119+
message_dict["publish_time"] = publish_time
120+
121+
# Pop unnecessary keys from the message data
122+
# (we get these keys from the snake case alternatives that are provided)
123+
message_dict.pop("messageId", None)
124+
message_dict.pop("publishTime", None)
125+
126+
# `orderingKey` doesn't come with a snake case alternative,
127+
# there is no `ordering_key` in the raw request.
128+
ordering_key = message_dict.pop("orderingKey", None)
129+
130+
message: MessagePublishedData = MessagePublishedData(
131+
message=Message(
132+
**message_dict,
133+
ordering_key=ordering_key,
134+
),
135+
subscription=data["subscription"],
136+
)
137+
138+
event_dict["data"] = message
139+
140+
event: CloudEvent[MessagePublishedData] = CloudEvent(
141+
data=event_dict["data"],
142+
id=event_dict["id"],
143+
source=event_dict["source"],
144+
specversion=event_dict["specversion"],
145+
subject=event_dict["subject"] if "subject" in event_dict else None,
146+
time=event_dict["time"],
147+
type=event_dict["type"],
148+
)
149+
150+
func(event)
151+
152+
153+
@_util.copy_func_kwargs(_options.PubSubOptions)
154+
def on_message_published(**kwargs) -> _typing.Callable[[_C1], _C1]:
155+
"""
156+
Event handler which triggers on a message being published to a Pub/Sub topic.
157+
Example::
158+
@on_message_published(topic="hello-world")
159+
def example(event: CloudEvent[MessagePublishedData[object]]) -> None:
160+
pass
161+
162+
"""
163+
options = _options.PubSubOptions(**kwargs)
164+
165+
def on_message_published_inner_decorator(func: _C1):
166+
167+
@_functools.wraps(func)
168+
def on_message_published_wrapped(raw: _ce.CloudEvent):
169+
return _message_handler(func, raw)
170+
171+
_util.set_func_endpoint_attr(
172+
on_message_published_wrapped,
173+
options._endpoint(func_name=func.__name__),
174+
)
175+
return on_message_published_wrapped
176+
177+
return on_message_published_inner_decorator

0 commit comments

Comments
 (0)