|
| 1 | +from collections import defaultdict |
| 2 | + |
| 3 | +from cryptography import x509 |
| 4 | +from frozenlist2 import frozenlist |
| 5 | +from pyasn1.codec.der import decoder |
| 6 | + |
1 | 7 | from .base import PushItem
|
| 8 | +from .conv import convert_maybe, sloppylist |
2 | 9 | from .. import compat_attr as attr
|
3 | 10 |
|
4 | 11 |
|
| 12 | +# Red Hat OID namespace is "1.3.6.1.4.1.2312.9", |
| 13 | +# the trailing ".1" designates a Product Certificate. |
| 14 | +OID_NAMESPACE = "1.3.6.1.4.1.2312.9.1." |
| 15 | + |
| 16 | + |
| 17 | +@attr.s() |
| 18 | +class ProductId(object): |
| 19 | + """A ProductID represents a group of metadata pertaining to a single product |
| 20 | + contained in a ProductID certificate.""" |
| 21 | + |
| 22 | + id = attr.ib(type=int) |
| 23 | + """Product Engineering ID (EngID), e.g. 72 |
| 24 | +
|
| 25 | + :type: int |
| 26 | + """ |
| 27 | + |
| 28 | + name = attr.ib(type=str, default=None) |
| 29 | + """Human readable product name, e.g. "Red Hat Enterprise Linux for IBM z Systems" |
| 30 | +
|
| 31 | + :type: str |
| 32 | + """ |
| 33 | + |
| 34 | + version = attr.ib(type=str, default=None) |
| 35 | + """Human readable product version string, e.g. "9.4" |
| 36 | +
|
| 37 | + :type: str |
| 38 | + """ |
| 39 | + |
| 40 | + architecture = attr.ib(type=list, default=None, converter=convert_maybe(sloppylist)) |
| 41 | + """List of architectures supported by the product, e.g. ["s390x"] |
| 42 | +
|
| 43 | + :type: List[str] |
| 44 | + """ |
| 45 | + |
| 46 | + provided_tags = attr.ib(type=list, default=None, converter=convert_maybe(sloppylist)) |
| 47 | + """List of tags describing the provided platforms used for pairing with other products, |
| 48 | + e.g. ["rhel-9", "rhel-9-s390x"] |
| 49 | +
|
| 50 | + :type: List[str] |
| 51 | + """ |
| 52 | + |
| 53 | + |
5 | 54 | @attr.s()
|
6 | 55 | class ProductIdPushItem(PushItem):
|
7 | 56 | """A :class:`~pushsource.PushItem` representing a product ID certificate.
|
8 | 57 |
|
9 | 58 | For push items of this type, the :meth:`~pushsource.PushItem.src` attribute
|
10 | 59 | refers to a file containing a PEM certificate identifying a product.
|
| 60 | + """ |
| 61 | + |
| 62 | + products = attr.ib(type=list, converter=frozenlist) |
| 63 | + """List of products described by the ProductID certificate. |
11 | 64 |
|
12 |
| - This library does not verify that the referenced file is a valid |
13 |
| - certificate. |
| 65 | + :type: List[ProductID] |
14 | 66 | """
|
| 67 | + |
| 68 | + @products.default |
| 69 | + def _default_products(self): |
| 70 | + return frozenlist(self._load_products(self.src) if self.src else []) |
| 71 | + |
| 72 | + def _load_products(self, path): |
| 73 | + """Returns a list of ProductIDs described by the ProductID X.509 certificate file |
| 74 | + in PEM format. Raises ValueError if the file doesn't describe any ProductID.""" |
| 75 | + |
| 76 | + with open(path, "rb") as f: |
| 77 | + x509_certificate = x509.load_pem_x509_certificate(f.read()) |
| 78 | + # Extensions are most commonly ASN.1 (DER) encoded UTF-8 strings. |
| 79 | + # First byte is usually 0x13 = PrintableString, second byte is the length of the string |
| 80 | + # However we can't rely on that and must parse the fields safely using a proper ASN.1 / DER |
| 81 | + # parser. Although cryptography module does its own ASN.1 / DER parsing, it doesn't provide |
| 82 | + # any public API for that yet (see https://github.com/pyca/cryptography/issues/9283), |
| 83 | + # so pyasn1 module has to be used instead. |
| 84 | + products_data = defaultdict(dict) |
| 85 | + for extension in x509_certificate.extensions: |
| 86 | + oid = extension.oid.dotted_string |
| 87 | + if oid.startswith(OID_NAMESPACE): |
| 88 | + # OID component with index 9 is always EngID |
| 89 | + # OID component with index 10 (last) is: |
| 90 | + # 1 = Product Name, e.g. "Red Hat Enterprise Linux for IBM z Systems" |
| 91 | + # 2 = Product Version, e.g. "9.4" |
| 92 | + # 3 = Product Architecture, e.g. "s390x" |
| 93 | + # 4 = Product Tags / Provides, e.g. "rhel-9,rhel-9-s390x" |
| 94 | + eng_id, attribute_id = map(int, oid.split(".")[9:11]) |
| 95 | + products_data[eng_id][attribute_id] = str(decoder.decode(extension.value.value)[0]) |
| 96 | + |
| 97 | + if not products_data: |
| 98 | + raise ValueError("File '%s' is not a ProductID certificate." % path) |
| 99 | + |
| 100 | + result = [] |
| 101 | + for eng_id, product_data in products_data.items(): |
| 102 | + product = ProductId( |
| 103 | + id = eng_id, |
| 104 | + name = product_data.get(1), |
| 105 | + version = product_data.get(2), |
| 106 | + architecture = product_data.get(3), |
| 107 | + provided_tags = product_data.get(4), |
| 108 | + ) |
| 109 | + result.append(product) |
| 110 | + return result |
0 commit comments