スポンサーリンク

2016年2月23日

Using OTP with Elixir (Processes)

とある錬金術師の万能薬(Elixir)

Wait a minute

プログラミングErlangのOTP入門をElixirで行っていく内容です。
と言っても、いきなりGenServerやGenEventなどは使えないので、
まずはプロセスを使ってOTP的な設計から学んでいく。
ElixirからOTPってどうやって使うんだ?とか、そもそもOTPって何?って思う人のための記事…(主に私です!!)

Goal

  • プログラミングErlangのOTP入門をElixirで実施し、ブログへアップする(時間制限なし)
  • ElixirからOTPを使うというのは、どのようにすればいいのかサンプルとなるものを作成する
  • とにかく色々作る(だいぶファジー)

Outcome

2016/02/23 18:25 “1st Step”の追加
2016/02/23 20:07 “2nd Step”の追加
2016/02/24 23:54 “3rd Step”の追加
2016/02/25 22:55 “4th Step”の追加
2016/02/26 23:13 “Extra Step”の追加

Analyze

ErlangをElixirに変換する時、Erlangの知識が必要になってくる。
最初にErlang側のコードを調査し、次にElixirを調査すると言った流れになる。
Erlang側の知識がないと、どうしても調査に時間が掛かり数十行のコードを変換するのにもかなり時間がかかる。
おそらく慣れと知識の積み重ねがあれば、あまり意識せず変換していけるようになる。
つまり、Erlang学べ…

Learn

Erlang側の幾つかの文法を習得した。
Elixirに変換するのは意外と苦労することを知った。
Erlangを学ぶことが必須であることを自覚した。

Content

Dev-Environment

  • OS: Windows8.1
  • Erlang: Eshell V7.2.1, OTP-Version 18.1
  • Elixir: v1.2.0

Index

Using OTP with Elixir
|> Prepare
|> 1st Step
|> 2nd Step
|> 3rd Step
|> 4th Step
|> Extra Step

Prepare

プロジェクトの準備を行います。
> cd プロジェクト作成ディレクトリ

> mix new elixir_otp_basic
...

> cd elixir_otp_basic

> mix test
...
本記事内においてプロジェクトと言った場合、”elixir_otp_basic”を指す。
作成するサーバは全部で5つになる…はず
また、進め方としては以下のように進めていきます。
  1. 小さなクライアントサーバプログラムをElixirで書く
  2. 1のプログラムをだんだんと汎用化させながら機能を追加していく
  3. gen_serverやgen_eventなどのOTPを使ったコードに移行する
ホットコードスワッピングできるかな…できるよな…できるはず!
…作っていけば分かるでしょう。

First Step

始めの一歩!
いきなりgen_serverとかやってもよく分からんので、
まず簡単なサンプルをプロセス(spawn)使って作成します。
第1サーバを作成する。

File: lib/server1.ex

defmodule Server1 do

  def start(name, mod) do
    Process.register(spawn(fn -> loop(name, mod, mod.init) end), name)
  end

  def rpc(name, request) do
    send(name, {self(), request})
    receive do
      {_name, response} -> response
    end
  end

  defp loop(name, mod, state) do
    receive do
      {from, request} ->
        {response, state1} = mod.handle(request, state)
        send(from, {name, response})
        loop(name, mod, state1)
    end
  end
end
クライアント側を作成する。

File: lib/name_server.ex

defmodule NameServer do
  import Server1, only: [rpc: 2]

  ## Client routine
  def add(name, place) do
    rpc(:name_server, {:add, name, place})
  end

  def whereis(name) do
    rpc(:name_server, {:whereis, name})
  end

  ## Callback routine
  def init do
    Map.new
  end

  def handle({:add, name, place}, map) do
    {:ok, Map.put(map, name, place)}
  end

  def handle({:whereis, name}, map) do
    {Map.get(map, name), map}
  end
