Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

from foundry_dev_tools.utils.api_types import (
CodeResourceType,
DatasetRid,
FolderRid,
MarkingId,
NetworkEgressPolicyRid,
Rid,
SourceRid,
TransactionType,
)


Expand Down Expand Up @@ -201,6 +203,101 @@ def api_bulk_get_source_description(self, source_rids: set[SourceRid], **kwargs)
**kwargs,
)

def api_add_extract_for_branch(self, branch_name: str, payload: dict, **kwargs) -> requests.Response:
    """Create a new extract (sync) for the given branch.

    Args:
        branch_name: Name of the branch, sent as the ``branchName`` query parameter.
            It is percent-encoded so that names containing reserved characters
            (``/``, ``&``, ``#``, spaces, ...) reach the server unchanged.
        payload: JSON-serializable request body describing the extract.
        **kwargs: Forwarded unchanged to ``api_request``.

    Returns:
        The response returned by ``api_request`` for the ``build/add-extract`` call.
        Its JSON body is expected to be the RID of the new extract (not a source RID).
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    # safe="" also encodes "/", which is common in branch names such as "feature/x".
    encoded_branch = quote(branch_name, safe="")
    return self.api_request(
        "POST",
        f"build/add-extract?branchName={encoded_branch}",
        json=payload,
        **kwargs,
    )

def create_new_extract_for_source(
    self,
    branch_name: str,
    dataset_rid: DatasetRid,
    source_rid: SourceRid,
    extract_name: str,
    transaction_type: TransactionType,
    source_adapter: dict,
    spark_profiles: list[str] | None = None,
) -> str:
    r"""Create a new Foundry extract (sync) that reads from an existing source.

    Builds the request body and submits it through ``api_add_extract_for_branch``.
    The decoded JSON response is returned as-is; it is a plain string rather than
    a dict-like structure.

    Usage:
    ```python
    >>> sync_rid = magritte_client.create_new_extract_for_source(
    ...     branch_name="master",
    ...     dataset_rid="ri.foundry.main.dataset.xxxxxx-xxxx-xxxx-xxxx-xxxxxxx",
    ...     source_rid="ri.magritte..source.xxxxxx-xxxx-xxxx-xxxx-xxxxxxx",
    ...     extract_name="all_flights",
    ...     transaction_type="SNAPSHOT",
    ...     source_adapter={
    ...         "type": "jdbc-source-adapter",  # for a jdbc-type connector
    ...         "jdbcOptions": {
    ...             "preQueries": [],
    ...             "query": {
    ...                 "sqlQuery": "SELECT * FROM \"AIRPORT_DB\".\"FLIGHTS\""
    ...             },
    ...         },
    ...     },
    ...     spark_profiles=[],  # only works with non-agent exports
    ... )
    ```

    Args:
        branch_name: Name of the branch where the data should land.
        dataset_rid: RID of the output dataset.
        source_rid: RID of the source the extract/sync reads from.
        extract_name: Name of the new extract/sync.
        transaction_type: One of the options available within `TransactionType`.
        source_adapter: Source adapter configuration. Must include:
            - type (str): Adapter type, e.g. "jdbc-source-adapter".
            - jdbcOptions (dict, optional): For JDBC adapters, must include:
                - preQueries (list[str]): SQL queries to run before the main query.
                - query (dict): Must include:
                    - sqlQuery (str): The SQL query to execute.
            Example:
                {
                    "type": "jdbc-source-adapter",
                    "jdbcOptions": {
                        "preQueries": [],
                        "query": {
                            "sqlQuery": "SELECT * FROM \"AIRPORT_DB\".\"FLIGHTS\""
                        }
                    }
                }
        spark_profiles: Names of the spark profiles to use; ``None`` or an empty
            list sends no profiles. Not available when using agent-based extracts.

    Returns:
        The extract RID of the newly created extract.
    """
    profiles = spark_profiles if spark_profiles else []

    # The transaction type and the adapter are sent both at the top level and
    # nested inside "extractConfig", mirroring the request shape used so far.
    extract_config = {
        "sourceAdapter": source_adapter,
        "outputOptions": {"transactionType": transaction_type},
    }
    schema_constraint = {"consistentOutputSchema": {}, "type": "consistentOutputSchema"}
    timestamp_options = {
        "useTimestampType": False,
        "timestampWithoutTzOptions": {"asString": {}, "type": "asString"},
    }

    request_body = {
        "datasetId": dataset_rid,
        "branchName": branch_name,
        "extractConfig": extract_config,
        "outputOptions": {"transactionType": transaction_type},
        "outputConstraints": [schema_constraint],
        "schemaOptions": {},
        "timestampOptions": timestamp_options,
        "transactionType": transaction_type,
        "sourceAdapter": source_adapter,
        "extractName": extract_name,
        "sourceId": source_rid,
        "sparkProfiles": profiles,
    }

    response = self.api_add_extract_for_branch(branch_name=branch_name, payload=request_body)
    return response.json()

def api_get_runtime_platform(self, source_rid: SourceRid, **kwargs) -> requests.Response:
"""Get runtime platform information for a source.

Expand Down