Skip to content

Commit b202115

Browse files
feat(okta): adds ingest_groups_users config parameter (#12371)
Co-authored-by: Harshal Sheth <[email protected]>
1 parent 08a5183 commit b202115

File tree

3 files changed

+290
-2
lines changed

3 files changed

+290
-2
lines changed

metadata-ingestion/src/datahub/ingestion/source/identity/okta.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from collections import defaultdict
66
from dataclasses import dataclass, field
77
from time import sleep
8-
from typing import Dict, Iterable, List, Optional, Union
8+
from typing import Dict, Iterable, List, Optional, Set, Union
99

1010
import nest_asyncio
1111
from okta.client import Client as OktaClient
@@ -77,6 +77,10 @@ class OktaConfig(StatefulIngestionConfigBase, ConfigModel):
7777
default=True,
7878
description="Whether group membership should be ingested into DataHub. ingest_groups must be True if this is True.",
7979
)
80+
ingest_groups_users: bool = Field(
81+
default=True,
82+
description="Only ingest users belonging to the selected groups. This option is only useful when `ingest_users` is set to False and `ingest_group_membership` to True.",
83+
)
8084

8185
# Optional: Customize the mapping to DataHub Username from an attribute appearing in the Okta User
8286
# profile. Reference: https://developer.okta.com/docs/reference/api/users/
@@ -344,6 +348,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
344348
aspect=StatusClass(removed=False),
345349
).as_workunit()
346350

