Skip to content

SDK 1DP Integration #40656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 50 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
b41957a
initial changes
w-javed Apr 22, 2025
2384f79
Merge branch 'main' into sdk_1dp_integration
w-javed Apr 22, 2025
7daa327
fix
w-javed Apr 22, 2025
349156b
fix
w-javed Apr 22, 2025
9c0a670
fix
w-javed Apr 22, 2025
472c5cb
fix
w-javed Apr 23, 2025
4f60902
rearranged
w-javed Apr 23, 2025
92cfbc2
fix
w-javed Apr 23, 2025
49cff18
Fix tests
w-javed Apr 23, 2025
d2a6ed1
Fix tests
w-javed Apr 23, 2025
a7b47b0
Fix tests
w-javed Apr 23, 2025
a5ba7a8
Fix tests
w-javed Apr 23, 2025
44806bc
Copilot fix
w-javed Apr 23, 2025
d2dc79a
Fix tests
w-javed Apr 23, 2025
0638784
Fix tests
w-javed Apr 23, 2025
c5d53cf
Fix tests - assets
w-javed Apr 23, 2025
fb4fa4a
Merge branch 'main' into sdk_1dp_integration
w-javed Apr 23, 2025
6bfa9ce
Fix tests - assets
w-javed Apr 24, 2025
b39796d
Fix tests - assets
w-javed Apr 24, 2025
bbde331
assets
w-javed Apr 25, 2025
5e409c3
assets
w-javed Apr 25, 2025
f765241
fix-assets
w-javed Apr 28, 2025
a85bd75
Merge branch 'main' into sdk_1dp_integration
w-javed Apr 28, 2025
f5e49a4
adding service check
w-javed Apr 28, 2025
ca07f1c
revert one test to avoid large size recording
w-javed Apr 28, 2025
80c4761
unit test fix
w-javed Apr 28, 2025
b243ff5
unit test fix
w-javed Apr 28, 2025
06e99df
last asset
w-javed Apr 28, 2025
f08880b
red-team
w-javed Apr 28, 2025
15bfaf3
merge conflicts
w-javed Apr 28, 2025
69168d9
Merge branch 'main' into sdk_1dp_integration
w-javed Apr 28, 2025
2cfcd09
Enabling Red Teaming for 1dp
w-javed Apr 28, 2025
8bdb30a
assset change
w-javed Apr 28, 2025
b204002
fix for one test
w-javed Apr 29, 2025
3e27dce
typo fix
w-javed Apr 29, 2025
85b4831
fix
w-javed Apr 29, 2025
c842c2e
dep
w-javed Apr 29, 2025
ffb5456
asset
w-javed Apr 29, 2025
fcdbf98
Merge branch 'main' into sdk_1dp_integration
w-javed Apr 29, 2025
d8a62a7
fix
w-javed Apr 29, 2025
b14b14f
fix
w-javed Apr 29, 2025
4b1b283
CI Fix
w-javed Apr 29, 2025
0dac4c7
CI Fix
w-javed Apr 29, 2025
3882d82
CI Fix
w-javed Apr 29, 2025
47c521d
revert change
w-javed Apr 29, 2025
0ad568f
revert again
w-javed Apr 29, 2025
ab7fdd4
revert
w-javed Apr 29, 2025
ba1c008
updated asset for one test
w-javed Apr 29, 2025
97f154c
rollback changes
w-javed Apr 29, 2025
5babcd1
rollback changes
w-javed Apr 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/evaluation/azure-ai-evaluation/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "python",
"TagPrefix": "python/evaluation/azure-ai-evaluation",
"Tag": "python/evaluation/azure-ai-evaluation_497634c2bf"
"Tag": "python/evaluation/azure-ai-evaluation_7d56415461"
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
from typing import Dict, List, Optional, Union, cast
from urllib.parse import urlparse
from string import Template
from azure.ai.evaluation._common.onedp._client import AIProjectClient
from azure.core.exceptions import HttpResponseError

import jwt

