Skip to content

Commit 48d3249

Browse files
Add data collection limit (#5137)
Co-authored-by: Joao Vilaca <[email protected]>
1 parent b6d4525 commit 48d3249

File tree

24 files changed

+304
-150
lines changed

24 files changed

+304
-150
lines changed
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
# Copyright (C) 2025 Intel Corporation
22
# SPDX-License-Identifier: Apache-2.0
3-

application/backend/app/alembic/versions/2786b50eb5a4_schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def upgrade() -> None:
148148
sa.Column("sink_id", sa.Text(), nullable=True),
149149
sa.Column("model_revision_id", sa.Text(), nullable=True),
150150
sa.Column("is_running", sa.Boolean(), nullable=False),
151-
sa.Column("data_collection_policies", sa.JSON(), nullable=False),
151+
sa.Column("data_collection", sa.JSON(), nullable=False),
152152
sa.Column("device", sa.String(length=50), nullable=False),
153153
sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
154154
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
# Copyright (C) 2025 Intel Corporation
22
# SPDX-License-Identifier: Apache-2.0
3-
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
# Copyright (C) 2025 Intel Corporation
22
# SPDX-License-Identifier: Apache-2.0
3-

application/backend/app/api/routers/pipelines.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
from app.api.dependencies import get_pipeline_metrics_service, get_pipeline_service, get_system_service
1414
from app.api.schemas import PipelineMetricsView, PipelineView
1515
from app.api.validators import ProjectID
16-
from app.models import DataCollectionPolicyAdapter, PipelineStatus
16+
from app.models import DataCollectionConfig, DataCollectionPolicyAdapter, PipelineStatus
1717
from app.services import PipelineMetricsService, PipelineService, ResourceNotFoundError, SystemService
1818

1919
router = APIRouter(prefix="/api/projects/{project_id}/pipeline", tags=["Pipelines"])
2020

2121
UPDATE_PIPELINE_BODY_DESCRIPTION = """
22-
Partial pipeline configuration update. May contain any subset of fields including 'device', 'data_collection_policies',
22+
Partial pipeline configuration update. May contain any subset of fields including 'device', 'data_collection',
2323
'source_id', 'sink_id', or 'model_id'. Fields not included in the request will remain unchanged.
2424
"""
2525
UPDATE_PIPELINE_BODY_EXAMPLES = {
@@ -38,23 +38,26 @@
3838
"sink_id": "c6787c06-964b-4097-8eca-238b8cf79fc9",
3939
},
4040
),
41-
"enable_data_collection_policies": Example(
42-
summary="Enable data collection policies",
43-
description="Change data collection policies of the pipeline to fixed rate",
41+
"enable_data_collection": Example(
42+
summary="Enable data collection with max size",
43+
description="Configure data collection with max dataset size and policies",
4444
value={
45-
"data_collection_policies": [
46-
{
47-
"type": "fixed_rate",
48-
"enabled": "true",
49-
"rate": 0.1,
50-
}
51-
]
45+
"data_collection": {
46+
"max_dataset_size": 500,
47+
"policies": [
48+
{
49+
"type": "fixed_rate",
50+
"enabled": True,
51+
"rate": 0.1,
52+
}
53+
],
54+
}
5255
},
5356
),
54-
"clean_data_collection_policies": Example(
55-
summary="Clean data collection policies",
57+
"disable_data_collection": Example(
58+
summary="Disable data collection",
5659
description="Remove all data collection policies of the pipeline",
57-
value={"data_collection_policies": []},
60+
value={"data_collection": {"max_dataset_size": None, "policies": []}},
5861
),
5962
"change_device": Example(
6063
summary="Change inference device",
@@ -119,11 +122,13 @@ def update_pipeline(
119122
)
120123

121124
try:
122-
if "data_collection_policies" in pipeline_config:
123-
pipeline_config["data_collection_policies"] = [
124-
DataCollectionPolicyAdapter.validate_python(policy)
125-
for policy in pipeline_config["data_collection_policies"]
126-
]
125+
if "data_collection" in pipeline_config:
126+
data_collection = pipeline_config["data_collection"]
127+
if "policies" in data_collection:
128+
data_collection["policies"] = [
129+
DataCollectionPolicyAdapter.validate_python(policy) for policy in data_collection["policies"]
130+
]
131+
pipeline_config["data_collection"] = DataCollectionConfig.model_validate(data_collection)
127132
updated = pipeline_service.update_pipeline(project_id, pipeline_config)
128133
return PipelineView.model_validate(updated, from_attributes=True)
129134
except ResourceNotFoundError as e:

application/backend/app/api/schemas/pipeline.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from pydantic import BaseModel, Field
77

8-
from app.models import DataCollectionPolicy, ModelRevision, PipelineStatus
8+
from app.models import DataCollectionConfig, ModelRevision, PipelineStatus
99

1010
from .sink import SinkView
1111
from .source import SourceView
@@ -17,7 +17,7 @@ class PipelineView(BaseModel):
1717
sink: SinkView | None = None # None if disconnected
1818
model_revision: ModelRevision | None = Field(default=None, serialization_alias="model")
1919
status: PipelineStatus = PipelineStatus.IDLE
20-
data_collection_policies: list[DataCollectionPolicy] = Field(default_factory=list)
20+
data_collection: DataCollectionConfig = Field(default_factory=DataCollectionConfig)
2121
device: str = Field(default="cpu", description="Inference device (e.g., 'cpu', 'xpu', 'cuda', 'xpu-2', 'cuda-1')")
2222

2323
model_config = {
@@ -54,19 +54,22 @@ class PipelineView(BaseModel):
5454
},
5555
"status": "running",
5656
"device": "cpu",
57-
"data_collection_policies": [
58-
{
59-
"type": "fixed_rate",
60-
"enabled": "true",
61-
"rate": 0.02,
62-
},
63-
{
64-
"type": "confidence_threshold",
65-
"enabled": "true",
66-
"confidence_threshold": 0.2,
67-
"min_sampling_interval": 2.5,
68-
},
69-
],
57+
"data_collection": {
58+
"max_dataset_size": 500,
59+
"policies": [
60+
{
61+
"type": "fixed_rate",
62+
"enabled": True,
63+
"rate": 0.02,
64+
},
65+
{
66+
"type": "confidence_threshold",
67+
"enabled": True,
68+
"confidence_threshold": 0.2,
69+
"min_sampling_interval": 2.5,
70+
},
71+
],
72+
},
7073
}
7174
}
7275
}

