Do You PHP はてブロ

Do You PHPはてなからはてブロに移動しました

PHPからActiveMQに繋いでみた

via. PHP Stompライブラリを呼び出す消費者スクリプトを書く | 秋元@サイボウズラボ・プログラマー・ブログ

ずいぶん前(2007年10月)にサイボウズラボの秋元さんがPHP勉強会で発表したネタですが、stomp拡張モジュールがリリースされたことと、いつか業務でキューを導入するかも知れないということで、ちょっとずつ調べ始めてある程度量が溜まってきたので、ちょっとまとめてみました。

以下、ちょっと長いです。

メッセージ・キューイングとは

BPnetの説明が分かりやすいかな。


メッセージ・キューイングは,キューに入れたメッセージをやり取りすることで,システム間を連携する。メッセージ・キューイングはファイル転送と同じ非同期型のメカニズムであるが,ファイル転送よりも即時性が高い。アプリケーションの内部でメッセージを生成してキューに入れるため,利用者がリターン・キーを押したタイミングでサーバーと連携することも可能だ。ファイル転送では,このような連携は難しい。

 また,メッセージの送り手は,キューにメッセージを入れるとすぐに次の処理に移る。受け手は,キューからメッセージを取り出して処理する。メッセージの送り手と受け手は非同期で処理するため,一方のシステムがダウンしても直接影響を受けない。

Stompとは

Stompとは、Streaming Text Orientated Messaging Protocolの略で、キューへの入出力に特化したプロトコルです。Stompの仕様はhttp://stomp.codehaus.org/ProtocolSTOMP Protocol Specification, Version 1.2にあります。非常に簡潔なプロトコルですが、トランザクションもサポートしています。
送受信されるメッセージは、

  • コマンド行
  • ヘッダ(キーと値)
  • 空行
  • ボディ
  • NULL文字

という構成で、フレームと呼ばれます。最後のNULL文字を除けば、HTTPと似ていますね。

ActiveMQとは

Apache ActiveMQ - Wikipediaにもありますが、ASF(Apache Software Foundation)からリリースされているメッセージ関連のオープンソースミドルウェアで、Stompをサポートしています。Java以外にPerl, Python, Ruby, PHP, .NET, C/C++などのクライアントライブラリが用意されています。

PHP用Stompライブラリ

大きくは以下の2つ。

今回は、これら2つを使ってActiveMQに繋いでみました。なお、PHP版Stompライブラリを使用する場合は、Socket拡張モジュールが必要になります。また、SSLでの接続をおこなう場合は、openssl拡張モジュールも必要になります。

テスト環境

  • CentOS5.2 on VMwarePlayer on CF-R8
  • PHP5.2.10
  • Java SE Runtime Environment (build 1.6.0_16-b01) (JAVA_HOME=/usr/local/jdk)

PHP版Stompライブラリのインストール

FUSEForgeからダウンロード・展開するだけです。

$ wget http://stomp.fusesource.org/release/php/1.0/stomp-php-1.0.0.tar.gz
$ tar zxf stomp-php-1.0.0.tar.gz
$ 

Stomp拡張モジュールのインストールと設定

今回はバージョン0.3.1をインストール。Stomp-0.2.0からSSLでの接続もサポートされています。

$ sudo pecl install stomp-beta
$ php -i -dextension=stomp.so | grep -i stomp
Stomp
Stomp => enabled
stomp.default_broker => tcp://localhost:61613 => tcp://localhost:61613
stomp.default_connection_timeout_sec => 2 => 2
stomp.default_connection_timeout_usec => 0 => 0
stomp.default_read_timeout_sec => 2 => 2
stomp.default_read_timeout_usec => 0 => 0
$ 

ActiveMQのインストール・設定・起動/停止

ActiveMQのダウンロードページからダウンロード・展開するだけです。

$ wget http://ftp.riken.jp/net/apache/activemq/apache-activemq/5.3.0/apache-activemq-5.3.0-bin.tar.gz
$ tar zxf apache-activemq-5.3.0-bin.tar.gz
$ cd apache-activemq-5.3.0/bin/
$ 

設定ファイルは、apache-activemq-5.3.0/confディレクトリに配置します。デフォルトの設定ファイルはactivemq.xmlです。なお、activemq.xmlではStompを受け付ける設定にはなっていません。今回は、次のような簡単な設定ファイルをsimple.xmlとして用意しました。

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">
    <transportConnectors>
      <transportConnector name="stomp" uri="stomp://localhost:61613"/>
    </transportConnectors>
  </broker>