from azure.ai.evaluation._legacy._adapters._errors import MissingRequiredPackage
from azure.ai.evaluation._exceptions import ErrorBlame, ErrorCategory, ErrorTarget, EvaluationException
from azure.ai.evaluation._http_utils import AsyncHttpPipeline, get_async_http_client
from azure.ai.evaluation._model_configurations import AzureAIProject
from azure.ai.evaluation._common.utils import is_onedp_project
from azure.core.credentials import TokenCredential
from azure.core.exceptions import HttpResponseError
from azure.core.pipeline.policies import AsyncRetryPolicy
Expand All @@ -41,6 +44,8 @@
USER_TEXT_TEMPLATE_DICT: Dict[str, Template] = {
"DEFAULT": Template("<Human>{$query}</><System>{$response}</>"),
}
ML_WORKSPACE = "https://management.azure.com/.default"
COG_SRV_WORKSPACE = "https://cognitiveservices.azure.com/.default"

INFERENCE_OF_SENSITIVE_ATTRIBUTES = "inference_sensitive_attributes"

Expand Down Expand Up @@ -99,11 +104,7 @@ def get_common_headers(token: str, evaluator_name: Optional[str] = None) -> Dict
user_agent = f"{USER_AGENT} (type=evaluator; subtype={evaluator_name})" if evaluator_name else USER_AGENT
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": user_agent,
# Handle "RuntimeError: Event loop is closed" from httpx AsyncClient
# https://github.com/encode/httpx/discussions/2959
"Connection": "close",
}


Expand All @@ -112,7 +113,31 @@ def get_async_http_client_with_timeout() -> AsyncHttpPipeline:
retry_policy=AsyncRetryPolicy(timeout=CommonConstants.DEFAULT_HTTP_TIMEOUT)
)

async def ensure_service_availability_onedp(client: AIProjectClient, token: str, capability: Optional[str] = None) -> None:
    """Verify that the Responsible AI annotation service is reachable and, when a
    capability name is given, that the capability is offered in this region.

    :param client: The AI project client used to query the service.
    :type client: AIProjectClient
    :param token: The Azure authentication token forwarded in the request headers.
    :type token: str
    :param capability: Optional capability name to verify. Default is None, in which
        case only service availability is checked.
    :type capability: str
    :raises EvaluationException: If the requested capability is not available in the region.
    """
    available_capabilities = client.evaluations.check_annotation(headers=get_common_headers(token))

    # Nothing further to verify when no specific capability was requested,
    # or when the requested one is reported as available.
    if not capability or capability in available_capabilities:
        return

    msg = f"The needed capability '{capability}' is not supported by the RAI service in this region."
    raise EvaluationException(
        message=msg,
        internal_message=msg,
        target=ErrorTarget.RAI_CLIENT,
        category=ErrorCategory.SERVICE_UNAVAILABLE,
        blame=ErrorBlame.USER_ERROR,
        tsg_link="https://aka.ms/azsdk/python/evaluation/safetyevaluator/troubleshoot",
    )

async def ensure_service_availability(rai_svc_url: str, token: str, capability: Optional[str] = None) -> None:
"""Check if the Responsible AI service is available in the region and has the required capability, if relevant.

Expand Down Expand Up @@ -231,6 +256,40 @@ async def submit_request(
return operation_id


async def submit_request_onedp(
    client: AIProjectClient,
    data: dict,
    metric: str,
    token: str,
    annotation_task: str,
    evaluator_name: str
) -> str:
    """Submit an annotation request to the Responsible AI service via the 1DP
    project client and return the resulting operation ID.

    :param client: The AI project client.
    :type client: AIProjectClient
    :param data: The data to evaluate.
    :type data: dict
    :param metric: The evaluation metric to use.
    :type metric: str
    :param token: The Azure authentication token.
    :type token: str
    :param annotation_task: The annotation task to use.
    :type annotation_task: str
    :param evaluator_name: The evaluator name.
    :type evaluator_name: str
    :return: The operation ID.
    :rtype: str
    """
    user_text = get_formatted_template(data, annotation_task)
    annotation_payload = generate_payload(user_text, metric, annotation_task=annotation_task)

    response = client.evaluations.submit_annotation(
        annotation_payload,
        headers=get_common_headers(token, evaluator_name),
    )

    # The operation ID is the last path segment of the returned location URL.
    location = json.loads(response)["location"]
    return location.split("/")[-1]


async def fetch_result(operation_id: str, rai_svc_url: str, credential: TokenCredential, token: str) -> Dict:
"""Fetch the annotation result from Responsible AI service

