Make demand action accept timestamps & write manual timestamps guide#1079
Make demand action accept timestamps & write manual timestamps guide#1079FelonEkonom wants to merge 44 commits intomasterfrom
Conversation
varsill
left a comment
There was a problem hiding this comment.
Just a guide review, I will come back with code review later
| element whose output pad with manual flow control is connected to that | ||
| downstream. The callback receives the pad name, the demanded amount, and the | ||
| demand unit. The element is expected to produce and send that amount of data, | ||
| or less if the stream ends. |
There was a problem hiding this comment.
Add some note that of course the element does not need to fulfill the whole demand all at once, in a single callback's invocation
| flow control — the output pad inherits `:buffers`. | ||
|
|
||
| So the output pad can explicitly control the unit it receives demand in, but | ||
| timestamp units are not available on output pads. |
There was a problem hiding this comment.
I don't understand this "timestamp units are not available on output pads." part 🤔
| the demanded value. | ||
|
|
||
| Timestamp demand units are only applicable to **input pads with manual flow | ||
| control**. Output pads do not support them. If an input pad uses a timestamp |
There was a problem hiding this comment.
Let's rephrase it slightly so that to emphasize that the output pad connected to an input pad with timestamps demand unit receives demand in :bytes or :buffers, as specified by output pad's demand unit (defaulting to :buffers)
There was a problem hiding this comment.
Isn't it said in the line below? TBH I don't know if this commend was written before, or after I added this line, so let me know it the way how it is written right now looks good to you
| end | ||
| ``` | ||
|
|
||
| > #### Do not use redemand in a filter's `handle_demand` {: .warning} |
There was a problem hiding this comment.
Something is wrong with this {: .warning}
There was a problem hiding this comment.
WDYM? It works fine when I run $ mix docs --open
| on `:input`, so each demand naturally covers the next one-second slice of the | ||
| stream. | ||
|
|
||
| ## Filters with manual flow control |
There was a problem hiding this comment.
I think using :redemand in filters in also quite "canonical" and it's less intuitive than using it in sources - how about providing an example for it as well?
| For count/byte metrics, subtracts `consumed_size` from `demand`. | ||
| For timestamp metrics, the demand is a duration that does not change as buffers are consumed — |
There was a problem hiding this comment.
[NIT] I am not sure which should describe how do behaviour implementations work in the behaviour's callback definition
| defp delay_supplying_demand(pad_ref, 0, state) do | ||
| Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}") | ||
| state | ||
| end | ||
|
|
There was a problem hiding this comment.
Because demand 0 is not ignored when unit is timestamp
| def decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) do | ||
| pad_data = PadModel.get_data!(state, pad_ref) | ||
| buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers) | ||
| {:ok, buffers_size} = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers) |
There was a problem hiding this comment.
It will cause a nasty match error when buffers_size returns {:error, ...} - shouldn't we propagate the error further or explicitly raise with a meaningfull error message?
|
|
||
| When the pad's `demand_unit` is `:timestamp`, `{:timestamp, :pts}`, | ||
| `{:timestamp, :dts}`, or `{:timestamp, :dts_or_pts}`, the demand size is a | ||
| `t:Membrane.Time.t/0` duration (in nanoseconds). The queue will deliver |
There was a problem hiding this comment.
| `t:Membrane.Time.t/0` duration (in nanoseconds). The queue will deliver | |
| `t:Membrane.Time.t/0` duration (expressed internally in nanoseconds). The queue will deliver |
There was a problem hiding this comment.
I will remove this nanoseconds mention
There was a problem hiding this comment.
As discussed, please put the metrics into separate modules.
|
|
||
| defp maybe_assert_timestamps_present!(buffers, outbound_metric, pad_ref) do | ||
| Enum.each(buffers, fn buffer -> | ||
| if outbound_metric.nil_timestamp?(buffer) do |
There was a problem hiding this comment.
I think it would be better to make get_timestamp() a public callback and use it here to compare it with nil - this way we could get rid of nil_timestamp?() callback
There was a problem hiding this comment.
As discussed - let's:
- add
is_timestamp_metricguard and use it in InputQueue - unify
generate_metric_specific_warning()andnil_timestamp?assertion into a single callback inTimestampMetricbehaviour
Co-authored-by: Łukasz Kita <lukasz.kita0@gmail.com>
| # Manual demands | ||
|
|
||
| Elements with pads using manual flow control have two responsibilities: |
There was a problem hiding this comment.
I'd start with something like
This guide explains how to use manual demands. Make sure you read the introduction to flow control first.
Manual demands are a mechanism for manually controlling the speed of processing data in Membrane pipelines. This mechanism is powerful, but complex - use it only when
:autoflow control is not sufficient for your use case.
I'd also link to this guide in https://hexdocs.pm/membrane_core/06_flow_control.html and add examples of when to use and when not to use manual demands either there or here.
|
|
||
| 1. If the **output pad** declares `demand_unit: :buffers | :bytes`, that unit | ||
| is used. | ||
| 2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` :demand_unit, that |
There was a problem hiding this comment.
| 2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` :demand_unit, that | |
| 2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` `:demand_unit`, that |
| demand unit. The element is expected to produce and send that amount of data, | ||
| or less if the stream ends. | ||
|
|
||
| The unit in which `demand_size` is expressed is resolved as follows: |
There was a problem hiding this comment.
Let's first explain each demand unit
| 1. If the **output pad** declares `demand_unit: :buffers | :bytes`, that unit | ||
| is used. | ||
| 2. Otherwise, if the linked **input pad** declares `:buffers` or `:bytes` :demand_unit, that | ||
| unit is inherited. | ||
| 3. Otherwise — when the input pad uses a timestamp demand unit or has auto | ||
| flow control — the output pad inherits `:buffers`. |
There was a problem hiding this comment.
mention that the demand unit is automatically converted to what the output expects if it doesn't match what the input uses
| [`handle_demand/5`](`c:Membrane.Element.WithOutputPads.handle_demand/5`) always | ||
| receives the **total current outstanding demand** from downstream, not a delta. | ||
| There is no need to accumulate demand values across multiple callback invocations | ||
| — each call tells you the full amount still expected. |
There was a problem hiding this comment.
Mention that the delta is available in the context
| end | ||
| ``` | ||
|
|
||
| A source generating random bytes: |
There was a problem hiding this comment.
I think the sentence 'a source generating random bytes/buffers' is not precise - each element generates buffers. I'd say 'source handling demand when the unit is :buffers'
| > Returning [`:redemand`](`t:Membrane.Element.Action.redemand/0`) from | ||
| > [`handle_demand/5`](`c:Membrane.Element.WithOutputPads.handle_demand/5`) in a | ||
| > filter is illegal. [`handle_demand/5`](`c:Membrane.Element.WithOutputPads.handle_demand/5`) |
There was a problem hiding this comment.
Is it? I don't see it mentioned anywhere in the docs, nor prohibited in the code. I think it rather doesn't make sense, but it's not illegal
There was a problem hiding this comment.
AFAIK it is illegal, or at least it was made illegal by recent PRs
| > [`handle_buffer/4`](`c:Membrane.Element.WithInputPads.handle_buffer/4`) when | ||
| > the data actually arrives. | ||
|
|
||
| ## Auto flow control |
There was a problem hiding this comment.
I'd move this to the beginning or to https://hexdocs.pm/membrane_core/06_flow_control.html and add an example when auto flow control is not sufficient
There was a problem hiding this comment.
Hm, I added it here deliberately, to as a reference for somebody how will read this guide, to remind that auto flow control exists
|
TODO: deprecate old buffer metric modules and introduce one big private module to rule them all instead |
There was a problem hiding this comment.
As discussed, let's remove the behaviours
| [%Buffer{}] | [], | ||
| non_neg_integer | Membrane.Time.t(), | ||
| first_consumed_buffer :: Buffer.t() | nil, | ||
| last_consumed_buffer :: Buffer.t() | nil | ||
| ) :: {[%Buffer{}] | [], [%Buffer{}] | []} |
There was a problem hiding this comment.
| [%Buffer{}] | [], | |
| non_neg_integer | Membrane.Time.t(), | |
| first_consumed_buffer :: Buffer.t() | nil, | |
| last_consumed_buffer :: Buffer.t() | nil | |
| ) :: {[%Buffer{}] | [], [%Buffer{}] | []} | |
| [%Buffer{}], | |
| non_neg_integer | Membrane.Time.t(), | |
| first_consumed_buffer :: Buffer.t() | nil, | |
| last_consumed_buffer :: Buffer.t() | nil | |
| ) :: {[%Buffer{}], [%Buffer{}]} |
| [first | rest] = buffers | ||
|
|
||
| _last = | ||
| Enum.reduce(rest, first, fn |
There was a problem hiding this comment.
Iterating over all buffers multiple times may introduce some overhead. We do it here and in assert_non_nil_timestamps and split_timestamp_buffers. I'd do that once if possible.
There was a problem hiding this comment.
Is it a copy of Membrane.Core.Element.ManualFlowController.BufferMetric?
Closes #1072
Closes #1081