Goal
- GenStageのExamplesを試してみる
Dev-Environment
OS: Windows8.1
Erlang: Eshell V8.0, OTP-Version 18.0
Elixir: v1.3.0
Erlang: Eshell V8.0, OTP-Version 18.0
Elixir: v1.3.0
Context
Caution:
2016/07/15に本記事を作成しました。
このときのGenStageのバージョンはv0.3です。
また、英語力不足や誤解している部分があると思います。
その点、ご了承の上で本記事をお読みください。
What is GenStage?
Quote:
GenStage is a specification for exchanging events between producers and consumers.
GenStageはプロデューサとコンシューマの間でイベントを交換する仕様とのこと…よく分からんorz
現在、提供されている機能について
- Experimental.GenStage
プロデューサとコンシューマのステージを実現するためのビヘイビア。
(普通にGenStageのビヘイビアです)
(普通にGenStageのビヘイビアです)
- Experimental.DynamicSupervisor
このモジュールは、(普通の)スーパバイザのsimple_one_for_one戦略の代替品として使える。
それに加えて、ステージパイプラインでイベント毎に新しいプロセスを起動するステージコンシューマを簡単に作って、使うことができる。
それに加えて、ステージパイプラインでイベント毎に新しいプロセスを起動するステージコンシューマを簡単に作って、使うことができる。
さらっと説明を見るだけでは分かりませんでした。
ちなみに、GenStageは、将来Elixirのリリースに含まれる予定みたいです。
だから、Experimental(実験)が付いているみたいです。
(現状はエイリアスしてね的なことが書いてありました)
ちなみに、GenStageは、将来Elixirのリリースに含まれる予定みたいです。
だから、Experimental(実験)が付いているみたいです。
(現状はエイリアスしてね的なことが書いてありました)
ようは、下記のような記述を追加しておけばいいみたいです。
Example:
alias Experimental.{DynamicSupervisor, GenStage}
何はともあれ、とりあえず使ってみましょう。
(弱小プログラマだから説明だけじゃ分からないだけ・・・)
(弱小プログラマだから説明だけじゃ分からないだけ・・・)
Install
前提として、GenStageを使いたいのであれば、Elixir v1.3以上が必要です。
mix.exsファイルへ下記のように:gen_stage追加してください。
mix.exsファイルへ下記のように:gen_stage追加してください。
File: mix.exs
defmodule GenStageExample.Mixfile do
...
def application do
[applications: [:logger,
:gen_stage]] # Add it
end
defp deps do
[{:gen_stage, "~> 0.1"}] # Add it
end
end
パッケージとして取得します。
Example:
> mix deps.get
Running dependency resolution
Dependency resolution completed
gen_stage: 0.3.0
* Getting gen_stage (Hex package)
Checking package (https://repo.hex.pm/tarballs/gen_stage-0.3.0.tar)
Fetched package
リリースに入ればこの作業はいらなくなるでしょう。
とりあえず、使う準備は完了です。
とりあえず、使う準備は完了です。
Using
Exampleがあるみたいです。elixir-lang/gen_stageに全部で4つのファイルが置いてありますね。
- dynamic_supervisor.exs
- gen_event.exs
- producer_consumer.exs
- stream_stage.exs
これを参考に試してみましょう。
- stream_stage
まずは一番簡単そうな、stream_stage.exsのような内容をiexから実行してみます。
Example:
> iex -S mix
## エイリアスの定義
iex> alias Experimental.GenStage
## 実行例1
iex> {:ok, stage} = File.read!("example.txt") |> String.split("\r\n") |> Stream.map(&{String.length(&1), &1}) |> GenStage.from_enumerable(consumers: :permanent)
{:ok, #PID<0.295.0>}
iex> GenStage.stream([stage]) |> Enum.sort |> Enum.take(2) |> IO.inspect
[{3, "aaa"}, {3, "bbb"}]
## 実行例2
iex> {:ok, stage} = File.read!("example.txt") |> String.split("\r\n") |> Stream.map(&{String.length(&1), &1}) |> GenStage.from_enumerable(consumers: :permanent)
iex> GenStage.stream([stage]) |> Enum.sort(&(&1 > &2)) |> Enum.take(2) |> IO.inspect
[{8, "hugehuge"}, {8, "hogehoge"}]
## 二度目は実行できない
iex> GenStage.stream([stage]) |> Enum.sort(&(&1 > &2)) |> Enum.take(2) |> IO.inspect
** (exit) exited in: Experimental.GenStage.close_stream(%{})
** (EXIT) no process
(gen_stage) lib/gen_stage.ex:1077: Experimental.GenStage.close_stream/1
(elixir) lib/stream.ex:1129: Stream.do_resource/5
(elixir) lib/enum.ex:1635: Enum.reduce/3
(elixir) lib/enum.ex:1968: Enum.sort/2
ファイル読込みで使っているファイルの内容は下記になります。
(特筆すべきことは書いていません)
(特筆すべきことは書いていません)
File: example.txt
aaa
bbb
ccc
ddd
hogehoge
hugehuge
forbar
- producer_consumer
続いて、[A]->[B]->[C]とステージで流れを持つイベントとしてパイプラインに設定してみます。
アプリケーションとしての開始地点。
lib/producer_consumer/app.ex
defmodule ProducerConsumer.App do
def start do
alias Experimental.{GenStage}
alias ProducerConsumer.{StageA, StageB, StageC}
{:ok, a} = GenStage.start_link(StageA, 0)
{:ok, b} = GenStage.start_link(StageB, 2)
{:ok, c} = GenStage.start_link(StageC, :ok)
GenStage.sync_subscribe(b, to: a)
GenStage.sync_subscribe(c, to: b)
Process.sleep(:infinity)
end
end
プロデューサとしてのStageAモジュール。
lib/producer_consumer/stage_a.ex
alias Experimental.{GenStage}
defmodule ProducerConsumer.StageA do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
IO.inspect {__MODULE__, demand, counter}
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
プロデューサとコンシューマとしてのStageBモジュール。
lib/producer_consumer/stage_b.ex
alias Experimental.{GenStage}
defmodule ProducerConsumer.StageB do
use GenStage
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
IO.inspect {__MODULE__, events, number}
events = Enum.map(events, fn(event) -> event * number end)
{:noreply, events, number}
end
end
コンシューマとしてのStageCモジュール。
lib/producer_consumer/stage_c.ex
alias Experimental.{GenStage}
defmodule ProducerConsumer.StageC do
use GenStage
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
:timer.sleep(1000)
IO.inspect {__MODULE__, events, state}
{:noreply, [], state}
end
end
実行してみると出力結果は下記のようになりました。
Example:
iex> ProducerConsumer.App.start
{ProducerConsumer.StageA, 1000, 0}
{ProducerConsumer.StageB,
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 42, 43, 44, 45, 46, 47, ...], 2}
{ProducerConsumer.StageA, 500, 1000}
{ProducerConsumer.StageB,
[500, 501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514,
515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529,
530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544,
545, 546, 547, ...], 2}
{ProducerConsumer.StageA, 500, 1500}
{ProducerConsumer.StageC,
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40
42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78,
80, 82, 84, 86, 88, 90, 92, 94, ...], :the_state_does_not_matter}
{ProducerConsumer.StageB,
[1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012,
1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023, 1024, 1025,
1026, 1027, 1028, 1029, 1030, 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038,
1039, 1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, ...], 2}
{ProducerConsumer.StageA, 500, 2000}
{ProducerConsumer.StageC,
[1000, 1002, 1004, 1006, 1008, 1010, 1012, 1014, 1016, 1018, 1020, 1022, 1024,
1026, 1028, 1030, 1032, 1034, 1036, 1038, 1040, 1042, 1044, 1046, 1048, 1050,
1052, 1054, 1056, 1058, 1060, 1062, 1064, 1066, 1068, 1070, 1072, 1074, 1076,
1078, 1080, 1082, 1084, 1086, 1088, 1090, 1092, 1094, ...],
:the_state_does_not_matter}
{ProducerConsumer.StageB,
[1500, 1501, 1502, 1503, 1504, 1505, 1506, 1507, 1508, 1509, 1510, 1511, 1512,
1513, 1514, 1515, 1516, 1517, 1518, 1519, 1520, 1521, 1522, 1523, 1524, 1525,
1526, 1527, 1528, 1529, 1530, 1531, 1532, 1533, 1534, 1535, 1536, 1537, 1538,
1539, 1540, 1541, 1542, 1543, 1544, 1545, 1546, 1547, ...], 2}
{ProducerConsumer.StageA, 500, 2500}
{ProducerConsumer.StageC,
[2000, 2002, 2004, 2006, 2008, 2010, 2012, 2014, 2016, 2018, 2020, 2022, 2024,
2026, 2028, 2030, 2032, 2034, 2036, 2038, 2040, 2042, 2044, 2046, 2048, 2050,
2052, 2054, 2056, 2058, 2060, 2062, 2064, 2066, 2068, 2070, 2072, 2074, 2076,
2078, 2080, 2082, 2084, 2086, 2088, 2090, 2092, 2094, ...],
:the_state_does_not_matter}
{ProducerConsumer.StageB,
[2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012,
2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025,
2026, 2027, 2028, 2029, 2030, 2031, 2032, 2033, 2034, 2035, 2036, 2037, 2038,
2039, 2040, 2041, 2042, 2043, 2044, 2045, 2046, 2047, ...], 2}
{ProducerConsumer.StageA, 500, 3000}
...長いので省略
Note:
StageAモジュールに仕込んでいるIO.inspectの値を見てみます。
start_linkで与えているcounterの値は0です。これはそのまま出ています。
最初の出力されているdemandの値は1000です。これどこからきたのでしょう・・・
def handle_demand(demand, counter) when demand > 0 do
IO.inspect {__MODULE__, demand, counter}
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
Note:
:producer、:consumer、:producer_consumerのどれでもinit/1の実装は必須
:producerタイプを指定した場合、handle_demand/2を実装する必要あり
:consumer、:producer_consumerタイプを指定した場合、handle_events/3を実装する必要あり
流れるように連続でイベントを発生させることができるってことか???
- dynamic_supervisor
DynamicSupervisorを使った例。
(カウンタとして動作するプロデューサにコンシューマとして一つ以上のDynamicSupervisorを使用する例)
(カウンタとして動作するプロデューサにコンシューマとして一つ以上のDynamicSupervisorを使用する例)
lib/my_dynamic_supervisor/app.ex
defmodule MyDynamicSupervisor.App do
def start do
import Supervisor.Spec
alias MyDynamicSupervisor.{Counter, Consumer}
children = [
worker(Counter, [0]),
worker(Consumer, [], id: 1)
]
Supervisor.start_link(children, strategy: :one_for_one)
end
def exec do
start
Process.sleep(:infinity)
end
end
lib/my_dynamic_supervisor/counter.ex
alias Experimental.{GenStage}
defmodule MyDynamicSupervisor.Counter do
use GenStage
def start_link(initial) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(initial) do
{:producer, initial}
end
def handle_demand(demand, counter) when demand > 0 do
IO.inspect {__MODULE__, demand, counter}
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
lib/my_dynamic_supervisor/consumer.ex
alias Experimental.{GenStage, DynamicSupervisor}
defmodule MyDynamicSupervisor.Consumer do
use DynamicSupervisor
def start_link do
DynamicSupervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
alias MyDynamicSupervisor.{Printer, Counter}
children = [
worker(Printer, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [Counter]}
end
end
lib/my_dynamic_supervisor/printer.ex
defmodule MyDynamicSupervisor.Printer do
def start_link(event) do
:timer.sleep(1000)
Task.start_link(fn ->
IO.inspect {self(), event}
end)
end
end
Example:
iex> MyDynamicSupervisor.App.exec
{#PID<0.154.0>, 0}
{#PID<0.155.0>, 1}
{#PID<0.156.0>, 2}
{#PID<0.157.0>, 3}
{#PID<0.158.0>, 4}
{#PID<0.159.0>, 5}
{#PID<0.160.0>, 6}
...
- gen_event
GenEventを置き換えた例。
(GenStageを使用する方法では、並行性を活用し、バッファサイズとバックプレッシャーに対してより多くの柔軟性が提供されている)
(GenStageを使用する方法では、並行性を活用し、バッファサイズとバックプレッシャーに対してより多くの柔軟性が提供されている)
File: lib/replacement_gen_event/app.ex
defmodule ReplacementGenEvent.App do
alias ReplacementGenEvent.{Broadcaster, Consumer}
def start do
import Supervisor.Spec
children = [
worker(Broadcaster, []),
worker(Consumer, [], id: 1),
worker(Consumer, [], id: 2),
worker(Consumer, [], id: 3),
worker(Consumer, [], id: 4),
]
Supervisor.start_link(children, strategy: :one_for_one)
end
def exec do
start
Broadcaster.sync_notify(1)
Broadcaster.sync_notify(2)
Broadcaster.sync_notify(3)
Broadcaster.sync_notify(4)
Broadcaster.sync_notify(5)
Process.sleep(2000)
end
end
File: lib/replacement_gen_event/broadcaster.ex
alias Experimental.{GenStage}
defmodule ReplacementGenEvent.Broadcaster do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, from, {queue, demand}) do
dispatch_events(:queue.in({from, event}, queue), demand, [])
end
def handle_demand(incoming_demand, {queue, demand}) do
dispatch_events(queue, incoming_demand+demand, [])
end
defp dispatch_events(queue, demand, events) do
with d when d > 0 <- demand,
{{:value, {from, event}}, queue} <- :queue.out(queue) do
GenStage.reply(from, :ok)
dispatch_events(queue, demand-1, [event | events])
else
_ -> {:noreply, Enum.reverse(events), {queue, demand}}
end
end
end
File: lib/replacement_gen_event/consumer.ex
alias Experimental.{GenStage}
defmodule ReplacementGenEvent.Consumer do
use GenStage
def start_link do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
alias ReplacementGenEvent.Broadcaster
{:consumer, :ok, subscribe_to: [Broadcaster]}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect {self(), event}
end
{:noreply, [], state}
end
end
Example:
iex> ReplacementGenEvent.App.exec
{#PID<0.154.0>, 1}
{#PID<0.155.0>, 1}
{#PID<0.153.0>, 1}
{#PID<0.156.0>, 1}
{#PID<0.154.0>, 2}
{#PID<0.155.0>, 2}
{#PID<0.153.0>, 2}
{#PID<0.156.0>, 2}
{#PID<0.154.0>, 3}
{#PID<0.155.0>, 3}
{#PID<0.153.0>, 3}
{#PID<0.156.0>, 3}
{#PID<0.154.0>, 4}
{#PID<0.155.0>, 4}
{#PID<0.153.0>, 4}
{#PID<0.156.0>, 4}
{#PID<0.154.0>, 5}
{#PID<0.155.0>, 5}
{#PID<0.153.0>, 5}
{#PID<0.156.0>, 5}
:ok
とりあえず一旦ここまでです。
一通り試してみて分かった。
全然、理解できてないと言うことが・・・
ドキュメントの方をもう少し詳しく読むことと、ソースコードの方も読むしかないかな。
全然、理解できてないと言うことが・・・
ドキュメントの方をもう少し詳しく読むことと、ソースコードの方も読むしかないかな。