Expand Down Expand Up @@ -267,6 +326,34 @@ async def fetch_result(operation_id: str, rai_svc_url: str, credential: TokenCre
sleep_time = RAIService.SLEEP_TIME**request_count
await asyncio.sleep(sleep_time)

async def fetch_result_onedp(client: AIProjectClient, operation_id: str, token: str) -> Dict:
    """Poll the Responsible AI service until the annotation result for the given
    operation is available.

    Retries on HTTP errors with exponential backoff (``RAIService.SLEEP_TIME ** attempt``)
    until ``RAIService.TIMEOUT`` seconds have elapsed.

    :param client: The AI project client.
    :type client: AIProjectClient
    :param operation_id: The operation ID returned when the annotation was submitted.
    :type operation_id: str
    :param token: The Azure authentication token.
    :type token: str
    :return: The annotation result.
    :rtype: Dict
    :raises TimeoutError: If the result is not available within ``RAIService.TIMEOUT`` seconds.
    """
    start = time.time()
    request_count = 0
    # The token is fixed for the lifetime of this call, so the headers are
    # loop-invariant and only need to be built once.
    headers = get_common_headers(token)

    while True:
        try:
            return client.evaluations.operation_results(operation_id, headers=headers)
        except HttpResponseError as exc:
            request_count += 1
            time_elapsed = time.time() - start
            if time_elapsed > RAIService.TIMEOUT:
                # Chain the triggering HTTP error so the root cause is preserved.
                raise TimeoutError(
                    f"Fetching annotation result {request_count} times out after {time_elapsed:.2f} seconds"
                ) from exc

            # Exponential backoff between polls.
            sleep_time = RAIService.SLEEP_TIME**request_count
            await asyncio.sleep(sleep_time)

def parse_response( # pylint: disable=too-many-branches,too-many-statements
batch_response: List[Dict], metric_name: str, metric_display_name: Optional[str] = None
) -> Dict[str, Union[str, float]]:
Expand Down Expand Up @@ -500,7 +587,7 @@ async def get_rai_svc_url(project_scope: AzureAIProject, token: str) -> str:
return rai_url


async def fetch_or_reuse_token(credential: TokenCredential, token: Optional[str] = None) -> str:
async def fetch_or_reuse_token(credential: TokenCredential, token: Optional[str] = None, workspace: Optional[str] = ML_WORKSPACE) -> str:
"""Get token. Fetch a new token if the current token is near expiry

:param credential: The Azure authentication credential.
Expand All @@ -524,13 +611,13 @@ async def fetch_or_reuse_token(credential: TokenCredential, token: Optional[str]
if (exp_time - current_time) >= 300:
return token

return credential.get_token("https://management.azure.com/.default").token
return credential.get_token(workspace).token


async def evaluate_with_rai_service(
data: dict,
metric_name: str,
project_scope: AzureAIProject,
project_scope: Union[str, AzureAIProject],
credential: TokenCredential,
annotation_task: str = Tasks.CONTENT_HARM,
metric_display_name=None,
Expand All @@ -556,18 +643,26 @@ async def evaluate_with_rai_service(
:rtype: Dict[str, Union[str, float]]
"""

# Get RAI service URL from discovery service and check service availability
token = await fetch_or_reuse_token(credential)
rai_svc_url = await get_rai_svc_url(project_scope, token)
await ensure_service_availability(rai_svc_url, token, annotation_task)

# Submit annotation request and fetch result
operation_id = await submit_request(data, metric_name, rai_svc_url, token, annotation_task, evaluator_name)
annotation_response = cast(List[Dict], await fetch_result(operation_id, rai_svc_url, credential, token))
result = parse_response(annotation_response, metric_name, metric_display_name)
if is_onedp_project(project_scope):
client = AIProjectClient(endpoint=project_scope, credential=credential)
token = await fetch_or_reuse_token(credential=credential, workspace=COG_SRV_WORKSPACE)
await ensure_service_availability_onedp(client, token, annotation_task)
operation_id = await submit_request_onedp(client, data, metric_name, token, annotation_task, evaluator_name)
annotation_response = cast(List[Dict], await fetch_result_onedp(client, operation_id, token))
result = parse_response(annotation_response, metric_name, metric_display_name)
return result
else:
# Get RAI service URL from discovery service and check service availability
token = await fetch_or_reuse_token(credential)
rai_svc_url = await get_rai_svc_url(project_scope, token)
await ensure_service_availability(rai_svc_url, token, annotation_task)

return result
# Submit annotation request and fetch result
operation_id = await submit_request(data, metric_name, rai_svc_url, token, annotation_task, evaluator_name)
annotation_response = cast(List[Dict], await fetch_result(operation_id, rai_svc_url, credential, token))
result = parse_response(annotation_response, metric_name, metric_display_name)

return result

def generate_payload_multimodal(content_type: str, messages, metric: str) -> Dict:
"""Generate the payload for the annotation request
Expand Down Expand Up @@ -600,7 +695,6 @@ def generate_payload_multimodal(content_type: str, messages, metric: str) -> Dic
"AnnotationTask": task,
}


async def submit_multimodal_request(messages, metric: str, rai_svc_url: str, token: str) -> str:
"""Submit request to Responsible AI service for evaluation and return operation ID
:param messages: The normalized list of messages to be entered as the "Contents" in the payload.
Expand Down Expand Up @@ -646,9 +740,37 @@ async def submit_multimodal_request(messages, metric: str, rai_svc_url: str, tok
operation_id = result["location"].split("/")[-1]
return operation_id

async def submit_multimodal_request_onedp(client: AIProjectClient, messages, metric: str, token: str) -> str:
    """Submit a multimodal annotation request to the Responsible AI service via the
    1DP project client and return the operation ID.

    :param messages: The normalized list of messages to be entered as the "Contents" in the payload.
    :type messages: list
    :param client: The AI project client.
    :type client: AIProjectClient
    :param metric: The evaluation metric to use.
    :type metric: str
    :param token: The Azure authentication token.
    :type token: str
    :return: The operation ID.
    :rtype: str
    """
    # Handle azure-ai-inference strongly-typed messages by converting them to dicts.
    if len(messages) > 0 and not isinstance(messages[0], dict):
        try:
            from azure.ai.inference.models import ChatRequestMessage
        except ImportError as ex:
            error_message = (
                "Please install 'azure-ai-inference' package to use SystemMessage, UserMessage, AssistantMessage"
            )
            raise MissingRequiredPackage(message=error_message) from ex
        if len(messages) > 0 and isinstance(messages[0], ChatRequestMessage):
            messages = [message.as_dict() for message in messages]

    ## fetch system and assistant messages from the list of messages
    filtered_messages = [message for message in messages if message["role"] != "system"]
    assistant_messages = [message for message in messages if message["role"] == "assistant"]

    ## prepare for request
    content_type = retrieve_content_type(assistant_messages, metric)
    payload = generate_payload_multimodal(content_type, filtered_messages, metric)
    headers = get_common_headers(token)

    response = client.evaluations.submit_annotation(payload, headers=headers)

    # The operation ID is the last path segment of the returned location URL.
    result = json.loads(response)
    operation_id = result["location"].split("/")[-1]
    return operation_id

async def evaluate_with_rai_service_multimodal(
messages, metric_name: str, project_scope: AzureAIProject, credential: TokenCredential
messages, metric_name: str, project_scope: Union[str, AzureAIProject], credential: TokenCredential
):
""" "Evaluate the content safety of the response using Responsible AI service
:param messages: The normalized list of messages.
Expand All @@ -664,12 +786,20 @@ async def evaluate_with_rai_service_multimodal(
:rtype: List[List[Dict]]
"""

# Get RAI service URL from discovery service and check service availability
token = await fetch_or_reuse_token(credential)
rai_svc_url = await get_rai_svc_url(project_scope, token)
await ensure_service_availability(rai_svc_url, token, Tasks.CONTENT_HARM)
# Submit annotation request and fetch result
operation_id = await submit_multimodal_request(messages, metric_name, rai_svc_url, token)
annotation_response = cast(List[Dict], await fetch_result(operation_id, rai_svc_url, credential, token))
result = parse_response(annotation_response, metric_name)
return result
if is_onedp_project(project_scope):
client = AIProjectClient(endpoint=project_scope, credential=credential)
token = await fetch_or_reuse_token(credential=credential, workspace=COG_SRV_WORKSPACE)
await ensure_service_availability_onedp(client, token, Tasks.CONTENT_HARM)
operation_id = await submit_multimodal_request_onedp(client, messages, metric_name, token)
annotation_response = cast(List[Dict], await fetch_result_onedp(client, operation_id, token))
result = parse_response(annotation_response, metric_name)
return result
else:
token = await fetch_or_reuse_token(credential)
rai_svc_url = await get_rai_svc_url(project_scope, token)
await ensure_service_availability(rai_svc_url, token, Tasks.CONTENT_HARM)
# Submit annotation request and fetch result
operation_id = await submit_multimodal_request(messages, metric_name, rai_svc_url, token)
annotation_response = cast(List[Dict], await fetch_result(operation_id, rai_svc_url, credential, token))
result = parse_response(annotation_response, metric_name)
return result
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ def construct_prompty_model_config(

return prompty_model_config

def is_onedp_project(azure_ai_project: "Union[str, AzureAIProject]") -> bool:
    """Check if the Azure AI project is an OneDP project.

    OneDP projects are addressed by a plain endpoint URL string, whereas hub-based
    projects are described by an AzureAIProject dictionary (subscription, resource
    group, and project name).

    :param azure_ai_project: The scope of the Azure AI project: either a OneDP
        endpoint URL string or an AzureAIProject dictionary.
    :type azure_ai_project: Union[str, ~azure.ai.evaluation.AzureAIProject]
    :return: True if the Azure AI project is an OneDP project, False otherwise.
    :rtype: bool
    """
    # A bare string is taken to be a OneDP project endpoint; anything else
    # (typically an AzureAIProject mapping) is a hub-based project.
    return isinstance(azure_ai_project, str)

def validate_azure_ai_project(o: object) -> AzureAIProject:
fields = {"subscription_id": str, "resource_group_name": str, "project_name": str}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
_InternalAnnotationTasks,
)
from azure.ai.evaluation._common.rai_service import evaluate_with_rai_service, evaluate_with_rai_service_multimodal
from azure.ai.evaluation._common.utils import validate_azure_ai_project
from azure.ai.evaluation._common.utils import validate_azure_ai_project, is_onedp_project
from azure.ai.evaluation._exceptions import EvaluationException
from azure.ai.evaluation._common.utils import validate_conversation
from azure.ai.evaluation._constants import _AggregationType
Expand Down Expand Up @@ -50,7 +50,7 @@ class RaiServiceEvaluatorBase(EvaluatorBase[T]):
def __init__(
self,
eval_metric: Union[EvaluationMetrics, _InternalEvaluationMetrics],
azure_ai_project: dict,
azure_ai_project: Union[dict, str],
credential: TokenCredential,
eval_last_turn: bool = False,
conversation_aggregation_type: _AggregationType = _AggregationType.MEAN,
Expand All @@ -59,7 +59,7 @@ def __init__(
):
super().__init__(eval_last_turn=eval_last_turn, conversation_aggregation_type=conversation_aggregation_type, threshold=threshold, _higher_is_better=_higher_is_better)
self._eval_metric = eval_metric
self._azure_ai_project = validate_azure_ai_project(azure_ai_project)
self._azure_ai_project = azure_ai_project if is_onedp_project(azure_ai_project) else validate_azure_ai_project(azure_ai_project)
self._credential = credential
self._threshold = threshold
self._higher_is_better = _higher_is_better
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ class _SafetyEvaluator(Enum):
class _SafetyEvaluation:
def __init__(
self,
azure_ai_project: dict,
azure_ai_project: Union[str, dict],
credential: TokenCredential,
model_config: Optional[Union[AzureOpenAIModelConfiguration, OpenAIModelConfiguration]] = None,
):
"""
Initializes a SafetyEvaluation object.

:param azure_ai_project: A dictionary defining the Azure AI project. Required keys are 'subscription_id', 'resource_group_name', and 'project_name'.
:type azure_ai_project: Dict[str, str]
:param azure_ai_project: A string or dictionary defining the Azure AI project. Required keys are 'subscription_id', 'resource_group_name', and 'project_name'.
:type azure_ai_project: Union[str, Dict[str, str]]
:param credential: The credential for connecting to Azure AI project.
:type credential: ~azure.core.credentials.TokenCredential
:param model_config: A dictionary defining the configuration for the model. Acceptable types are AzureOpenAIModelConfiguration and OpenAIModelConfiguration.
Expand All @@ -111,8 +111,7 @@ def __init__(
self.model_config = model_config
else:
self.model_config = None
validate_azure_ai_project(azure_ai_project)
self.azure_ai_project = AzureAIProject(**azure_ai_project)
self.azure_ai_project = azure_ai_project if isinstance(azure_ai_project, str) else validate_azure_ai_project(azure_ai_project)
self.credential = credential
self.logger = _setup_logger()

Expand Down
Loading