|
1 | 1 | #!/usr/libexec/platform-python
|
2 | 2 |
|
| 3 | +import json |
3 | 4 | import yaml
|
4 | 5 | import base64
|
5 | 6 | import argparse
|
|
37 | 38 | "ca_password" "hdfs_ssh_pw", "maprfs_ssh_pw", "powervm_mgr_passwd",
|
38 | 39 | "virtual_power_host_pass", "vnc_password", "s3_secret_key",
|
39 | 40 | "ca_private_key_passphrase", "heartbeat_key", "DatabasePassword",
|
40 |
| - "server_certs_key_passphrase", |
| 41 | + "server_certs_key_passphrase", "ssh-privatekey", |
41 | 42 | ]
|
42 | 43 |
|
43 | 44 | CONNECTION_KEYS = ["rabbit", "database_connection",
|
@@ -85,16 +86,42 @@ def mask(self) -> bool:
|
85 | 86 | # s is None or empty dict, return
|
86 | 87 | if not s or len(s) == 0:
|
87 | 88 | return True
|
| 89 | + |
| 90 | + # mask the dict containing k8s secret dump |
| 91 | + self._applyMask(s) |
| 92 | + |
| 93 | + # write the resulting, masked/encoded file |
| 94 | + self._writeYaml(dict(s)) |
| 95 | + return True |
| 96 | + |
| 97 | + def _applyAnnotationsMask(self, annotations: Dict[str, Any]) -> Dict[str, Any]: |
| 98 | + last_config = annotations.get("kubectl.kubernetes.io/last-applied-configuration", None) |
| 99 | + if not last_config: |
| 100 | + return annotations |
| 101 | + try: |
| 102 | + last_applied_config = json.loads(last_config) |
| 103 | + |
| 104 | + # recursively mask secrets within last-applied-configuration |
| 105 | + self._applyMask(last_applied_config) |
| 106 | + annotations["kubectl.kubernetes.io/last-applied-configuration"] = json.dumps(last_applied_config, separators=(',', ':')) |
| 107 | + except (json.JSONDecodeError, KeyError) as e: |
| 108 | + print(f"Error while parsing contents of kubectl.kubernetes.io/last-applied-configuration {e}") |
| 109 | + annotations["kubectl.kubernetes.io/last-applied-configuration"] = MASK_STR |
| 110 | + return annotations |
| 111 | + |
| 112 | + def _applyMask(self, s: Dict) -> None: |
88 | 113 | for k, v in s.items():
|
89 | 114 | # if we have items in the loaded dict,
|
90 | 115 | # we look for the data section, which
|
91 |
| - # is were we want to apply masking |
| 116 | + # is where we want to apply masking |
| 117 | + # now we also look for the metadata |
| 118 | + # section as it also contains secrets |
| 119 | + # within last-applied-configuration |
92 | 120 | if k == "data":
|
93 | 121 | data = self._process_data(v)
|
94 | 122 | s[k] = data
|
95 |
| - # write the resulting, masked/encoded file |
96 |
| - self._writeYaml(dict(s)) |
97 |
| - return True |
| 123 | + elif k == "metadata" and "annotations" in s[k]: |
| 124 | + s[k]["annotations"] = self._applyAnnotationsMask(s[k]["annotations"]) |
98 | 125 |
|
99 | 126 | def _readYaml(self) -> Dict[str, str]:
|
100 | 127 | """
|
|
0 commit comments