システムはできて終わりではなく、そこから始まる
前回は、
前回までで、
最終回となる今回は、
負荷分散のための設定
AP4Rにおける負荷分散はいくつかのポイントがあります。
- 非同期処理を複数プロセスへ分散
- ウェブサーバのプロキシ機能を使う
- AP4RのURL変換フィルタを使う
- AP4R プロセス自体の多重化
- メッセージ送信を分散する
- AP4R プロセス間でメッセージを転送する
既に説明した内容もありますが、
ウェブサーバのプロキシ機能にて、非同期処理を複数プロセスへ分散
非同期の処理を複数プロセスで分散させることを考えた場合、
最も簡単な構成は、

では、
modify_rules:
url: proc {|url| url.host = "your.async.host"}

リバースプロキシを同期処理と共用にする場合は、

dispatchersとURL変換フィルタ
より細かく、dispatchers
の初期設定を見るとこのようになっています。
dispatchers:
-
targets: queue.*
threads: 1
これは、targets
で指定されるキュー
dispatchers:
-
targets: queue.very_busy.*
threads: 10
modify_rules:
url: proc {|url| url.host = "busy.async.host"}
-
targets: queue.very_heavy.*
threads: 1
modify_rules:
url: proc {|url| url.host = "heavy.async.host"}
数の多い処理は、busy.
にて処理をさせ、heavy.
に流れるようにしています。また、
メッセージ送信の分散
ここからは、
これまでの例では、

AP4R プロセス間のメッセージ転送
複数のAP4Rプロセスを実行している場合、carriers
という項目がこの機能にあたります。
(注) - この機能は、
設定方法によっては、 AP4Rプロセス間でメッセージのキャッチボールが起きたり、 メッセージDBの負荷が増えたりする難点もあります。必要な設定項目、 アルゴリズムなどを考慮する余地があるため、 現状は 「実験的」 な機能と位置付けています。
設定例は以下のようになります。
carriers:
-
source_uri: druby://another.ap4r.host:6438
threads: 1
どのAP4Rプロセスsource_
)から、threads
)でメッセージを転送するかを設定しています。各スレッドは、dispatchers
が処理できるキューからメッセージを取得し、

(注) - 複数のAP4Rプロセスを利用する場合には、
サーバソケットを開くアドレスとポートの指定で注意する点があります。バージョン1. 1のreliable-msgには、 デフォルトポート (6438) 以外でのLISTENが出来ないバグがあります。またlocalhost以外でのLISTENが出来ません。AP4RのWiki: FAQに修正方法を書いています。
以上のように、
メッセージのリカバリ
メッセージのリカバリには2つの種類があります。冒頭でも触れましたが、
SAF リカバリ
発生状況
SAF機能を利用しているときに発生します。メッセージは、

