diff --git a/README.md b/README.md index 7836af7..50681c7 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The package can be installed by adding `membrane_transcoder_plugin` to your list ```elixir def deps do [ - {:membrane_transcoder_plugin, "~> 0.2.0"} + {:membrane_transcoder_plugin, "~> 0.2.1"} ] end ``` diff --git a/lib/transcoder.ex b/lib/transcoder.ex index 8045a7f..3a6886f 100644 --- a/lib/transcoder.ex +++ b/lib/transcoder.ex @@ -22,7 +22,7 @@ defmodule Membrane.Transcoder do require __MODULE__.Video require Membrane.Logger - alias __MODULE__.{Audio, ForwardingFilter, Video} + alias __MODULE__.{Audio, Video} alias Membrane.{AAC, Funnel, H264, H265, Opus, RawAudio, RawVideo, RemoteStream, VP8} @typedoc """ @@ -86,7 +86,7 @@ defmodule Membrane.Transcoder do def handle_init(_ctx, opts) do spec = [ bin_input() - |> child(:forwarding_filter, ForwardingFilter), + |> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}), child(:output_funnel, Funnel) |> bin_output() ] @@ -102,12 +102,8 @@ defmodule Membrane.Transcoder do end @impl true - def handle_child_notification( - {:stream_format, format}, - :forwarding_filter, - _ctx, - %{input_stream_format: nil} = state - ) do + def handle_child_notification({:stream_format, _pad, format}, :connector, _ctx, state) + when state.input_stream_format == nil do state = %{state | input_stream_format: format} |> resolve_output_stream_format() @@ -118,7 +114,7 @@ defmodule Membrane.Transcoder do end spec = - get_child(:forwarding_filter) + get_child(:connector) |> plug_transcoding( format, state.output_stream_format, @@ -130,16 +126,14 @@ defmodule Membrane.Transcoder do end @impl true - def handle_child_notification( - {:stream_format, new_format}, - :forwarding_filter, - _ctx, - %{input_stream_format: non_nil_stream_format} = state - ) do - if new_format != non_nil_stream_format do + def handle_child_notification({:stream_format, _pad, new_format}, :connector, _ctx, state) do + %new_stream_format_module{} = new_format + %old_stream_format_module{} = state.input_stream_format + + if new_stream_format_module != old_stream_format_module do raise """ Received new stream format on transcoder's input: #{inspect(new_format)} - which doesn't match the first received input stream format: #{inspect(non_nil_stream_format)} + which doesn't match the first received input stream format: #{inspect(state.input_stream_format)} Transcoder doesn't support updating the input stream format. """ end diff --git a/lib/transcoder/forwarding_filter.ex b/lib/transcoder/forwarding_filter.ex deleted file mode 100644 index 97c6bca..0000000 --- a/lib/transcoder/forwarding_filter.ex +++ /dev/null @@ -1,117 +0,0 @@ -defmodule Membrane.Transcoder.ForwardingFilter do - @moduledoc false - use Membrane.Filter - - alias Membrane.TimestampQueue - - def_input_pad :input, - accepted_format: _any, - availability: :on_request - - def_output_pad :output, - accepted_format: _any, - availability: :on_request - - defguardp is_input_linked(state) when state.input_pad_ref != nil - defguardp is_output_linked(state) when state.output_pad_ref != nil - - @impl true - def handle_init(_ctx, _opts) do - state = %{queue: TimestampQueue.new(), output_pad_ref: nil, input_pad_ref: nil} - {[], state} - end - - @impl true - def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state) - - @impl true - def handle_pad_added(Pad.ref(direction, _id) = pad_ref, ctx, state) do - same_direction_pads_number = - ctx.pads - |> Enum.count(fn {_pad_ref, pad_data} -> pad_data.direction == direction end) - - if same_direction_pads_number > 1 do - raise """ - #{inspect(__MODULE__)} can have only one #{inspect(direction)} pad, but it has \ - #{same_direction_pads_number} - """ - end - - state = - case direction do - :input -> %{state | input_pad_ref: pad_ref} - :output -> %{state | output_pad_ref: pad_ref} - end - - maybe_flush_queue(ctx, state) - end - - @impl true - def handle_stream_format(_input_pad_ref, stream_format, _ctx, state) - when is_output_linked(state) do - {[ - stream_format: {state.output_pad_ref, stream_format}, - notify_parent: {:stream_format, stream_format} - ], state} - end - - @impl true - def handle_stream_format(input_pad_ref, stream_format, _ctx, state) do - queue = TimestampQueue.push_stream_format(state.queue, input_pad_ref, stream_format) - {[notify_parent: {:stream_format, stream_format}], %{state | queue: queue}} - end - - @impl true - def handle_buffer(_input_pad_ref, buffer, _ctx, state) when is_output_linked(state) do - {[buffer: {state.output_pad_ref, buffer}], state} - end - - @impl true - def handle_buffer(input_pad_ref, buffer, _ctx, state) do - {_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, input_pad_ref, buffer) - {[], %{state | queue: queue}} - end - - @impl true - def handle_event(Pad.ref(:input, _id), event, _ctx, state) when is_output_linked(state) do - {[forward: event], state} - end - - @impl true - def handle_event(Pad.ref(:output, _id), event, _ctx, state) when is_input_linked(state) do - {[forward: event], state} - end - - @impl true - def handle_event(pad_ref, event, _ctx, state) do - queue = TimestampQueue.push_event(state.queue, pad_ref, event) - {[], %{state | queue: queue}} - end - - @impl true - def handle_end_of_stream(_input_pad_ref, _ctx, state) when is_output_linked(state) do - {[end_of_stream: state.output_pad_ref], state} - end - - @impl true - def handle_end_of_stream(input_pad_ref, _ctx, state) do - queue = TimestampQueue.push_end_of_stream(state.queue, input_pad_ref) - {[], %{state | queue: queue}} - end - - defp maybe_flush_queue(ctx, state) - when ctx.playback == :playing and is_input_linked(state) and is_output_linked(state) do - {_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue) - - actions = - Enum.map(items, fn - {Pad.ref(:input, _id), {item_type, item}} -> {item_type, {state.output_pad_ref, item}} - {Pad.ref(:input, _id), :end_of_stream} -> {:end_of_stream, state.output_pad_ref} - {Pad.ref(:output, _id), {:event, item}} -> {:event, {state.input_pad_ref, item}} - end) - - {actions, %{state | queue: queue}} - end - - defp maybe_flush_queue(_ctx, state), do: {[], state} -end diff --git a/mix.exs b/mix.exs index 1177233..956221f 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Transcoder.Plugin.Mixfile do use Mix.Project - @version "0.2.0" + @version "0.2.1" @github_url "https://github.com/membraneframework/membrane_transcoder_plugin" def project do @@ -37,7 +37,7 @@ defmodule Membrane.Transcoder.Plugin.Mixfile do defp deps do [ - {:membrane_core, "~> 1.1"}, + {:membrane_core, "~> 1.2 and >= 1.2.1"}, {:membrane_opus_plugin, "~> 0.20.3"}, {:membrane_aac_plugin, "~> 0.19.0"}, {:membrane_aac_fdk_plugin, "~> 0.18.0"}, diff --git a/mix.lock b/mix.lock index e894c13..d2424a9 100644 --- a/mix.lock +++ b/mix.lock @@ -24,7 +24,7 @@ "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.19.0", "58a15efaaa4f2cc91b968464cfd269244e035efdd983aac2e3ddeb176fcf0585", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "eb7e786e650608ee205f4eebff4c1df3677e545acf09802458f77f64f9942fe9"}, "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, - "membrane_core": {:hex, :membrane_core, "1.2.0", "c74ef043791e11d149a01e344d9973de34d6dd2e3b8a6bdc79ff5b183a74b243", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "0133e3478af608b1749729d82bc747a0357e207f5819a6bd4d31614ce7d0a463"}, + "membrane_core": {:hex, :membrane_core, "1.2.1", "96cd9c8a255585c0ed2906d093a6c6e5e4198f1b23cfe5e62a613673fcb1da30", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "becc0e6fabe8732f360c91607b708f196dda1ab5ab1f15fc4a979909cc76d49d"}, "membrane_ffmpeg_swresample_plugin": {:hex, :membrane_ffmpeg_swresample_plugin, "0.20.2", "2e669f0b25418d10b51a73bc52d2e12e4a3a26b416c5c1199d852c3f781a18b3", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:mockery, "~> 2.1", [hex: :mockery, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "6c8d3bcd61d568dd94cabb9b45f29e8926e0076e4432d8f419378e004e02147c"}, "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, "membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.1", "9e108f4ef9d905ebff2da3ba5e58a5b756b58812f4fa68bd576add68fda310a0", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "39fdef1bf29eac949f65a37ea941f997c22ed042c55af044d27a781b63e82f6b"}, diff --git a/test/support/preprocessors.ex b/test/support/preprocessors.ex index dc416ab..42b267d 100644 --- a/test/support/preprocessors.ex +++ b/test/support/preprocessors.ex @@ -20,7 +20,8 @@ defmodule Membrane.Transcoder.Support.Preprocessors do @spec parse_h264(Membrane.ChildrenSpec.builder()) :: Membrane.ChildrenSpec.builder() def parse_h264(link_builder) do - child(link_builder, %Membrane.H264.Parser{ + link_builder + |> child(%Membrane.H264.Parser{ output_alignment: :au, output_stream_structure: :annexb, generate_best_effort_timestamps: %{framerate: {30, 1}}