end
私は、並行処理についてさっぱり知らないが、
これで並行処理モデルのクライアントサーバモデルになっている(とのこと)。
またコールバックには、並行処理やプロセス処理、メッセージに関してのコードが何もない。
設計思想(?)的なことがライブラリにてできるようになっているようですね。
作り方がライブラリの方で制限されているとも言えるかもしれない。
でも、フレームワークってそういうものですよね。

Example:

iex> Server1.start(:name_server, NameServer)
true
iex> NameServer.add(:darui, "at home")
:ok
iex> NameServer.whereis(:darui)
"at home"
本には、これが一番基本となるパターンとある。
後は、これの応用・発展ですが、まぁ数こなすしかないでしょう。

Second Step

第2サーバを作成します。(Server1のトランザクション対応バージョン)
次に作るのは、サーバへの問い合わせで例外が起こった場合にクライアントをクラッシュさせるサーバになります。
出ましたねクラッシュ…クラッシュさせて再起動ができるって聞いたことがあります…(知ったかぶり)

lib/server2.ex

defmodule Server2 do
  def start(name, mod) do
    Process.register(spawn(fn -> loop(name, mod, mod.init) end), name)
  end

  def rpc(name, request) do
    send(name, {self(), request})
    receive do
      {_name, _crash} -> exit(:rpc)
      {_name, :ok, response} -> response
    end
  end

  defp loop(name, mod, old_state) do
    receive do
      {from, request} ->
        try do
          {response, new_state} = mod.handle(request, old_state)
          send(from, {name, :ok, response})
          loop(name, mod, new_state)
        catch
          _, why ->
            log_the_error(name, request, why)
            send(from, {name, :crash})
            loop(name, mod, old_state)
        end
    end
  end

  defp log_the_error(name, request, why) do
    IO.inspect("---- Error log ----")
    IO.inspect({name, request, why})
  end
end
log_the_error/3での出力ですが、
Erlangのioformatで出力するのと同等にするのが面倒だったので手を抜きました。
もっと詳しく出力が必要なら適宜に変更してください。
Server2を実行するためにちょっとNameServerを修正します。
importをServer1からServer2に変更してください。

lib/name_server.ex

defmodule NameServer do
  import Server2, only: [rpc: 2]

  ...
end
動作に代わり映えはありません。
Server1と同じ結果が返ってくれば問題なし。

Example:

iex> Server2.start(:name_server, NameServer)
true
iex> NameServer.add(:darui, "at home")
:ok
iex> NameServer.whereis(:darui)
"at home"
さて、せっかく例外に対応しクラッシュさせられるのだから…クラッシュさせてみたいですよね?
ちょっと強引な方法ではあるが、無理やりraiseで例外を発生させる。
defmodule Server2 do
  ...

  defp loop(name, mod, old_state) do
    receive do
      {from, request} ->
        try do
          raise "Oh no!!"
          {response, new_state} = mod.handle(request, old_state)
          send(from, {name, :ok, response})
          loop(name, mod, new_state)
        catch
          ...
        end
    end
  end

  ...
end
実行するとこんな感じになる。

Example:

iex> Server2.start(:name_server, NameServer)
true
iex> NameServer.add(:darui, "at home")
"---- Error log ----"
{:name_server, {:add, :darui, "at home"}, %RuntimeError{message: "Oh no!!"}}
** (exit) :rpc
    (elixir_otp_basic) lib/server2.ex:9: Server2.rpc/2
iex>
ちゃんとクラッシュしてますね!
ちゃんとクラッシュって何かおかしい気がするけど(笑)
ちなみにServer1にraiseで例外を発生させると処理が戻ってこなくなる。

Example:

iex> Server1.start(:name_server, NameServer)
true
iex> NameServer.add(:darui, "at home")

20:01:54.394 [error] Process #PID<0.112.0> raised an exception
** (RuntimeError) Oh no!!
    (elixir_otp_basic) lib/server1.ex:16: Server1.loop/3

## ここで処理が止まる
さて、クライアント側のコードを一切変更せず動作が変わりました。(インポートは変更しましたが…)
これが本質的ではない部分の変更ってことね。
ElixirやErlangではこうやって汎用化していくんですね。
(この感覚を言語化すると…かつて闇の集団(C++)に属していたときにオブジェクト指向の抽象化が少し分かったときと似ている)

