Skip to content

Make demand action accept timestamps & write manual timestamps guide#1079

Open
FelonEkonom wants to merge 44 commits intomasterfrom
time-on-demands
Open

Make demand action accept timestamps & write manual timestamps guide#1079
FelonEkonom wants to merge 44 commits intomasterfrom
time-on-demands

Conversation

@FelonEkonom
Copy link
Copy Markdown
Member

@FelonEkonom FelonEkonom commented Feb 25, 2026

Closes #1072
Closes #1081

@FelonEkonom FelonEkonom changed the title Demand in timestamps Make demand action accept timestamps Feb 25, 2026
@FelonEkonom FelonEkonom self-assigned this Mar 9, 2026
@FelonEkonom FelonEkonom changed the title Make demand action accept timestamps Make demand action accept timestamps, write manual timestamps guide Mar 9, 2026
@FelonEkonom FelonEkonom marked this pull request as ready for review March 9, 2026 15:10
@FelonEkonom FelonEkonom requested a review from mat-hek as a code owner March 9, 2026 15:10
@FelonEkonom FelonEkonom marked this pull request as draft March 9, 2026 15:12
@FelonEkonom FelonEkonom marked this pull request as ready for review March 9, 2026 17:16
@FelonEkonom FelonEkonom changed the title Make demand action accept timestamps, write manual timestamps guide Make demand action accept timestamps & write manual timestamps guide Mar 9, 2026
@FelonEkonom FelonEkonom requested a review from varsill March 10, 2026 09:44
Copy link
Copy Markdown
Contributor

@varsill varsill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a guide review, I will come back with code review later

Comment thread guides/useful_concepts/manual_demands.md Outdated
Comment thread guides/useful_concepts/manual_demands.md Outdated
element whose output pad with manual flow control is connected to that
downstream. The callback receives the pad name, the demanded amount, and the
demand unit. The element is expected to produce and send that amount of data,
or less if the stream ends.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some note that of course the element does not need to fulfill the whole demand all at once, in a single callback's invocation

Comment thread guides/useful_concepts/manual_demands.md Outdated
flow control — the output pad inherits `:buffers`.

So the output pad can explicitly control the unit it receives demand in, but
timestamp units are not available on output pads.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this "timestamp units are not available on output pads." part 🤔

the demanded value.

Timestamp demand units are only applicable to **input pads with manual flow
control**. Output pads do not support them. If an input pad uses a timestamp
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rephrase it slightly so that to emphasize that the output pad connected to an input pad with timestamps demand unit receives demand in :bytes or :buffers, as specified by output pad's demand unit (defaulting to :buffers)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it said in the line below? TBH I don't know if this commend was written before, or after I added this line, so let me know it the way how it is written right now looks good to you

Comment thread guides/useful_concepts/manual_demands.md Outdated
Comment thread guides/useful_concepts/manual_demands.md Outdated
end
```

> #### Do not use redemand in a filter's `handle_demand` {: .warning}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is wrong with this {: .warning}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYM? It works fine when I run $ mix docs --open

on `:input`, so each demand naturally covers the next one-second slice of the
stream.

## Filters with manual flow control
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using :redemand in filters in also quite "canonical" and it's less intuitive than using it in sources - how about providing an example for it as well?

Comment thread lib/membrane/buffer/metric.ex Outdated
Comment thread lib/membrane/buffer/metric.ex Outdated
Comment on lines +32 to +33
For count/byte metrics, subtracts `consumed_size` from `demand`.
For timestamp metrics, the demand is a duration that does not change as buffers are consumed —
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] I am not sure which should describe how do behaviour implementations work in the behaviour's callback definition

Comment on lines -400 to -404
defp delay_supplying_demand(pad_ref, 0, state) do
Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}")
state
end

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing it?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because demand 0 is not ignored when unit is timestamp

def decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) do
pad_data = PadModel.get_data!(state, pad_ref)
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)
{:ok, buffers_size} = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause a nasty match error when buffers_size returns {:error, ...} - shouldn't we propagate the error further or explicitly raise with a meaningfull error message?

Comment thread lib/membrane/element/action.ex Outdated

When the pad's `demand_unit` is `:timestamp`, `{:timestamp, :pts}`,
`{:timestamp, :dts}`, or `{:timestamp, :dts_or_pts}`, the demand size is a
`t:Membrane.Time.t/0` duration (in nanoseconds). The queue will deliver
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`t:Membrane.Time.t/0` duration (in nanoseconds). The queue will deliver
`t:Membrane.Time.t/0` duration (expressed internally in nanoseconds). The queue will deliver

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this nanoseconds mention

Comment thread lib/membrane/buffer/metric/timestamp.ex Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, please put the metrics into separate modules.


defp maybe_assert_timestamps_present!(buffers, outbound_metric, pad_ref) do
Enum.each(buffers, fn buffer ->
if outbound_metric.nil_timestamp?(buffer) do
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to make get_timestamp() a public callback and use it here to compare it with nil - this way we could get rid of nil_timestamp?() callback

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed - let's:

  • add is_timestamp_metric guard and use it in InputQueue
  • unify generate_metric_specific_warning() and nil_timestamp? assertion into a single callback in TimestampMetric behaviour

Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com>
Comment on lines +1 to +3
# Manual demands

Elements with pads using manual flow control have two responsibilities:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd start with something like

This guide explains how to use manual demands. Make sure you read the introduction to flow control first.

Manual demands are a mechanism for manually controlling the speed of processing data in Membrane pipelines. This mechanism is powerful, but complex - use it only when :auto flow control is not sufficient for your use case.

I'd also link to this guide in https://hexdocs.pm/membrane_core/06_flow_control.html and add examples of when to use and when not to use manual demands either there or here.


1. If the **output pad** declares `demand_unit: :buffers | :bytes`, that unit
is used.
2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` :demand_unit, that
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` :demand_unit, that
2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` `:demand_unit`, that