</beans>

設定ファイルを作成したら起動してみます。環境変数JAVA_HOMEを設定していない場合は設定しておきましょう。起動するとログが出力されますので、stompプロトコルをポート61613でlistenしているかどうかを確認しておきます。

$ export JAVA_HOME=/usr/local/jdk
$ ./activemq xbean:simple.xml
Java Runtime: Sun Microsystems Inc. 1.6.0_16 /usr/local/jdk1.6.0_16/jre
                          :
 INFO | Listening for connections at: stomp://localhost.localdomain:61613
 INFO | Connector stomp Started
 INFO | ActiveMQ JMS Message Broker (localhost, ID:centos-33413-1257844882047-0:0) started

停止は、CTRL+Cで。

簡単なサンプル

PHP版ライブラリを使った例

メッセージを送信する側(simple_publisher.php)は次のようになります。

<?php
require_once("Stomp.php");

$broker = 'tcp://localhost:61613';
$queue  = '/queue/stomp_sample';
$msg    = 'bar' . date('YmdHis');

try {
    $stomp = new Stomp($broker);
    $stomp->connect();
    $stomp->send($queue, $msg);
} catch (StompException $e) {
    echo $e->getMessage();
}

connectメソッドで接続し、sendメソッドに送信するキュー(destination)とメッセージを指定して呼び出すだけです。
次にメッセージを受信する側(simple_consumer.php)。

<?php
require_once("Stomp.php");

$broker = 'tcp://localhost:61613';
$queue  = '/queue/stomp_sample';

try {
    $stomp = new Stomp($broker);
    $stomp->connect();
    $stomp->subscribe($queue);
    if ($stomp->hasFrameToRead()) {
        $frame = $stomp->readFrame();
        echo $frame->body . PHP_EOL;
        $stomp->ack($frame);
    } else {
        echo 'no frame' . PHP_EOL;
    }
} catch(StompException $e) {
    echo $e->getMessage() . PHP_EOL;
}

接続後、キューをsubscribeし、フレームがあるかどうかをhasFrameToReadメソッドで確認します。まだフレームがある場合はフレームをreadFrameメソッドで読み込みます。最後にackメソッドでメッセージを読み込んだことを通知し、キューからそのメッセージを削除します。

実行すると次のようになります。

$ php simple_publisher.php
$ php simple_consumer.php
bar20091110222402
$ 
PECL::stomp使った例

PHP版ライブラリを使った例と基本的には同じです。ただし、Stompクラスをインスタンス化する際に接続(connectメソッド相当)を実行します。

<?php
$broker = 'tcp://localhost:61613';
$queue  = '/queue/stomp_sample';
$msg    = 'bar' . date('YmdHis');

try {
    $stomp = new Stomp($broker);
    $stomp->send($queue, $msg);
} catch (StompException $e) {
    echo $e->getMessage();
}

メッセージを受信する側もほぼ同じです(simple_consumer.php)。hasFrameToReadメソッドの名称がhasFrameメソッドになるだけです。

<?php
$broker = 'tcp://localhost:61613';
$queue  = '/queue/stomp_sample';

try {
    $stomp = new Stomp($broker);
    $stomp->subscribe($queue);
    if ($stomp->hasFrame()) {
        $frame = $stomp->readFrame();
        echo $frame->body . PHP_EOL;
        $stomp->ack($frame);
    } else {
        echo 'no frame' . PHP_EOL;
    }
} catch(StompException $e) {
    echo $e->getMessage() . PHP_EOL;
}

トランザクション

StompのトランザクションSQLトランザクションと似ていて、処理手順は

  1. begin
  2. send、ackなどの処理
  3. commit or abort

のようになります。トランザクションは個別のID(トランザクションID)で区別され、ネストしたトランザクションも可能です。
以下、Stomp拡張モジュールを使う例(publisher_tx.php)ですが、PHP版Stompライブラリでもほぼ同様で、トランザクションIDを配列ではなく文字列としてメソッドに渡します。この辺はStomp.phpソースコードを見てください;-)

<?php
/**
 * トランザクションIDを発行する
 */
function getTxId() {
    return sha1(md5(time() . rand() . uniqid(rand(), true)));
}

$broker = 'tcp://localhost:61613';
$queue  = '/queue/stomp_sample';
$msg    = 'bar' . date('YmdHis');