状況の確認
SAF用テーブルに保管されているメッセージを確認します。最初に、
% cd as_rails % ruby script/console Loading development environment. >> Ap4r::StoredMessage.destroy_all
次に、
as_
(省略)
Ap4r::AsyncHelper::base.saf_delete_mode = :logical
RailsプロセスとAP4Rプロセスを起動します。
% cd as_rails % ruby script/server
% cd as_ap4r % ruby script/mongrel_ap4r start -A config/queues_mysql.cfg
これまで作成してきたアプリケーションの画面をひらき、
「Order was successfully created.」
% cd as_rails % ruby script/console Loading development environment. >> y Ap4r::StoredMessage.find(:all) --- - !ruby/object:Ap4r::StoredMessage attributes: status: "1" updated_at: 2007-09-14 10:21:30 duplication_check_id: bf6627e0-448e-012a-cfbe-0016cb9ad524 id: "29" queue: queue.async_shop.payment object: "\x04\b\"\x10order_id=29" created_at: 2007-09-14 10:21:30 headers: "\x04\b{\n\ :\rdelivery:\tonce:\x0Fqueue_name\ "\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\ "-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST" => nil
statusは"1"となっています。第3回のSAF機能の説明でも触れましたが、 したがって、 次に、 SAF用テーブルに保管されているメッセージが多い場合には、 メッセージを再度AP4Rに Forward します。AP4R プロセスを起動しておきましょう。 SAF用テーブルに保管されている、 リカバリ対象のメッセージが大量にある場合は、 また、 SAF用テーブルを管理するRailsアプリケーションを作成すれば、 HelloWorldアプリケーションを起動し、 各メッセージの中身も参照できますので、 メッセージの処理中のネットワーク障害や、 AP4Rが起動中であれば、 まず、 として、 では、 次に、 ここではメッセージのヘッダー部分のみが表示されていますが、 ではリカバリを行います。DLQからメッセージを抜いて、 これで元のキューにメッセージが入り、 ここで、 さて、 同じ DLQに入ったメッセージの確認と操作について説明しました。最低限のことは可能ですが、 今後のAP4Rの拡張には、 全4回に渡ってAP4Rの解説をしてきました。AP4Rの概説にはじまり、 AP4Rとは、 AP4RはRubyで書かれています。Rubyのもつ簡潔さ、 一方、 メッセージ送信の AP4RのメッセージングとしてのQoS 非同期処理を含むアプリケーションでも、 AP4Rを利用した典型的なシステムの構成は、 また、 稼動しはじめたシステムは、 AP4Rでは今後の拡張として、 本連載に最後までお付き合いいただき、
status = 0 未処理 status = 1 処理済 Ctrl+C
で終了した状態で注文処理を実行します。ステータスは未処理をあらわす"0"となっているでしょう。>> y Ap4r::StoredMessage.find(:all)
---
- !ruby/object:Ap4r::StoredMessage
attributes:
status: "1"
updated_at: 2007-09-14 10:21:30
(省略)
- !ruby/object:Ap4r::StoredMessage
attributes:
status: "0"
updated_at: 2007-09-14 10:26:58
duplication_check_id: 82912650-448f-012a-cfbe-0016cb9ad524
id: "30"
queue: queue.async_shop.payment
object: "\x04\b\"\x10order_id=30"
created_at: 2007-09-14 10:26:58
headers: "\x04\b{\n\ :\rdelivery:\tonce:\x0Fqueue_name\
"\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\
"-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST"
=> nil
>> y Ap4r::StoredMessage.find_status_of(:all).map{|sm| sm.to_summary_string}
---
- 29, queue.async_shop.payment, Fri Sep 14 10:21:30 +0900 2007
- 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007
=> nil
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string}
---
- 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007
=> nil
リカバリ方法
% cd as_ap4r
% ruby script/mongrel_ap4r start -A config/queues_mysql.cfg
% cd as_rails
% ruby script/console
Loading development environment.
>> Ap4r::StoredMessage.reforward_all
=> [1, 0]
>> Ap4r::StoredMessage.find_status_of(:unforwarded).size
=> 10
>> Ap4r::StoredMessage.reforward_all 3
ActiveRecord::RecordNotFound: ...(省略)
=> [7, 3]
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string}
---
- 35, queue.async_shop.payment, Fri Sep 14 10:38:07 +0900 2007
- 36, queue.async_shop.payment, Fri Sep 14 10:38:19 +0900 2007
- 37, queue.async_shop.payment, Fri Sep 14 10:38:28 +0900 2007
=> nil
>> Ap4r::StoredMessage.reforward 36
=> true
DLQリカバリ
発生状況
状況の確認
AsyncShopController#payment
アクションの頭でraise "dummy exception to test DLQ"
payment
アクションの処理中にダミーの例外が発生したこと、dlq
とキューの管理をしている ReliableMsg::QueueManager
(のインスタンス)qm
を取得します。% irb -rubygems -rap4r
>> dlq = ReliableMsg::Queue.new "$dlq"
=> #<ReliableMsg::Queue:0x2838360 @queue="$dlq">
>> qm = dlq.send :qm
=> #<DRb::DRbObject:0x2832b04 @ref=nil, @uri="druby://localhost:6438">
$dlq
という名前になっています)>> y qm.list(:queue => "$dlq")
--- # 一部整形しています
- :max_deliveries: 5
:priority: 0
:created: 1189737438
:target_method: POST
:redelivery: 1
:queue: queue.async_shop.payment
:expires:
:id: 56205b90-4499-012a-1774-0016cb9ad556
:delivery: :once
:target_url: http://localhost:3000/async_shop/payment
:dispatch_mode: :HTTP
=> nil
:queue
の箇所に元のキューの名前が記録されています。リカバリ方法
payment
アクションに入れたダミーの例外は削除しておきましょう。>> dlq.get{|m| dlq.put(m.object, m.headers) }
=> "5f7df210-4499-012a-1774-0016cb9ad556"
payment
アクションが呼ばれます。少し待ってからブラウザをリロードすると
put
しているように見えますが、m.
の :queue
で指定されたキューに入ります。>> y qm.list(:queue => "$dlq")
--- # 抜粋。一部整形
- :id: e232a830-4499-012a-1774-0016cb9ad556
=> nil
>> dlq.get{|m| raise "dummy recovery error"}
RuntimeError: dummy recovery error # 以下略
>> y qm.list(:queue => "$dlq")
--- # 抜粋。一部整形
- :id: e232a830-4499-012a-1774-0016cb9ad556
=> nil
:id
でメッセージが残っていることが分かります。これは、ReliableMsg::Queue#get
を呼び出すと、
まとめ
AP4Rとは
「軽量」
シンプルなAPI
async_
と、transaction
の2つのAPIのみで非同期処理を組み込むことが可能です。また、堅牢性の要、
非同期でもテストはしっかり
柔軟なシステム構成
いざというときのリカバリ
最後に
AP4R関連リンク