application/backend/app/db/schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class PipelineDB(Base):
4747
sink_id: Mapped[str | None] = mapped_column(Text, ForeignKey("sinks.id", ondelete="RESTRICT"))
4848
model_revision_id: Mapped[str | None] = mapped_column(Text, ForeignKey("model_revisions.id", ondelete="RESTRICT"))
4949
is_running: Mapped[bool] = mapped_column(Boolean, default=False)
50-
data_collection_policies: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
50+
data_collection: Mapped[dict] = mapped_column(JSON, nullable=False, default=dict)
5151
device: Mapped[str] = mapped_column(String(50), nullable=False, default="cpu")
5252

5353
sink = relationship("SinkDB", uselist=False, lazy="joined")

application/backend/app/db_seeder.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from app.db.schema import LabelDB, ModelRevisionDB, PipelineDB, ProjectDB, SinkDB, SourceDB
99
from app.models import (
10+
DataCollectionConfig,
1011
DisconnectedSinkConfig,
1112
DisconnectedSourceConfig,
1213
FixedRateDataCollectionPolicy,
@@ -139,7 +140,10 @@ def _create_pipeline_with_video_source( # noqa: PLR0913
139140
pipeline = PipelineDB(
140141
project_id=project_id,
141142
sink_id=sink_id,
142-
data_collection_policies=[FixedRateDataCollectionPolicy(rate=0.1).model_dump(mode="json")],
143+
data_collection=DataCollectionConfig(
144+
max_dataset_size=100,
145+
policies=[FixedRateDataCollectionPolicy(rate=0.1)],
146+
).model_dump(mode="json"),
143147
is_running=project_id == "9d6af8e8-6017-4ebe-9126-33aae739c5fa", # Running only for detection project
144148
)
145149

application/backend/app/models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from .data_collection_policy import (
55
ConfidenceThresholdDataCollectionPolicy,
6+
DataCollectionConfig,
67
DataCollectionPolicy,
78
DataCollectionPolicyAdapter,
89
FixedRateDataCollectionPolicy,
@@ -44,6 +45,7 @@
4445

4546
__all__ = [
4647
"ConfidenceThresholdDataCollectionPolicy",
48+
"DataCollectionConfig",
4749
"DataCollectionPolicy",
4850
"DataCollectionPolicyAdapter",
4951
"DatasetItem",

application/backend/app/models/data_collection_policy.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,22 @@ class ConfidenceThresholdDataCollectionPolicy(DataCollectionPolicyBase):
2727
]
2828

2929
DataCollectionPolicyAdapter: TypeAdapter[DataCollectionPolicy] = TypeAdapter(DataCollectionPolicy)
30+
31+
32+
class DataCollectionConfig(BaseModel):
    """
    Data collection settings attached to a pipeline.

    Attributes:
        max_dataset_size: Upper bound on the number of items the dataset may hold.
            Must be at least 1 when provided; None (the default) means no limit.
            The limit is intended to stop data collection once it is reached so the
            dataset cannot grow unchecked (NOTE: that enforcement is not implemented
            in this model; it only stores and validates the value).
        policies: Data collection policies to apply. Defaults to an empty list.
    """

    # Field order is significant: it determines the key order of the serialized config.
    max_dataset_size: int | None = Field(
        description="Maximum number of items allowed in the dataset. None for unlimited.",
        ge=1,
        default=None,
    )
    policies: list[DataCollectionPolicy] = Field(default_factory=list)

0 commit comments

Comments
 (0)