PHPからActiveMQに繋いでみた
via. PHP Stompライブラリを呼び出す消費者スクリプトを書く | 秋元@サイボウズラボ・プログラマー・ブログ
ずいぶん前(2007年10月)にサイボウズラボの秋元さんがPHP勉強会で発表したネタですが、stomp拡張モジュールがリリースされたことと、いつか業務でキューを導入するかも知れないということで、ちょっとずつ調べ始めてある程度量が溜まってきたので、ちょっとまとめてみました。
以下、ちょっと長いです。
メッセージ・キューイングとは
BPnetの説明が分かりやすいかな。
メッセージ・キューイングは,キューに入れたメッセージをやり取りすることで,システム間を連携する。メッセージ・キューイングはファイル転送と同じ非同期型のメカニズムであるが,ファイル転送よりも即時性が高い。アプリケーションの内部でメッセージを生成してキューに入れるため,利用者がリターン・キーを押したタイミングでサーバーと連携することも可能だ。ファイル転送では,このような連携は難しい。また,メッセージの送り手は,キューにメッセージを入れるとすぐに次の処理に移る。受け手は,キューからメッセージを取り出して処理する。メッセージの送り手と受け手は非同期で処理するため,一方のシステムがダウンしても直接影響を受けない。
Stompとは
Stompとは、Streaming Text Orientated Messaging Protocolの略で、キューへの入出力に特化したプロトコルです。Stompの仕様はhttp://stomp.codehaus.org/ProtocolSTOMP Protocol Specification, Version 1.2にあります。非常に簡潔なプロトコルですが、トランザクションもサポートしています。
送受信されるメッセージは、
- コマンド行
- ヘッダ(キーと値)
- 空行
- ボディ
- NULL文字
という構成で、フレームと呼ばれます。最後のNULL文字を除けば、HTTPと似ていますね。
ActiveMQとは
Apache ActiveMQ - Wikipediaにもありますが、ASF(Apache Software Foundation)からリリースされているメッセージ関連のオープンソースのミドルウェアで、Stompをサポートしています。Java以外にPerl, Python, Ruby, PHP, .NET, C/C++などのクライアントライブラリが用意されています。
PHP用Stompライブラリ
大きくは以下の2つ。
- 秋元さんのエントリにもあるPHP版Stompライブラリ(現在はCodehausからFUSEForgeに引っ越した模様)
- PECL::stomp
今回は、これら2つを使ってActiveMQに繋いでみました。なお、PHP版Stompライブラリを使用する場合は、Socket拡張モジュールが必要になります。また、SSLでの接続をおこなう場合は、openssl拡張モジュールも必要になります。
テスト環境
PHP版Stompライブラリのインストール
FUSEForgeからダウンロード・展開するだけです。
$ wget http://stomp.fusesource.org/release/php/1.0/stomp-php-1.0.0.tar.gz $ tar zxf stomp-php-1.0.0.tar.gz $
Stomp拡張モジュールのインストールと設定
今回はバージョン0.3.1をインストール。Stomp-0.2.0からSSLでの接続もサポートされています。
$ sudo pecl install stomp-beta $ php -i -dextension=stomp.so | grep -i stomp Stomp Stomp => enabled stomp.default_broker => tcp://localhost:61613 => tcp://localhost:61613 stomp.default_connection_timeout_sec => 2 => 2 stomp.default_connection_timeout_usec => 0 => 0 stomp.default_read_timeout_sec => 2 => 2 stomp.default_read_timeout_usec => 0 => 0 $
ActiveMQのインストール・設定・起動/停止
ActiveMQのダウンロードページからダウンロード・展開するだけです。
$ wget http://ftp.riken.jp/net/apache/activemq/apache-activemq/5.3.0/apache-activemq-5.3.0-bin.tar.gz
$ tar zxf apache-activemq-5.3.0-bin.tar.gz
$ cd apache-activemq-5.3.0/bin/
$
設定ファイルは、apache-activemq-5.3.0/confディレクトリに配置します。デフォルトの設定ファイルはactivemq.xmlです。なお、activemq.xmlではStompを受け付ける設定にはなっていません。今回は、次のような簡単な設定ファイルをsimple.xmlとして用意しました。
<?xml version="1.0" encoding="UTF-8"?> <beans> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false"> <transportConnectors> <transportConnector name="stomp" uri="stomp://localhost:61613"/> </transportConnectors> </broker> </beans>
設定ファイルを作成したら起動してみます。環境変数JAVA_HOMEを設定していない場合は設定しておきましょう。起動するとログが出力されますので、stompプロトコルをポート61613でlistenしているかどうかを確認しておきます。
$ export JAVA_HOME=/usr/local/jdk $ ./activemq xbean:simple.xml Java Runtime: Sun Microsystems Inc. 1.6.0_16 /usr/local/jdk1.6.0_16/jre : INFO | Listening for connections at: stomp://localhost.localdomain:61613 INFO | Connector stomp Started INFO | ActiveMQ JMS Message Broker (localhost, ID:centos-33413-1257844882047-0:0) started
停止は、CTRL+Cで。
簡単なサンプル
PHP版ライブラリを使った例
メッセージを送信する側(simple_publisher.php)は次のようになります。
<?php require_once("Stomp.php"); $broker = 'tcp://localhost:61613'; $queue = '/queue/stomp_sample'; $msg = 'bar' . date('YmdHis'); try { $stomp = new Stomp($broker); $stomp->connect(); $stomp->send($queue, $msg); } catch (StompException $e) { echo $e->getMessage(); }
connectメソッドで接続し、sendメソッドに送信するキュー(destination)とメッセージを指定して呼び出すだけです。
次にメッセージを受信する側(simple_consumer.php)。
<?php require_once("Stomp.php"); $broker = 'tcp://localhost:61613'; $queue = '/queue/stomp_sample'; try { $stomp = new Stomp($broker); $stomp->connect(); $stomp->subscribe($queue); if ($stomp->hasFrameToRead()) { $frame = $stomp->readFrame(); echo $frame->body . PHP_EOL; $stomp->ack($frame); } else { echo 'no frame' . PHP_EOL; } } catch(StompException $e) { echo $e->getMessage() . PHP_EOL; }
接続後、キューをsubscribeし、フレームがあるかどうかをhasFrameToReadメソッドで確認します。まだフレームがある場合はフレームをreadFrameメソッドで読み込みます。最後にackメソッドでメッセージを読み込んだことを通知し、キューからそのメッセージを削除します。
実行すると次のようになります。
$ php simple_publisher.php $ php simple_consumer.php bar20091110222402 $
PECL::stomp使った例
PHP版ライブラリを使った例と基本的には同じです。ただし、Stompクラスをインスタンス化する際に接続(connectメソッド相当)を実行します。
<?php $broker = 'tcp://localhost:61613'; $queue = '/queue/stomp_sample'; $msg = 'bar' . date('YmdHis'); try { $stomp = new Stomp($broker); $stomp->send($queue, $msg); } catch (StompException $e) { echo $e->getMessage(); }
メッセージを受信する側もほぼ同じです(simple_consumer.php)。hasFrameToReadメソッドの名称がhasFrameメソッドになるだけです。
<?php $broker = 'tcp://localhost:61613'; $queue = '/queue/stomp_sample'; try { $stomp = new Stomp($broker); $stomp->subscribe($queue); if ($stomp->hasFrame()) { $frame = $stomp->readFrame(); echo $frame->body . PHP_EOL; $stomp->ack($frame); } else { echo 'no frame' . PHP_EOL; } } catch(StompException $e) { echo $e->getMessage() . PHP_EOL; }
トランザクション
StompのトランザクションはSQLのトランザクションと似ていて、処理手順は
- begin
- send、ackなどの処理
- commit or abort
のようになります。トランザクションは個別のID(トランザクションID)で区別され、ネストしたトランザクションも可能です。
以下、Stomp拡張モジュールを使う例(publisher_tx.php)ですが、PHP版Stompライブラリでもほぼ同様で、トランザクションIDを配列ではなく文字列としてメソッドに渡します。この辺はStomp.phpのソースコードを見てください;-)
<?php /** * トランザクションIDを発行する */ function getTxId() { return sha1(md5(time() . rand() . uniqid(rand(), true))); } $broker = 'tcp://localhost:61613'; $queue = '/queue/stomp_sample'; $msg = 'bar' . date('YmdHis'); try { $stomp = new Stomp($broker); $tx1 = getTxId(); $stomp->begin($tx1); $stomp->send($queue, '外側1', array('transaction' => $tx1)); /** * トランザクション内の別トランザクション(最後にabort) * インデントした部分がその範囲 */ $tx2 = getTxId(); $stomp->begin($tx2); $stomp->send($queue, '内側', array('transaction' => $tx2)); $stomp->abort($tx2); $stomp->send($queue, '外側2', array('transaction' => $tx1)); $stomp->commit($tx1); } catch (StompException $e) { echo $e->getMessage(); }
実行すると次のようになります。"内側"というメッセージがabortされていることに注意。
$ php -dextension=stomp.so publisher_tx.php $ php -dextension=stomp.so simple_consumer.php 外側1 $ php -dextension=stomp.so simple_consumer.php 外側2 $ php -dextension=stomp.so simple_consumer.php no frame $
SSLでの接続
クライアントとActiveMQ間でSSL通信をおこなう場合、まずはActiveMQ側の設定ファイルを以下のように変更します(ssl.xml)。以下の例では、
- ポート61613:Stompでの通信
- ポート61614:Stomp+SSLでの通信
となります。
<?xml version="1.0" encoding="UTF-8"?> <beans> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false"> <sslContext> <sslContext keyStore="file:${activemq.base}/conf/broker.ks" keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" trustStorePassword="password"/> </sslContext> <transportConnectors> <transportConnector name="stomp" uri="stomp://localhost:61613"/> <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61614"/> </transportConnectors> </broker> </beans>
ssl.xmlを作成したら、ActiveMQを再起動します。
$ ./activemq xbean:ssl.xml Java Runtime: Sun Microsystems Inc. 1.6.0_16 /usr/local/jdk1.6.0_16/jre : INFO | Listening for connections at: stomp://localhost.localdomain:61613 INFO | Connector stomp Started INFO | Listening for connections at: stomp+ssl://localhost.localdomain:61614 INFO | Connector stomp+ssl Started INFO | ActiveMQ JMS Message Broker (localhost, ID:centos-53915-1257859186651-0:0) started
クライアント側は、tcpスキームの代わりに"ssl"を指定します。
<?php //$broker = 'tcp://localhost:61613'; $broker = 'ssl://localhost:61614'; :
ちなみに
サンプルを実行したときの通信内容をngrepで見ると次のような感じになります。まずはsimple_publisher.php。
$ sudo ngrep -W byline -d lo port 61613 interface: lo (127.0.0.0/255.0.0.0) filter: (ip) and ( port 61613 ) #### T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP] CONNECT login: passcode: ## T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP] . ## T 127.0.0.1:61613 -> 127.0.0.1:34405 [AP] CONNECTED session:ID:centos-53915-1257859186651-2:5 . ## T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP] SEND destination:/queue/stomp_sample bar20091110223756 # T 127.0.0.1:34405 -> 127.0.0.1:61613 [AFP] . DISCONNECT . ###exit 42 received, 0 dropped $
続いてsimple_consumer.php。
$ sudo ngrep -W byline -d lo port 61613 interface: lo (127.0.0.0/255.0.0.0) filter: (ip) and ( port 61613 ) #### T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP] CONNECT login: passcode: ## T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP] . ## T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP] CONNECTED session:ID:centos-53915-1257859186651-2:6 . ## T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP] SUBSCRIBE ack:client destination:/queue/stomp_sample activemq.prefetchSize:1 ## T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP] . ## T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP] MESSAGE message-id:ID:centos-53915-1257859186651-2:2:-1:1:1 destination:/queue/stomp_sample timestamp:1257860176371 expires:0 priority:0 bar20091110223616. # T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP] ACK message-id:ID:centos-53915-1257859186651-2:2:-1:1:1 destination:/queue/stomp_sample timestamp:1257860176371 expires:0 priority:0 # T 127.0.0.1:34406 -> 127.0.0.1:61613 [AFP] . DISCONNECT . ## T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP] MESSAGE message-id:ID:centos-53915-1257859186651-2:3:-1:1:1 destination:/queue/stomp_sample timestamp:1257860203444 expires:0 priority:0 bar20091110223643. #exit 57 received, 0 dropped $
プロトコル仕様と見比べてみると面白いかも。