try {
    $stomp = new Stomp($broker);

    $tx1 = getTxId();
    $stomp->begin($tx1);
    $stomp->send($queue, '外側1', array('transaction' => $tx1));

        /**
         * トランザクション内の別トランザクション(最後にabort)
         * インデントした部分がその範囲
         */
        $tx2 = getTxId();
        $stomp->begin($tx2);
        $stomp->send($queue, '内側', array('transaction' => $tx2));
        $stomp->abort($tx2);

    $stomp->send($queue, '外側2', array('transaction' => $tx1));
    $stomp->commit($tx1);
} catch (StompException $e) {
    echo $e->getMessage();
}

実行すると次のようになります。"内側"というメッセージがabortされていることに注意。

$ php -dextension=stomp.so publisher_tx.php
$ php -dextension=stomp.so simple_consumer.php
外側1
$ php -dextension=stomp.so simple_consumer.php
外側2
$ php -dextension=stomp.so simple_consumer.php
no frame
$

SSLでの接続

クライアントとActiveMQ間でSSL通信をおこなう場合、まずはActiveMQ側の設定ファイルを以下のように変更します(ssl.xml)。以下の例では、

  • ポート61613:Stompでの通信
  • ポート61614:Stomp+SSLでの通信

となります。

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">
    <sslContext>
      <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
        keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts"
        trustStorePassword="password"/>
    </sslContext>
    <transportConnectors>
      <transportConnector name="stomp" uri="stomp://localhost:61613"/>
      <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61614"/>
    </transportConnectors>
  </broker>
</beans>

ssl.xmlを作成したら、ActiveMQを再起動します。

$ ./activemq xbean:ssl.xml
Java Runtime: Sun Microsystems Inc. 1.6.0_16 /usr/local/jdk1.6.0_16/jre
                         :
 INFO | Listening for connections at: stomp://localhost.localdomain:61613
 INFO | Connector stomp Started
 INFO | Listening for connections at: stomp+ssl://localhost.localdomain:61614
 INFO | Connector stomp+ssl Started
 INFO | ActiveMQ JMS Message Broker (localhost, ID:centos-53915-1257859186651-0:0) started

クライアント側は、tcpスキームの代わりに"ssl"を指定します。

<?php
//$broker = 'tcp://localhost:61613';
$broker = 'ssl://localhost:61614';
               :

ちなみに

サンプルを実行したときの通信内容をngrepで見ると次のような感じになります。まずはsimple_publisher.php

$ sudo ngrep -W byline -d lo port 61613
interface: lo (127.0.0.0/255.0.0.0)
filter: (ip) and ( port 61613 )
####
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP]
CONNECT
login:
passcode:


##
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP]
.

##
T 127.0.0.1:61613 -> 127.0.0.1:34405 [AP]
CONNECTED
session:ID:centos-53915-1257859186651-2:5

.

##
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP]
SEND
destination:/queue/stomp_sample

bar20091110223756
#
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AFP]
.
DISCONNECT

.

###exit
42 received, 0 dropped
$ 

続いてsimple_consumer.php

$ sudo ngrep -W byline -d lo port 61613
interface: lo (127.0.0.0/255.0.0.0)
filter: (ip) and ( port 61613 )
####
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
CONNECT
login:
passcode:


##
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
.

##
T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP]
CONNECTED
session:ID:centos-53915-1257859186651-2:6

.

##
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
SUBSCRIBE
ack:client
destination:/queue/stomp_sample
activemq.prefetchSize:1


##
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
.

##
T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP]
MESSAGE
message-id:ID:centos-53915-1257859186651-2:2:-1:1:1
destination:/queue/stomp_sample
timestamp:1257860176371
expires:0
priority:0

bar20091110223616.

#
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
ACK
message-id:ID:centos-53915-1257859186651-2:2:-1:1:1
destination:/queue/stomp_sample
timestamp:1257860176371
expires:0
priority:0


#
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AFP]
.
DISCONNECT

.

##
T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP]
MESSAGE
message-id:ID:centos-53915-1257859186651-2:3:-1:1:1
destination:/queue/stomp_sample
timestamp:1257860203444
expires:0
priority:0

bar20091110223643.

#exit
57 received, 0 dropped
$

プロトコル仕様と見比べてみると面白いかも。

まとめ

とりあえず、インストールからSSLでの通信までやってみましたが、PHP側はさほど難しくありませんでした。あまり深堀してませんが、パラメータを使って優先度(priority)付きメッセージなども送れそうです。