demand unit. The element is expected to produce and send that amount of data,
or less if the stream ends.

The unit in which `demand_size` is expressed is resolved as follows:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's first explain each demand unit

Comment on lines +24 to +29
1. If the **output pad** declares `demand_unit: :buffers | :bytes`, that unit
is used.
2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` :demand_unit, that
unit is inherited.
3. Otherwise — when the input pad uses a timestamp demand unit or has auto
flow control — the output pad inherits `:buffers`.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mention that the demand unit is automatically converted to what the output expects if it doesn't match what the input uses

Comment on lines +36 to +39
[`handle_demand/5`](`c:Membrane.Element.WithOutputPads.handle_demand/5`) always
receives the **total current outstanding demand** from downstream, not a delta.
There is no need to accumulate demand values across multiple callback invocations
— each call tells you the full amount still expected.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mention that the delta is available in the context

end
```

A source generating random bytes:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the sentence 'a source generating random bytes/buffers' is not precise - each element generates buffers. I'd say 'source handling demand when the unit is :buffers'

Comment on lines +380 to +382
> Returning [`:redemand`](`t:Membrane.Element.Action.redemand/0`) from
> [`handle_demand/5`](`c:Membrane.Element.WithOutputPads.handle_demand/5`) in a
> filter is illegal. [`handle_demand/5`](`c:Membrane.Element.WithOutputPads.handle_demand/5`)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it? I don't see it mentioned anywhere in the docs, nor prohibited in the code. I think it rather doesn't make sense, but it's not illegal

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK it is illegal, or at least it was made illegal by recent PRs

> [`handle_buffer/4`](`c:Membrane.Element.WithInputPads.handle_buffer/4`) when
> the data actually arrives.

## Auto flow control
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this to the beginning or to https://hexdocs.pm/membrane_core/06_flow_control.html and add an example when auto flow control is not sufficient

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I added it here deliberately, to as a reference for somebody how will read this guide, to remind that auto flow control exists

@FelonEkonom
Copy link
Copy Markdown
Member Author

TODO: deprecate old buffer metric modules and introduce one big private module to rule them all instead

Comment thread lib/membrane/buffer/metric/timestamp.ex Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, let's remove the behaviours

@FelonEkonom FelonEkonom requested review from mat-hek and varsill April 15, 2026 10:46
Comment on lines +18 to +22
[%Buffer{}] | [],
non_neg_integer | Membrane.Time.t(),
first_consumed_buffer :: Buffer.t() | nil,
last_consumed_buffer :: Buffer.t() | nil
) :: {[%Buffer{}] | [], [%Buffer{}] | []}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[%Buffer{}] | [],
non_neg_integer | Membrane.Time.t(),
first_consumed_buffer :: Buffer.t() | nil,
last_consumed_buffer :: Buffer.t() | nil
) :: {[%Buffer{}] | [], [%Buffer{}] | []}
[%Buffer{}],
non_neg_integer | Membrane.Time.t(),
first_consumed_buffer :: Buffer.t() | nil,
last_consumed_buffer :: Buffer.t() | nil
) :: {[%Buffer{}], [%Buffer{}]}

[first | rest] = buffers

_last =
Enum.reduce(rest, first, fn
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterating over all buffers multiple times may introduce some overhead. We do it here and in assert_non_nil_timestamps and split_timestamp_buffers. I'd do that once if possible.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a copy of Membrane.Core.Element.ManualFlowController.BufferMetric?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Guide about manual demands Time on demands, demands on time 🤔

3 participants