1
+ # Copyright 2022 Google Inc.
2
+ #
3
+ # Licensed under the Apache License, Version 2.0 (the "License");
4
+ # you may not use this file except in compliance with the License.
5
+ # You may obtain a copy of the License at
6
+ #
7
+ # http://www.apache.org/licenses/LICENSE-2.0
8
+ #
9
+ # Unless required by applicable law or agreed to in writing, software
10
+ # distributed under the License is distributed on an "AS IS" BASIS,
11
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ # See the License for the specific language governing permissions and
13
+ # limitations under the License.
1
14
"""
2
15
Cloud functions to handle events from Google Cloud Pub/Sub.
3
16
"""
4
-
17
+ # pylint: disable=protected-access
5
18
import dataclasses as _dataclasses
19
+ import datetime as _dt
6
20
import functools as _functools
7
21
import typing as _typing
8
22
import json as _json
9
23
import base64 as _base64
24
+ import cloudevents .http as _ce
10
25
11
26
import firebase_functions .options as _options
12
27
import firebase_functions .private .util as _util
@@ -45,7 +60,7 @@ class Message(_typing.Generic[T]):
45
60
"""
46
61
47
62
@property
48
- def json (self ) -> _typing .Optional [_core . T ]:
63
+ def json (self ) -> _typing .Optional [T ]:
49
64
try :
50
65
if self .data is not None :
51
66
return _json .loads (_base64 .b64decode (self .data ).decode ("utf-8" ))
@@ -58,13 +73,13 @@ def json(self) -> _typing.Optional[_core.T]:
58
73
59
74
60
75
@_dataclasses .dataclass (frozen = True )
61
- class MessagePublishedData (_typing .Generic [_core . T ]):
76
+ class MessagePublishedData (_typing .Generic [T ]):
62
77
"""
63
78
The interface published in a Pub/Sub publish subscription.
64
79
65
80
'T' Type representing `Message.data`'s JSON format.
66
81
"""
67
- message : Message [_core . T ]
82
+ message : Message [T ]
68
83
"""
69
84
Google Cloud Pub/Sub message.
70
85
"""
@@ -75,7 +90,88 @@ class MessagePublishedData(_typing.Generic[_core.T]):
75
90
"""
76
91
77
92
78
- _E1 = DatabaseEvent [Change [_typing .Any | None ]]
79
- _E2 = DatabaseEvent [_typing .Any | None ]
93
+ _E1 = CloudEvent [MessagePublishedData [T ]]
80
94
_C1 = _typing .Callable [[_E1 ], None ]
81
- _C2 = _typing .Callable [[_E2 ], None ]
95
+
96
+
97
+ def _message_handler (
98
+ func : _C1 ,
99
+ raw : _ce .CloudEvent ,
100
+ ) -> None :
101
+ event_attributes = raw ._get_attributes ()
102
+ event_data = raw ._get_data ()
103
+ event_dict = {"data" : event_data , ** event_attributes }
104
+ data = event_dict ["data" ]
105
+ message_dict = data ["message" ]
106
+
107
+ time = _dt .datetime .strptime (
108
+ event_dict ["time" ],
109
+ "%Y-%m-%dT%H:%M:%S.%f%z" ,
110
+ )
111
+
112
+ publish_time = _dt .datetime .strptime (
113
+ message_dict ["publish_time" ],
114
+ "%Y-%m-%dT%H:%M:%S.%f%z" ,
115
+ )
116
+
117
+ # Convert the UTC string into a datetime object
118
+ event_dict ["time" ] = time
119
+ message_dict ["publish_time" ] = publish_time
120
+
121
+ # Pop unnecessary keys from the message data
122
+ # (we get these keys from the snake case alternatives that are provided)
123
+ message_dict .pop ("messageId" , None )
124
+ message_dict .pop ("publishTime" , None )
125
+
126
+ # `orderingKey` doesn't come with a snake case alternative,
127
+ # there is no `ordering_key` in the raw request.
128
+ ordering_key = message_dict .pop ("orderingKey" , None )
129
+
130
+ message : MessagePublishedData = MessagePublishedData (
131
+ message = Message (
132
+ ** message_dict ,
133
+ ordering_key = ordering_key ,
134
+ ),
135
+ subscription = data ["subscription" ],
136
+ )
137
+
138
+ event_dict ["data" ] = message
139
+
140
+ event : CloudEvent [MessagePublishedData ] = CloudEvent (
141
+ data = event_dict ["data" ],
142
+ id = event_dict ["id" ],
143
+ source = event_dict ["source" ],
144
+ specversion = event_dict ["specversion" ],
145
+ subject = event_dict ["subject" ] if "subject" in event_dict else None ,
146
+ time = event_dict ["time" ],
147
+ type = event_dict ["type" ],
148
+ )
149
+
150
+ func (event )
151
+
152
+
153
+ @_util .copy_func_kwargs (_options .PubSubOptions )
154
+ def on_message_published (** kwargs ) -> _typing .Callable [[_C1 ], _C1 ]:
155
+ """
156
+ Event handler which triggers on a message being published to a Pub/Sub topic.
157
+ Example::
158
+ @on_message_published(topic="hello-world")
159
+ def example(event: CloudEvent[MessagePublishedData[object]]) -> None:
160
+ pass
161
+
162
+ """
163
+ options = _options .PubSubOptions (** kwargs )
164
+
165
+ def on_message_published_inner_decorator (func : _C1 ):
166
+
167
+ @_functools .wraps (func )
168
+ def on_message_published_wrapped (raw : _ce .CloudEvent ):
169
+ return _message_handler (func , raw )
170
+
171
+ _util .set_func_endpoint_attr (
172
+ on_message_published_wrapped ,
173
+ options ._endpoint (func_name = func .__name__ ),
174
+ )
175
+ return on_message_published_wrapped
176
+
177
+ return on_message_published_inner_decorator
0 commit comments