3rd Step

第3歩目では、iexやプロセスを動かしたままモジュールの付け替えを行えるように、
ホットコードスワッピングに対応をしてみます。
(つまり動的なモジュールのアップグレードを行う)

File: lib/server3.ex

defmodule Server3 do
  def start(name, mod) do
    Process.register(spawn(fn -> loop(name, mod, mod.init) end), name)
  end

  def swap_code(name, mod) do
    rpc(name, {:swap_code, mod})
  end

  def rpc(name, request) do
    send(name, {self(), request})
    receive do
      {_name, response} -> response
    end
  end

  defp loop(name, mod, old_state) do
    receive do
      {from, {:swap_code, new_call_back_mod}} ->
        send(from, {name, :ack})
        loop(name, new_call_back_mod, old_state)
      {from, request} ->
        {response, new_state} = mod.handle(request, old_state)
        send(from, {name, response})
        loop(name, mod, new_state)
    end
  end
end

File: lib/name_server1.ex

defmodule NameServer1 do
  import Server3, only: [rpc: 2]

  ## Client routine
  def add(name, place) do
    rpc(:name_server, {:add, name, place})
  end

  def whereis(name) do
    rpc(:name_server, {:whereis, name})
  end

  ## Callback routine
  def init do
    Map.new
  end

  def handle({:add, name, place}, map) do
    {:ok, Map.put(map, name, place)}
  end

  def handle({:whereis, name}, map) do
    {Map.get(map, name), map}
  end
end
NameServerと内容が同じじゃないかって?
実際にモジュール名が違うだけで内容は同じ。
理由がある。
サーバの名前がモジュールに固定してコンパイルされるため。
(ってことは、プログラムの設計をするときにここら辺も考えないといけないってことだな)

Example:

iex(1)> Server3.start(:name_server, NameServer1)
true
iex(2)> NameServer1.add(:darui, "at home")
:ok
iex(3)> NameServer1.add(:hoge, "at hoge")
:ok

## iexは終了させない
エディタを立ち上げ、新しく関数を追加したコールバックモジュールを作成する。

File: lib/new_name_server.ex

defmodule NewNameServer do
  import Server3, only: [rpc: 2]

  ## Interface
  def all_names do
    rpc(:name_server, :all_names)
  end

  def add(name, place) do
    rpc(:name_server, {:add, name, place})
  end

  def delete(name) do
    rpc(:name_server, {:delete, name})
  end

  def whereis(name) do
    rpc(:name_server, {:whereis, name})
  end

  ## Callback routine
  def init, do: Map.new  def handle({:add, name, place}, map) do
    {:ok, Map.put(map, name, place)}
  end

  def handle(:all_names, map) do
    {Map.keys(map), map}
  end

  def handle({:delete, name}, map) do
    {:ok, Map.delete(map, name)}
  end

  def handle({:whereis, name}, map) do
    {Map.get(map, name), map}
  end
end

Example:

## 先ほど、実行していたiexで続きを行う