351+
okta_users: Set[User] = set()
347352
# Step 2: Populate GroupMembership Aspects for CorpUsers
348353
datahub_corp_user_urn_to_group_membership: Dict[str, GroupMembershipClass] = (
349354
defaultdict(lambda: GroupMembershipClass(groups=[]))
@@ -372,14 +377,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
372377
self.report.report_failure("okta_user_mapping", error_str)
373378
continue
374379

380+
if self.config.ingest_groups_users:
381+
okta_users.add(okta_user)
382+
375383
# Update the GroupMembership aspect for this group member.
376384
datahub_corp_user_urn_to_group_membership[
377385
datahub_corp_user_urn
378386
].groups.append(datahub_corp_group_urn)
379387

380388
# Step 3: Produce MetadataWorkUnits for CorpUsers.
381389
if self.config.ingest_users:
382-
okta_users = self._get_okta_users(event_loop)
390+
# we can just throw away collected okta users so far and fetch them all
391+
okta_users = set(self._get_okta_users(event_loop))
392+
393+
if okta_users:
383394
filtered_okta_users = filter(self._filter_okta_user, okta_users)
384395
datahub_corp_user_snapshots = self._map_okta_users(filtered_okta_users)
385396
for user_count, datahub_corp_user_snapshot in enumerate(
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
[
2+
{
3+
"proposedSnapshot": {
4+
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
5+
"urn": "urn:li:corpGroup:All%20Employees",
6+
"aspects": [
7+
{
8+
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
9+
"displayName": "All Employees",
10+
"admins": [],
11+
"members": [],
12+
"groups": [],
13+
"description": "All Employees in the Test Company."
14+
}
15+
}
16+
]
17+
}
18+
},
19+
"systemMetadata": {
20+
"lastObserved": 1586847600000,
21+
"runId": "test-okta-usage",
22+
"lastRunId": "no-run-id-provided"
23+
}
24+
},
25+
{
26+
"entityType": "corpGroup",
27+
"entityUrn": "urn:li:corpGroup:All%20Employees",
28+
"changeType": "UPSERT",
29+
"aspectName": "origin",
30+
"aspect": {
31+
"json": {
32+
"type": "EXTERNAL",
33+
"externalType": "OKTA"
34+
}
35+
},
36+
"systemMetadata": {
37+
"lastObserved": 1586847600000,
38+
"runId": "test-okta-usage",
39+
"lastRunId": "no-run-id-provided"
40+
}
41+
},
42+
{
43+
"entityType": "corpGroup",
44+
"entityUrn": "urn:li:corpGroup:All%20Employees",
45+
"changeType": "UPSERT",
46+
"aspectName": "status",
47+
"aspect": {
48+
"json": {
49+
"removed": false
50+
}
51+
},
52+
"systemMetadata": {
53+
"lastObserved": 1586847600000,
54+
"runId": "test-okta-usage",
55+
"lastRunId": "no-run-id-provided"
56+
}
57+
},
58+
{
59+
"proposedSnapshot": {
60+
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
61+
"urn": "urn:li:corpGroup:Engineering",
62+
"aspects": [
63+
{
64+
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
65+
"displayName": "Engineering",
66+
"admins": [],
67+
"members": [],
68+
"groups": [],
69+
"description": "Engineering team!"
70+
}
71+
}
72+
]
73+
}
74+
},
75+
"systemMetadata": {
76+
"lastObserved": 1586847600000,
77+
"runId": "test-okta-usage",
78+
"lastRunId": "no-run-id-provided"
79+
}
80+
},
81+
{
82+
"entityType": "corpGroup",
83+
"entityUrn": "urn:li:corpGroup:Engineering",
84+
"changeType": "UPSERT",
85+
"aspectName": "origin",
86+
"aspect": {
87+
"json": {
88+
"type": "EXTERNAL",
89+
"externalType": "OKTA"
90+
}
91+
},
92+
"systemMetadata": {
93+
"lastObserved": 1586847600000,
94+
"runId": "test-okta-usage",
95+
"lastRunId": "no-run-id-provided"
96+
}
97+
},
98+
{
99+
"entityType": "corpGroup",
100+
"entityUrn": "urn:li:corpGroup:Engineering",
101+
"changeType": "UPSERT",
102+
"aspectName": "status",
103+
"aspect": {
104+
"json": {
105+
"removed": false
106+
}
107+
},
108+
"systemMetadata": {
109+
"lastObserved": 1586847600000,
110+
"runId": "test-okta-usage",
111+
"lastRunId": "no-run-id-provided"
112+
}
113+
},
114+
{
115+
"proposedSnapshot": {
116+
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
117+
"urn": "urn:li:corpuser:mary.jane",
118+
"aspects": [
119+
{
120+
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
121+
"customProperties": {},
122+
"active": true,
123+
"displayName": "Mary Jane",
124+
"email": "[email protected]",
125+
"title": "Software Engineer",
126+
"departmentName": "Engineering",
127+
"firstName": "Mary",
128+
"lastName": "Jane",
129+
"fullName": "Mary Jane",
130+
"countryCode": "us"
131+
}
132+
},
133+
{
134+
"com.linkedin.pegasus2avro.identity.GroupMembership": {
135+
"groups": [
136+
"urn:li:corpGroup:All%20Employees",
137+
"urn:li:corpGroup:All%20Employees",
138+
"urn:li:corpGroup:Engineering",
139+
"urn:li:corpGroup:Engineering"
140+
]
141+
}
142+
}
143+
]
144+
}
145+
},
146+
"systemMetadata": {
147+
"lastObserved": 1586847600000,
148+
"runId": "test-okta-usage",
149+
"lastRunId": "no-run-id-provided"
150+
}
151+
},
152+
{
153+
"entityType": "corpuser",
154+
"entityUrn": "urn:li:corpuser:mary.jane",
155+
"changeType": "UPSERT",
156+
"aspectName": "origin",
157+
"aspect": {
158+
"json": {
159+
"type": "EXTERNAL",
160+
"externalType": "OKTA"
161+
}
162+
},
163+
"systemMetadata": {
164+
"lastObserved": 1586847600000,
165+
"runId": "test-okta-usage",
166+
"lastRunId": "no-run-id-provided"
167+
}
168+
},
169+
{
170+
"entityType": "corpuser",
171+
"entityUrn": "urn:li:corpuser:mary.jane",
172+
"changeType": "UPSERT",
173+
"aspectName": "status",
174+
"aspect": {
175+
"json": {
176+
"removed": false
177+
}
178+
},
179+
"systemMetadata": {
180+
"lastObserved": 1586847600000,
181+
"runId": "test-okta-usage",
182+
"lastRunId": "no-run-id-provided"
183+
}
184+
},
185+
{
186+
"proposedSnapshot": {
187+
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
188+
"urn": "urn:li:corpuser:john.doe",
189+
"aspects": [
190+
{
191+
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
192+
"customProperties": {},
193+
"active": true,
194+
"displayName": "JDoe",
195+
"email": "[email protected]",
196+
"firstName": "John",
197+
"lastName": "Doe",
198+
"fullName": "John Doe"
199+
}
200+
},
201+
{
202+
"com.linkedin.pegasus2avro.identity.GroupMembership": {
203+
"groups": [
204+
"urn:li:corpGroup:All%20Employees",
205+
"urn:li:corpGroup:Engineering"
206+
]
207+
}
208+
}
209+
]
210+
}
211+
},
212+
"systemMetadata": {
213+
"lastObserved": 1586847600000,
214+
"runId": "test-okta-usage",
215+
"lastRunId": "no-run-id-provided"
216+
}
217+
},
218+
{
219+
"entityType": "corpuser",
220+
"entityUrn": "urn:li:corpuser:john.doe",
221+
"changeType": "UPSERT",
222+
"aspectName": "origin",
223+
"aspect": {
224+
"json": {
225+
"type": "EXTERNAL",
226+
"externalType": "OKTA"
227+
}
228+
},
229+
"systemMetadata": {
230+
"lastObserved": 1586847600000,
231+
"runId": "test-okta-usage",
232+
"lastRunId": "no-run-id-provided"
233+
}
234+
},
235+
{
236+
"entityType": "corpuser",
237+
"entityUrn": "urn:li:corpuser:john.doe",
238+
"changeType": "UPSERT",
239+
"aspectName": "status",
240+
"aspect": {
241+
"json": {
242+
"removed": false
243+
}
244+
},
245+
"systemMetadata": {
246+
"lastObserved": 1586847600000,
247+
"runId": "test-okta-usage",
248+
"lastRunId": "no-run-id-provided"
249+
}
250+
}
251+
]

metadata-ingestion/tests/integration/okta/test_okta.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,32 @@ def test_okta_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path)
120120
)
121121

122122

123+
@freeze_time(FROZEN_TIME)
124+
def test_okta_source_ingest_groups_users(pytestconfig, mock_datahub_graph, tmp_path):
125+
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
126+
127+
output_file_path = f"{tmp_path}/okta_mces_ingest_groups_users.json"
128+
129+
new_recipe = default_recipe(output_file_path)
130+
new_recipe["source"]["config"]["ingest_users"] = False
131+
new_recipe["source"]["config"]["ingest_groups"] = True
132+
new_recipe["source"]["config"]["ingest_groups_users"] = True
133+
134+
run_ingest(
135+
mock_datahub_graph=mock_datahub_graph,
136+
mocked_functions_reference=partial(
137+
_init_mock_okta_client, test_resources_dir=test_resources_dir
138+
),
139+
recipe=new_recipe,
140+
)
141+
142+
mce_helpers.check_golden_file(
143+
pytestconfig,
144+
output_path=output_file_path,
145+
golden_path=f"{test_resources_dir}/okta_mces_golden_ingest_groups_users.json",
146+
)
147+
148+
123149
@freeze_time(FROZEN_TIME)
124150
def test_okta_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
125151
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"

0 commit comments

Comments
 (0)