
Conversation

@NGrech NGrech (Collaborator) commented Aug 29, 2025

  • Implement DataStreamService.getBatchForStudyDeployments for cross-deployment retrieval
  • Introduce CollectedDataPoint and CollectedDataSet containers
  • Support filters: DataType and time range ([from, to])
  • Normalize timestamps via SyncPoint (epoch µs)
  • Update JSON schemas and RPC request handling
  • Add tests (serialization, handler dispatch, in-memory filters/order, schema example validation)
  • Document endpoint in carp-data.md

Enables efficient retrieval across multiple study deployments with optional filtering for analytics and reporting use cases.
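For orientation, a minimal sketch of what the new endpoint might look like, inferred from the bullets above; the actual signature, parameter names, and defaults in the changed files may differ.

```kotlin
import dk.cachet.carp.common.application.UUID
import dk.cachet.carp.common.application.data.DataType
import dk.cachet.carp.data.application.CollectedDataSet

// Illustrative sketch only, inferred from the PR description; not the signature as merged.
interface BatchRetrievalSketch  // hypothetical stand-in for the addition to DataStreamService
{
    suspend fun getBatchForStudyDeployments(
        studyDeploymentIds: Set<UUID>,        // cross-deployment retrieval
        dataTypes: Set<DataType>? = null,     // optional DataType filter
        fromEpochMicroseconds: Long? = null,  // optional [from, to] time range,
        toEpochMicroseconds: Long? = null     //   normalized via SyncPoint (epoch µs)
    ): CollectedDataSet
}
```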

@NGrech NGrech added the feature (New functionality) label Aug 29, 2025
@NGrech NGrech self-assigned this Aug 29, 2025
@NGrech NGrech added this to the 2.0.0 milestone Aug 29, 2025
Copilot AI left a comment

Pull Request Overview

This PR introduces a new endpoint getBatchForStudyDeployments to the DataStreamService for retrieving data across multiple study deployments with filtering capabilities.

  • Adds getBatchForStudyDeployments method with filters for data types, device roles, and time ranges
  • Introduces CollectedDataPoint and CollectedDataSet data containers for cross-deployment data
  • Updates RPC infrastructure, JSON schemas, and documentation to support the new endpoint

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Summary per file:

  • carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamService.kt: Adds the new getBatchForStudyDeployments method signature to the service interface
  • carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/CollectedDataPoint.kt: Defines the data structure for individual collected data points
  • carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/CollectedDataSet.kt: Defines the collection container with utility methods for filtering
  • carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/InMemoryDataStreamService.kt: Implements the new endpoint with filtering and validation logic
  • carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceRequest.kt: Adds the RPC request class for the new endpoint
  • carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceDecorator.kt: Updates the decorator to handle the new endpoint
  • rpc/schemas/data/CollectedDataPoint.json: JSON schema definition for CollectedDataPoint
  • rpc/schemas/data/CollectedDataSet.json: JSON schema definition for CollectedDataSet
  • rpc/schemas/data/DataStreamService/DataStreamServiceRequest.json: Updates the RPC schema to include the new request type
  • rpc/src/main/kotlin/dk/cachet/carp/rpc/GenerateExampleRequests.kt: Adds an example request for the new endpoint
  • carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/infrastructure/InMemoryDataStreamServiceBatchRetrievalTest.kt: Comprehensive test suite for the new functionality
  • carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceInfrastructureTest.kt: Adds the new request to infrastructure tests
  • docs/carp-data.md: Documents the new endpoint in the service documentation


@Whathecode Whathecode (Member) commented Sep 1, 2025

@NGrech Can you clean up the commit history a bit? At least the fixes can be squashed. There is no point in me reviewing a commit which later gets undone. ;) Also, the generated test files should probably be added in the same commit that would otherwise fail to compile without them. It should be a goal for each commit to compile.

Possibly, that means all of this should live in a single commit. I'll review it as such if I find some time.

@NGrech NGrech force-pushed the feature/data-batch-endpoint-syncpoint branch from d2b7a73 to eca28e5 on September 1, 2025 12:07
@NGrech NGrech (Collaborator, Author) commented Sep 1, 2025

@Whathecode I squashed the new fixes into the original one.
Note that the generated test files were not in the original commit because that requirement is not mentioned in CONTRIBUTING.md.
I think we should also fix this line:

You can also run detekt separately through gradle detekt

and change it to gradle detektPasses, since that is the command run by the code analysis check on commit, and (at least on Windows) gradle detekt builds successfully even when there are issues that gradle detektPasses fails on.

@Whathecode Whathecode (Member) left a comment

First review round: questioning what a CollectedDataPoint is. More to follow as I continue looking, but that seems pretty fundamental. :)

@NGrech NGrech (Collaborator, Author) commented Sep 3, 2025

@Whathecode I have looked at the comments you made and replied to your feedback. I had some questions and comments of my own, so I would be interested in hearing your views.

@Whathecode Whathecode (Member) commented

I see you added this to the 2.0.0 milestone instead of 1.3. Any reason you expect this to be a breaking change, i.e., warranting a new major release?

…sts, and docs

- Implement DataStreamService.getBatchForStudyDeployments for cross-deployment retrieval with filters (deployment IDs, deviceRoleNames, dataTypes, time range)
- Normalize timestamps via SyncPoint
- Add ImmutableDataStreamBatch/Sequence for efficient data access
- Update InMemoryDataStreamService for batch retrieval
- Update JSON schemas, RPC handling, and documentation
- Refactor and clean up related code
- Add comprehensive tests and test resources
@NGrech NGrech force-pushed the feature/data-batch-endpoint-syncpoint branch from eca28e5 to e96bce2 on September 17, 2025 11:09
@NGrech NGrech (Collaborator, Author) commented Sep 17, 2025

@Whathecode I have updated the code based on the last discussion.

@Whathecode Whathecode (Member) left a comment

Still an incomplete review, but I started looking at why you added ImmutableDataStreamBatch and ...Sequence. The PR description is missing some clarification about why you are adding these. Have a look at some of the questions I asked and see whether you can clarify things.

I also have the impression that adding these changes can easily be done as a separate commit (and even PR). You don't need those for your updates to DataStreamService, and as far as I can tell, the existing data structures would work just fine.

While looking at the changes, I noticed some incorrect code-style whitespace. I added a commit which you can squash.

* and cleaner concepts.
*/
@JsExport
class ImmutableDataStreamSequence<TData : Data> private constructor(
@Whathecode Whathecode (Member) commented

Is this really needed? 🤔 You don't use it here. How do you expect to use it?

There already is the read-only DataStreamSequence interface, and as part of serialization DataStreamSequenceSnapshot is used (which is immutable). So, unless you want to safeguard against explicit casts, that should already get you there from an encapsulation perspective.

Comment on lines +147 to +149
* Unlike [MutableDataStreamBatch] which enforces non-overlapping sequences for incremental building,
* this implementation is designed for cross-device, cross-sensor data analysis where temporal
* overlap between different devices/data types is expected and valid.
@Whathecode Whathecode (Member) commented

The contract of DataStreamBatch is:

A collection of non-overlapping, ordered, data stream [sequences].

Thus, at a glance this is a violation of the Liskov Substitution Principle. You break the base contract which is expected to be upheld.

However, DataStreamBatch does allow for "cross-device, cross-sensor data". The documentation on the types should probably be improved, but the non-overlap/order constraint is per data stream. The appendSequence and appendBatch methods clarify this better. You can see that sequences are stored per DataStreamId, which includes deviceRoleName.

Possibly there is something wrong with the implementation which prompted you to create this class?

If you were missing an appendSequences method, you could add that to MutableDataStreamBatch. Furthermore, that class is currently overly strict, in that it doesn't allow adding a preceding sequence even if doing so wouldn't cause any overlap. In case that is what you need, you could support it by updating the internal data structure and re-ordering elements. I simply didn't need this before. A sketch of the appendSequences idea follows below.
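For illustration only (shown as an extension function for brevity; in practice it would presumably be a member of MutableDataStreamBatch, and the appendSequence signature is assumed from its use above):

```kotlin
import dk.cachet.carp.data.application.DataStreamSequence
import dk.cachet.carp.data.application.MutableDataStreamBatch

// Sketch only: appends each sequence in turn, relying on appendSequence to enforce the
// per-DataStreamId non-overlap/order constraint described above.
fun MutableDataStreamBatch.appendSequences( sequences: Iterable<DataStreamSequence<*>> ) =
    sequences.forEach { appendSequence( it ) }
```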

Comment on lines +151 to +152
* The internal structure is optimized for analytics queries:
* Device Role Name → Data Type → Synchronized Data Sequences
@Whathecode Whathecode (Member) commented

Why would device role name -> data type -> sequences be quicker than a composite key lookup of device role name/data type -> sequences?
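For comparison, the composite-key variant referred to here could look roughly like this (a sketch reusing the property accesses from the grouping code quoted in the next comment):

```kotlin
// Sketch of the composite-key alternative: a single lookup keyed on (deviceRoleName, dataType)
// instead of two nested map lookups.
val sequencesByStream = sequences
    .groupBy { it.dataStream.deviceRoleName to it.dataStream.dataType }

// Lookup example: sequencesByStream[ deviceRoleName to dataType ]
```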

* @throws IllegalArgumentException when sequences within the same DataStreamId overlap.
*
* note: Sequences across different devices/data types may overlap (this is expected for analytics).
* note: private constructor + factory methods provides validation and optimal internal structure.
@Whathecode Whathecode (Member) commented

Design rationale doesn't belong in API documentation. Furthermore, that's what factory methods are usually for, so no need to comment on this.

val deviceDataMap = sequences
    .groupBy { it.dataStream.deviceRoleName }
    .mapValues { (_, deviceSequences) ->
        deviceSequences.groupBy { it.dataStream.dataType }
@Whathecode Whathecode (Member) commented

These aren't ordered, thus breaking the DataStreamBatch contract. Is that intentional?

Comment on lines 213 to 230
/**
 * Get all device role names represented in this batch.
 */
fun getDeviceRoleNames(): Set<String> = deviceDataMap.keys

/**
 * Get all data types represented in this batch.
 *
 * @param deviceRoleName if specified, only return data types for this device role name
 */
fun getDataTypes( deviceRoleName: String? = null ): Set<DataType> =
    if (deviceRoleName != null)
    {
        deviceDataMap[deviceRoleName]?.keys.orEmpty()
    } else
    {
        deviceDataMap.values.flatMap { it.keys }.toSet()
    }
@Whathecode Whathecode (Member) commented

If these are useful, they are likely useful on the interface as well.

Comment on lines +232 to +237
/**
* Get all sequences for a specific device role name.
*
* @param synchronize if true, convert all sequences to synchronized data points and flatten
*/
fun getForDevice( deviceRoleName: String, synchronize: Boolean = false ): Any =
@Whathecode Whathecode (Member) commented

Synchronization is an operation. And, how to synchronize is open for change. Directly linking that to the data structure is thus undesirable. Instead, have an operation perform synchronization on the data structure.

Why is the return type Any?
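As a sketch of that direction: synchronization expressed as an operation over the batch, with a concrete return type instead of Any. Everything below is illustrative; SynchronizedPoint is a hypothetical placeholder for whatever synchronize() returns, and DataStreamBatch is assumed to expose its sequences.

```kotlin
// Sketch only; not the reviewed code. `SynchronizedPoint` and the function name are hypothetical.
fun DataStreamBatch.synchronizedPointsFor( deviceRoleName: String ): Map<DataType, List<SynchronizedPoint>> =
    sequences
        .filter { it.dataStream.deviceRoleName == deviceRoleName }
        .groupBy { it.dataStream.dataType }
        .mapValues { (_, perType) -> perType.flatMap { it }.map { it.synchronize() } }
```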

{
    // Return synchronized data points grouped by data type
    deviceDataMap[deviceRoleName]?.mapValues { (_, sequences) ->
        sequences.flatMap { it }.map { it.synchronize() }.sortedBy { it.syncPoint.synchronizedOn }
@Whathecode Whathecode (Member) commented

Remember that it.synchronize() can cause data points to "jump back in time". So, a point may end up lying before the preceding one in the sequence.

I consider it part of synchronization to make sure that doesn't happen. In essence, it means "synchronization" didn't happen correctly (how should you interpret this data?). A quick pragmatic solution is to drop such data points. Another solution would be some type of compression/interpolation. A perfect solution would look at data outside of the stream to see whether correlations can be found with other data streams in order to update the sync points.

Doing the first pragmatic solution, and documenting this, is probably just fine.
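A self-contained sketch of that pragmatic option (a generic helper; the usage line mirrors the expressions quoted above and is an assumption about the surrounding code):

```kotlin
// Keep only points whose timestamp does not precede the previously kept point,
// dropping those whose synchronization made them "jump back in time".
fun <T, C : Comparable<C>> dropBackwardJumps( points: List<T>, timeOf: (T) -> C ): List<T>
{
    val kept = mutableListOf<T>()
    for ( point in points )
    {
        val time = timeOf( point )
        if ( kept.isEmpty() || time >= timeOf( kept.last() ) ) kept.add( point )
        // Otherwise the point is dropped: its synchronized timestamp lies before the previous one.
    }
    return kept
}

// Hypothetical usage against the code under review:
// val monotonic = dropBackwardJumps( sequences.flatMap { it }.map { it.synchronize() } )
//     { it.syncPoint.synchronizedOn }
```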