iex(4)> c "lib/new_name_server.ex"
[NewNameServer]
iex(5)> Server3.swap_code(:name_server, NewNameServer)
:ack
iex(6)> NewNameServer.all_names
[:darui, :hoge]
おぉ!NameServer1.add/2で追加していたデータが、
モジュールを変更しても使えていますね。
トランザクションセマンティクスがこんなに簡単にできるとは…恐ろしい。
プロジェクトの直下にbeamファイルができてしまうが…すごいなこれ!
動いたままでコードが変更できる!!
beamファイルの出力なんかは変えればいいですからね~
Phoenixのcode_reloaderとかここら辺を上手く使ってるんじゃないかなっと思いました。
ソースコード見たことないので分かりませんが(笑)
まったく関係ないが、:ackのackは肯定応答のこと。
通信用語では、通信がエラーなく到達したことを示す信号のこと。
(参考:http://www.wdic.org/w/WDIC/ACK%20(%E4%B8%80%E8%88%AC)))

4th Step

今まで作ってきたサーバの機能を一つにまとめたサーバを作成します。
トランザクションとホットコードスワッピング二つの機能を持ったサーバになります。
特に身構える必要はありません。
今までのソースコードをコピペするだけで終わります。

File: lib/server4.ex

defmodule Server4 do
  def start(name, mod) do
    Process.register(spawn(fn -> loop(name, mod, mod.init) end), name)
  end

  def swap_code(name, mod) do
    rpc(name, {:swap_code, mod})
  end

  def rpc(name, request) do
    send(name, {self(), request})
    receive do
      {_name, _crash} -> exit(:rpc)
      {_name, :ok, response} -> response
    end
  end

  defp loop(name, mod, old_state) do
    receive do
      {from, {:swap_code, new_callback_mod}} ->
        send(from, {name, :ok, :ack})
        loop(name, new_callback_mod, old_state)
      {from, request} ->
        try do
          {response, new_state} = mod.handle(request, old_state)
          send(from, {name, :ok, response})
          loop(name, mod, new_state)
        catch
          _, why ->
            log_the_error(name, request, why)
            send(from, {name, :crash})
            loop(name, mod, old_state)
        end
    end
  end

  defp log_the_error(name, request, why) do
    IO.inspect("---- Error log ----")
    IO.inspect({name, request, why})
  end
end
実行については今までと同じなので、特に実行例は載せません。
さてさて、ようやっとOTPに必要な考え方が少しは身に付いてきた。
ちょっと記事が長くなってきたので、次のGenServerを使う内容から別の記事に移行します。

Extra Step

よっし、プロセス使ってOTPっぽいことやるのは大体終わった。
GenServerへ行く前に、この記事の最後に少し遊んでみます。
サーバに変身するサーバを作る。

File: lib/server5.ex

defmodule Server5 do
  def start do
    spawn(fn -> wait() end)
  end

  def rpc(pid, q) do
    send(pid, {self(), q})
    receive do
      {_pid, reply} -> reply
    end
  end

  defp wait() do
    receive do
      {:become, f} -> f.()
    end
  end
end
このサーバを起動して、
{:become, f}のメッセージを送るとサーバはf.()を評価することによってfサーバへ変身する。
いまいち分かりづらいと思いますが、使えば分かると思います。
理解しようと手を止めず、まずは最後まで実行してみてください。

Example:

iex(1)> pid = Server5.start
#PID<0.100.0>

## iexは終了させない
変身させたいサーバを作成する。

File: lib/my_fac_server.ex

defmodule MyFacServer do
  def loop do
    receive do
      {from, {:fac, n}} ->
        send(from, {self(), fac(n)})
        loop
      {:become, something} ->
        something.()
    end
  end

  defp fac(0) do
    1
  end

  defp fac(n) do
    n * fac(n - 1)
  end
end

Example:

## 先ほど、実行していたiexで続きを行う

iex(2)> c("lib/my_fac_server.ex")
[MyFacServer]
iex(3)> send(pid, {:become, fn -> MyFacServer.loop end})
{:become, #Function<20.54118792/0 in :erl_eval.expr/5>}
iex(4)> Server5.rpc(pid, {:fac, 5})
120
華麗(?)に変身しました。
応用できそうなすごく強力な手法ですが、
逆に言えばしっかりと設計したの上でやらないと大変なことになりそうですね。
面白い手法ですし、どこかで使ってみたいです。

Improve

Erlangを学ぶ。

Prepare

プログラミングErlangを手放さない…(おそらく、ティッピングポイントになる)

Speaking to oneself

そういえば、分散・並行におけるテストって何やればいいんだろうか…
知見がないからさっぱり分からないな。
Stepが全部終わったら振り返りを書く。
すいません、嘘書きました…現状、振返りをするのがきついのでざっくり書いて先に進みます。

Bibliography

人気の投稿