Skip to content

Make demand action accept timestamps & write manual timestamps guide#1079

Open
FelonEkonom wants to merge 28 commits intomasterfrom
time-on-demands
Open

Make demand action accept timestamps & write manual timestamps guide#1079
FelonEkonom wants to merge 28 commits intomasterfrom
time-on-demands

Conversation

@FelonEkonom
Copy link
Member

@FelonEkonom FelonEkonom commented Feb 25, 2026

Closes #1072
Closes #1081

@FelonEkonom FelonEkonom changed the title Demand in timestamps Make demand action accept timestamps Feb 25, 2026
@FelonEkonom FelonEkonom self-assigned this Mar 9, 2026
@FelonEkonom FelonEkonom changed the title Make demand action accept timestamps Make demand action accept timestamps, write manual timestamps guide Mar 9, 2026
@FelonEkonom FelonEkonom marked this pull request as ready for review March 9, 2026 15:10
@FelonEkonom FelonEkonom requested a review from mat-hek as a code owner March 9, 2026 15:10
@FelonEkonom FelonEkonom marked this pull request as draft March 9, 2026 15:12
@FelonEkonom FelonEkonom marked this pull request as ready for review March 9, 2026 17:16
@FelonEkonom FelonEkonom changed the title Make demand action accept timestamps, write manual timestamps guide Make demand action accept timestamps & write manual timestamps guide Mar 9, 2026
@FelonEkonom FelonEkonom requested a review from varsill March 10, 2026 09:44
Copy link
Contributor

@varsill varsill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a guide review, I will come back with code review later

element whose output pad with manual flow control is connected to that
downstream. The callback receives the pad name, the demanded amount, and the
demand unit. The element is expected to produce and send that amount of data,
or less if the stream ends.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some note that of course the element does not need to fulfill the whole demand all at once, in a single callback's invocation

flow control — the output pad inherits `:buffers`.

So the output pad can explicitly control the unit it receives demand in, but
timestamp units are not available on output pads.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this "timestamp units are not available on output pads." part 🤔

the demanded value.

Timestamp demand units are only applicable to **input pads with manual flow
control**. Output pads do not support them. If an input pad uses a timestamp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rephrase it slightly so that to emphasize that the output pad connected to an input pad with timestamps demand unit receives demand in :bytes or :buffers, as specified by output pad's demand unit (defaulting to :buffers)

end
```

> #### Do not use redemand in a filter's `handle_demand` {: .warning}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is wrong with this {: .warning}

on `:input`, so each demand naturally covers the next one-second slice of the
stream.

## Filters with manual flow control
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using :redemand in filters in also quite "canonical" and it's less intuitive than using it in sources - how about providing an example for it as well?

Comment on lines +32 to +33
For count/byte metrics, subtracts `consumed_size` from `demand`.
For timestamp metrics, the demand is a duration that does not change as buffers are consumed —
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] I am not sure which should describe how do behaviour implementations work in the behaviour's callback definition

Comment on lines -400 to -404
defp delay_supplying_demand(pad_ref, 0, state) do
Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}")
state
end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because demand 0 is not ignored when unit is timestamp

def decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) do
pad_data = PadModel.get_data!(state, pad_ref)
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)
{:ok, buffers_size} = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause a nasty match error when buffers_size returns {:error, ...} - shouldn't we propagate the error further or explicitly raise with a meaningfull error message?


When the pad's `demand_unit` is `:timestamp`, `{:timestamp, :pts}`,
`{:timestamp, :dts}`, or `{:timestamp, :dts_or_pts}`, the demand size is a
`t:Membrane.Time.t/0` duration (in nanoseconds). The queue will deliver
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`t:Membrane.Time.t/0` duration (in nanoseconds). The queue will deliver
`t:Membrane.Time.t/0` duration (expressed internally in nanoseconds). The queue will deliver

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this nanoseconds mention

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, please put the metrics into separate modules.


defp maybe_assert_timestamps_present!(buffers, outbound_metric, pad_ref) do
Enum.each(buffers, fn buffer ->
if outbound_metric.nil_timestamp?(buffer) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to make get_timestamp() a public callback and use it here to compare it with nil - this way we could get rid of nil_timestamp?() callback

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed - let's:

  • add is_timestamp_metric guard and use it in InputQueue
  • unify generate_metric_specific_warning() and nil_timestamp? assertion into a single callback in TimestampMetric behaviour

Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Guide about manual demands Time on demands, demands on time 🤔

2 participants