Ft typing #435

Open
wants to merge 4 commits into master
26 changes: 13 additions & 13 deletions doc/README.md
@@ -125,7 +125,7 @@ attribute to use, e.g. `address.formatted` will access the attribute value
attributes:
mail:
openid: [email]
saml: [mail, emailAdress, email]
saml: [mail, emailAddress, email]
address:
openid: [address.formatted]
saml: [postaladdress]
@@ -140,7 +140,7 @@ attributes (in the proxy backend) <-> internal <-> returned attributes (from the
* Any plugin using the `openid` profile will use the attribute value from
`email` delivered from the target provider as the value for `mail`.
* Any plugin using the `saml` profile will use the attribute value from `mail`,
`emailAdress` and `email` depending on which attributes are delivered by the
`emailAddress` and `email` depending on which attributes are delivered by the
target provider as the value for `mail`.
* Any plugin using the `openid` profile will use the attribute value under the
key `formatted` in the `address` attribute delivered by the target provider.
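
To make the mapping above concrete, here is a minimal sketch (not part of this change set) that feeds the same configuration, as a plain dict, into the `AttributeMapper` class this PR annotates. It assumes only the `attributes` key is needed for this purpose; the attribute values are illustrative:

```python
from satosa.attribute_mapping import AttributeMapper

# The "attributes" key mirrors the YAML shown above; the values are made up.
mapper = AttributeMapper({
    "attributes": {
        "mail": {"openid": ["email"], "saml": ["mail", "emailAddress", "email"]},
        "address": {"openid": ["address.formatted"], "saml": ["postaladdress"]},
    }
})

# An openid backend delivering "email" ends up as the internal "mail" attribute.
mapper.to_internal("openid", {"email": ["alice@example.org"]})
# -> {"mail": ["alice@example.org"]}

# A saml backend may deliver any of "mail", "emailAddress" or "email";
# whichever are present are collated in the listed priority order.
mapper.to_internal("saml", {"emailAddress": ["alice@example.org"]})
# -> {"mail": ["alice@example.org"]}

# The nested "address.formatted" key is resolved through the "." separator.
mapper.to_internal("openid", {"address": {"formatted": ["Main St 1"]}})
# -> {"address": ["Main St 1"]}
```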
@@ -266,7 +266,7 @@ provider.
2. The **SAMLMirrorFrontend** module mirrors each target provider as a separate entity in the SAML metadata.
In this proxy this is handled with dynamic entity id's, encoding the target provider.
This allows external discovery services to present the mirrored providers transparently, as separate entities
in its UI. The following flow diagram shows the communcation:
in its UI. The following flow diagram shows the communication:

`SP -> optional discovery service -> selected proxy SAML entity -> target IdP`

@@ -311,7 +311,7 @@ config:

#### Policy

Some settings related to how a SAML response is formed can be overriden on a per-instance or a per-SP
Some settings related to how a SAML response is formed can be overridden on a per-instance or a per-SP
basis. This example summarizes the most common settings (hopefully self-explanatory) with their defaults:

```yaml
@@ -328,7 +328,7 @@ config:
```

Overrides per SP entityID is possible by using the entityID as a key instead of the "default" key
in the yaml structure. The most specific key takes presedence. If no policy overrides are provided
in the yaml structure. The most specific key takes precedence. If no policy overrides are provided
the defaults above are used.
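
As an illustration of the precedence rule (this is not the frontend's actual lookup code; the setting name and entityIDs are placeholders), the structure can be pictured as a dict in which an SP-specific key shadows the `default` key:

```python
# Placeholder policy structure: an entityID key overrides the "default" key.
policy = {
    "default": {"sign_response": True},
    "https://sp.example.org/metadata": {"sign_response": False},
}

def effective_policy(sp_entity_id: str) -> dict:
    # The most specific key (the SP's entityID) takes precedence over "default".
    return {**policy["default"], **policy.get(sp_entity_id, {})}

effective_policy("https://sp.example.org/metadata")  # -> {"sign_response": False}
effective_policy("https://another-sp.example.org")   # -> {"sign_response": True}
```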

### SAML2 Backend
@@ -397,7 +397,7 @@ the user will have to always select a target provider when a discovery service
is configured. If the parameter is set to `True` (and `ForceAuthn` is not set),
the proxy will remember and reuse the selected target provider for the duration
that the state cookie is valid. If `ForceAuthn` is set, then the
`use_memorized_idp_when_force_authn` configuration option can overide
`use_memorized_idp_when_force_authn` configuration option can override
this property and still reuse the selected target provider.

The default behaviour is `False`.
@@ -803,12 +803,12 @@ Backends and Frontends act like adapters, while micro-services act like plugins
and all of them can be developed by anyone and shared with everyone.

Other people that have been working with the SaToSa proxy, have built
extentions mainly in the form of additional micro-services that are shared to
extensions mainly in the form of additional micro-services that are shared to
be used by anyone.

- SUNET maintains a small collection of extentions that focus around the SWAMID
- SUNET maintains a small collection of extensions that focus around the SWAMID
policies.
The extentions are licensed under the Apache2.0 license.
The extensions are licensed under the Apache2.0 license.
You can find the code using the following URL:

- https://github.com/SUNET/swamid-satosa/
@@ -828,16 +828,16 @@ be used by anyone.
- https://github.com/italia/Satosa-Saml2Spid

- DAASI International have been a long-time user of this software and have made
their extentions available.
The extentions are licensed under the Apache2.0 license.
their extensions available.
The extensions are licensed under the Apache2.0 license.
You can find the code using the following URL:

- https://gitlab.daasi.de/didmos2/didmos2-auth/-/tree/master/src/didmos_oidc/satosa/micro_services

The extentions include:
The extensions include:

- SCIM attribute store to fetch attributes via SCIM API (instead of LDAP)
- Authoritzation module for blocking services if necessary group memberships or
- Authorization module for blocking services if necessary group memberships or
attributes are missing in the identity (for service providers that do not
evaluate attributes themselves)
- Backend chooser with Django UI for letting the user choose between any
89 changes: 42 additions & 47 deletions src/satosa/attribute_mapping.py
@@ -1,21 +1,22 @@
import logging
from collections import defaultdict
from itertools import chain
from typing import Any, Mapping, Optional, Union

from mako.template import Template

logger = logging.getLogger(__name__)


def scope(s):
def scope(s: str) -> str:
"""
Mako filter: used to extract scope from attribute
:param s: string to extract scope from (filtered string in mako template)
:return: the scope
"""
if '@' not in s:
if "@" not in s:
raise ValueError("Unscoped string")
(local_part, _, domain_part) = s.partition('@')
(local_part, _, domain_part) = s.partition("@")
return domain_part
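
As a quick reference for reviewers, this is what the filter does (sketch only); it is exposed to the attribute templates via the `imports` list used when rendering, further down in this file:

```python
from satosa.attribute_mapping import scope

scope("alice@example.org")  # -> "example.org" (everything after the "@")
scope("alice")              # raises ValueError("Unscoped string")
```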


@@ -24,9 +25,8 @@ class AttributeMapper(object):
Converts between internal and external data format
"""

def __init__(self, internal_attributes):
def __init__(self, internal_attributes: dict[str, dict[str, dict[str, list[str]]]]):
"""
:type internal_attributes: dict[str, dict[str, dict[str, str]]]
:param internal_attributes: A map of how to convert the attributes
(dict[internal_name, dict[attribute_profile, external_name]])
"""
@@ -35,21 +35,16 @@ def __init__(self, internal_attributes):
self.from_internal_attributes = internal_attributes["attributes"]
self.template_attributes = internal_attributes.get("template_attributes", None)

self.to_internal_attributes = defaultdict(dict)
self.to_internal_attributes: dict[str, Any] = defaultdict(dict)
for internal_attribute_name, mappings in self.from_internal_attributes.items():
for profile, external_attribute_names in mappings.items():
for external_attribute_name in external_attribute_names:
self.to_internal_attributes[profile][external_attribute_name] = internal_attribute_name

def to_internal_filter(self, attribute_profile, external_attribute_names):
def to_internal_filter(self, attribute_profile: str, external_attribute_names: list[str]) -> list[str]:
"""
Converts attribute names from external "type" to internal

:type attribute_profile: str
:type external_attribute_names: list[str]
:type case_insensitive: bool
:rtype: list[str]

:param attribute_profile: From which external type to convert (ex: oidc, saml, ...)
:param external_attribute_names: A list of attribute names
:param case_insensitive: Create a case insensitive filter
@@ -63,7 +58,7 @@ def to_internal_filter(self, attribute_profile, external_attribute_names):
# no attributes since the given profile is not configured
return []

internal_attribute_names = set() # use set to ensure only unique values
internal_attribute_names: set[str] = set() # use set to ensure only unique values
for external_attribute_name in external_attribute_names:
try:
internal_attribute_name = profile_mapping[external_attribute_name]
@@ -73,14 +68,10 @@ def to_internal_filter(self, attribute_profile, external_attribute_names):

return list(internal_attribute_names)

def to_internal(self, attribute_profile, external_dict):
def to_internal(self, attribute_profile: str, external_dict: Mapping[str, list[str]]) -> dict[str, list[str]]:
"""
Converts the external data from "type" to internal

:type attribute_profile: str
:type external_dict: dict[str, str]
:rtype: dict[str, str]

:param attribute_profile: From which external type to convert (ex: oidc, saml, ...)
:param external_dict: Attributes in the external format
:return: Attributes in the internal format
@@ -97,24 +88,23 @@ def to_internal(self, attribute_profile, external_dict):
continue

external_attribute_name = mapping[attribute_profile]
attribute_values = self._collate_attribute_values_by_priority_order(external_attribute_name,
external_dict)
attribute_values = self._collate_attribute_values_by_priority_order(external_attribute_name, external_dict)
if attribute_values: # Only insert key if it has some values
logline = "backend attribute {external} mapped to {internal} ({value})".format(
external=external_attribute_name, internal=internal_attribute_name, value=attribute_values
)
logger.debug(logline)
internal_dict[internal_attribute_name] = attribute_values
else:
logline = "skipped backend attribute {}: no value found".format(
external_attribute_name
)
logline = "skipped backend attribute {}: no value found".format(external_attribute_name)
logger.debug(logline)
internal_dict = self._handle_template_attributes(attribute_profile, internal_dict)
return internal_dict

def _collate_attribute_values_by_priority_order(self, attribute_names, data):
result = []
def _collate_attribute_values_by_priority_order(
self, attribute_names: list[str], data: Mapping[str, list[str]]
) -> list[str]:
result: list[str] = []
for attr_name in attribute_names:
attr_val = self._get_nested_attribute_value(attr_name, data)

@@ -125,14 +115,19 @@ def _collate_attribute_values_by_priority_order(self, attribute_names, data):

return result

def _render_attribute_template(self, template, data):
def _render_attribute_template(self, template: str, data: Mapping[str, list[str]]) -> list[str]:
t = Template(template, cache_enabled=True, imports=["from satosa.attribute_mapping import scope"])
try:
return t.render(**data).split(self.multivalue_separator)
_rendered = t.render(**data)
if not isinstance(_rendered, str):
raise TypeError("Rendered data is not a string")
return _rendered.split(self.multivalue_separator)
except (NameError, TypeError):
return []

def _handle_template_attributes(self, attribute_profile, internal_dict):
def _handle_template_attributes(
self, attribute_profile: str, internal_dict: dict[str, list[str]]
) -> dict[str, list[str]]:
if not self.template_attributes:
return internal_dict

@@ -143,26 +138,27 @@ def _handle_template_attributes(self, attribute_profile, internal_dict):

external_attribute_name = mapping[attribute_profile]
templates = [t for t in external_attribute_name if "$" in t] # these looks like templates...
template_attribute_values = [self._render_attribute_template(template, internal_dict) for template in
templates]
flattened_attribute_values = list(chain.from_iterable(template_attribute_values))
attribute_values = flattened_attribute_values or internal_dict.get(internal_attribute_name, None)
template_attribute_values = [
self._render_attribute_template(template, internal_dict) for template in templates
]
flattened_attribute_values: list[str] = list(chain.from_iterable(template_attribute_values))
attribute_values = flattened_attribute_values or internal_dict.get(internal_attribute_name)
if attribute_values: # only insert key if it has some values
internal_dict[internal_attribute_name] = attribute_values

return internal_dict

def _get_nested_attribute_value(self, nested_key, data):
def _get_nested_attribute_value(self, nested_key: str, data: Mapping[str, Any]) -> Optional[Any]:
keys = nested_key.split(self.separator)

d = data
for key in keys:
d = d.get(key)
d = d.get(key) # type: ignore[assignment]
if d is None:
return None
return d

def _create_nested_attribute_value(self, nested_attribute_names, value):
def _create_nested_attribute_value(self, nested_attribute_names: list[str], value: Any) -> dict[str, Any]:
if len(nested_attribute_names) == 1:
# we've reached the inner-most attribute name, set value here
return {nested_attribute_names[0]: value}
@@ -171,26 +167,22 @@ def _create_nested_attribute_value(self, nested_attribute_names, value):
child_dict = self._create_nested_attribute_value(nested_attribute_names[1:], value)
return {nested_attribute_names[0]: child_dict}

def from_internal(self, attribute_profile, internal_dict):
def from_internal(
self, attribute_profile: str, internal_dict: dict[str, list[str]]
) -> dict[str, Union[list[str], dict[str, list[str]]]]:
"""
Converts the internal data to "type"

:type attribute_profile: str
:type internal_dict: dict[str, str]
:rtype: dict[str, str]

:param attribute_profile: To which external type to convert (ex: oidc, saml, ...)
:param internal_dict: attributes to map
:return: attribute values and names in the specified "profile"
"""
external_dict = {}
external_dict: dict[str, Union[list[str], dict[str, list[str]]]] = {}
for internal_attribute_name in internal_dict:
try:
attribute_mapping = self.from_internal_attributes[internal_attribute_name]
except KeyError:
logline = "no attribute mapping found for the internal attribute {}".format(
internal_attribute_name
)
logline = "no attribute mapping found for the internal attribute {}".format(internal_attribute_name)
logger.debug(logline)
continue

@@ -206,14 +198,17 @@ def from_internal(self, attribute_profile, internal_dict):
# select the first attribute name
external_attribute_name = external_attribute_names[0]
logline = "frontend attribute {external} mapped from {internal} ({value})".format(
external=external_attribute_name, internal=internal_attribute_name, value=internal_dict[internal_attribute_name]
external=external_attribute_name,
internal=internal_attribute_name,
value=internal_dict[internal_attribute_name],
)
logger.debug(logline)

if self.separator in external_attribute_name:
nested_attribute_names = external_attribute_name.split(self.separator)
nested_dict = self._create_nested_attribute_value(nested_attribute_names[1:],
internal_dict[internal_attribute_name])
nested_dict = self._create_nested_attribute_value(
nested_attribute_names[1:], internal_dict[internal_attribute_name]
)
external_dict[nested_attribute_names[0]] = nested_dict
else:
external_dict[external_attribute_name] = internal_dict[internal_attribute_name]
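
Finally, a short sketch (also not part of the diff) of the reverse direction, matching the annotated return type of `from_internal`; it reuses the same illustrative mapping as in the README example above and again assumes only the `attributes` key is needed:

```python
from satosa.attribute_mapping import AttributeMapper

# Same illustrative mapping as in the README sketch above.
mapper = AttributeMapper({
    "attributes": {
        "mail": {"openid": ["email"], "saml": ["mail", "emailAddress", "email"]},
        "address": {"openid": ["address.formatted"], "saml": ["postaladdress"]},
    }
})

mapper.from_internal("saml", {"mail": ["alice@example.org"]})
# -> {"mail": ["alice@example.org"]}  (the first external name in the profile's list is used)

mapper.from_internal("openid", {"address": ["Main St 1"]})
# -> {"address": {"formatted": ["Main St 1"]}}  (nested dict built via the "." separator)
```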