php-amqplib と RabbitMQ について調べた

この記事は公開されてから1年以上経過しており、情報が古い可能性があります。

見慣れないライブラリを使ってるプロジェクトがあったので調査メモ。

php-amqplibとは?

php-amqplib/php-amqplib: AMQP library for PHP

RabbitMQ とよばれるメッセージングキューのためのクライアントライブラリ for PHP 。厳密には RabiitMQ という話ではなく、そこで利用されるAMQP と呼ばれるプロトコルに準拠したものを取り扱う。メッセージのやり取りについてその専用のプロトコルでやりとりしている。
利用するには mbstring と bcmath のエクステンションが必須になる。

php-amqplib は packagist で管理されているので composer からインストールすることになる。

RabbitMQ とは?

RabbitMQ - Messaging that just works

うさぎがかわいい。

AMQP というメッセージプロトコルを語れる、メッセージングキュー用のミドルウェア…、という分類でいいのかな。

Javascript の Promise みたいなイメージでいると多分認識が合う(合わない)
メッセージを送る人と受け取る人、そして運び人のウサギがいる。ウサギが様々なメッセージのやり取りや順序管理をしてくれるので、送る人は誰に届くかわからないけどこの処理できる人やって、受け取る方は誰から来たかわからないけど処理して結果をしまっておく、なんてことが出来る。
(=マイクロサービスなアーキテクチャ)

ちなみに Erlang で書かれている。
ちなみに今年で 10 年経つんだとかでトップページがお祝いムード。

 

手軽に試すなら公式イメージがあるので docker で。

library/rabbitmq - Docker Hub

 

そうでなければ公式サイトを参考にインストールしていくとよい。

RabbitMQ - Downloading and Installing RabbitMQ

CentOS の項目をみるとこんなことが書かれている。

Overview
rabbitmq-server is included in Fedora. However, the versions included are often quite old. You will probably get better results installing the .rpm from PackageCloud or Bintray. Check the Fedora package details for which version of the server is available for which versions of the distribution.

Fedora には rabbitmq-server な名前で既に登録されているものがあるが、古い可能性ので Package Cloud の RPM を使うのがオススメする、って書いてあった。

rabbitmq/rabbitmq-server - Packages - packagecloud.io | packagecloud

 

ちなみに、ちょっと昔の記事なので、バージョンが古くて今と異なる部分があるかもだが、実際に業務運用に向けていくようなところも込で、がっつりとした情報があったので、こちらも合わせて読むと実践向けっぽい。

はじめての RabbitMQ|サイバーエージェント 公式エンジニアブログ

PHPから使ってみる

例によってお手軽に試してみる程度。 docker で準備を進めていく。

// docker でキューサーバを立ち上げてみる
// 5672 ポートをバインドするのを忘れずに
$ sudo docker run -d -p 5672:5672  --name rabbit-server rabbitmq:3
...

$ sudo docker ps
CONTAINER ID        IMAGE               COMMAND                CREATED             STATUS              PORTS                                                   NAMES
92547dbde579        rabbitmq:3          "docker-entrypoint.s   5 seconds ago       Up 4 seconds        4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 25672/tcp   rabbit-server


// モジュールが必要なので入れる(mbstringはもともとあった)
$ sudo yum install php-bcmath --enablerepo=remi-php70

// モジュールを移動しないとダメだった(パス設定してないだけだと思う)
$ sudo mv /etc/php.d/20-bcmath.ini /etc/opt/remi/php70/php.d/
$ sudo mv /usr/lib64/php/modules/bcmath.so /opt/remi/php70/root/usr/lib64/php/modules/

$ php -m | grep bcmath
bcmath


// アプリを構築する
$ mkdir php-amqplib-test
$ cd php-amqplib-test

$ composer init
...

$ composer require php-amqplib/php-amqplib:2.7.*
...

ソースを書く。

<?php
// send.php

require('vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// デフォルトID/PW = guest/guest
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 送信するチャンネルを設定
$channel = $connection->channel();

// https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L597
// string queue = ''
// bool passive = false
// bool durable = false
// bool exclusive = false
// bool auto_delete = true
// bool nowait = false
$channel->queue_declare('hello', false, false, false, false);


// メッセージ作成
$msg = new AMQPMessage('Hello World!');

// メッセージ送信
// https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L1086
// string msg
// string exchange = ''
// string routing_key = ''
$channel->basic_publish($msg, '', 'hello');
<?php
// receive.php

require('vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;

// デフォルトID/PW = guest/guest
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// チャンネルを設定
$channel = $connection->channel();

// https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L597
// string queue = ''
// bool passive = false
// bool durable = false
// bool exclusive = false
// bool auto_delete = true
// bool nowait = false
$channel->queue_declare('hello', false, false, false, false);


// コールバック関数の用意
$callback = function ($msg) {
        echo $msg->body . "\n";
};

// メッセージ受信
// https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L901
// string queue = ''
// string consumer_tag = ''
// bool no_local = false
// bool no_ack = false
// bool exclusive = false
// bool nowait = false
// function callback = null
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// メッセージ受信待ちループ
while(count($channel->callbacks)) {
    $channel->wait();
}

なお引数に関するコメントはデフォルトの値。これが何を意味しているかはまたちょっと分からない…。ドキュメントを読むしか…。

ちなみに PHP の API ドキュメントは見当たらない。 Github のソースを読むか、インタフェースはだいたい同じなので JavaDoc のものを読むかするとよい。

 

ターミナルを2つ開いて実行してみる。

// 一方のターミナル
$ php send.php
$ php send.php
$ php send.php
$ php send.php

// もう一方のターミナル
$ php receive.php
Hello World!
Hello World!
Hello World!
Hello World!

ただし、これには問題があって、このように単純に送るだけだとキューが RabbitMQ を再起動したりマシンが落ちたりすると、中身がきれいさっぱり消える。

$ php send.php
$ sudo docker restart rabbit-server
$ php receive.php

// でてこない!

公式ドキュメントだとこのページの Message durability の項目に書いてあるが、送信時に引数を設定しないといけない。

RabbitMQ - RabbitMQ tutorial - Work Queues

<?php
require('vendor/autoload.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// デフォルトID/PW = guest/guest
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 送信するチャンネルを設定
$channel = $connection->channel();

// https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L597
// string queue = ''
// bool passive = false
// bool durable = false
// bool exclusive = false
// bool auto_delete = true
// bool nowait = false
$channel->queue_declare('hello', false, true, false, false);


// メッセージ作成
// デリバリモードの追加
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

// メッセージ送信
// https://github.com/php-amqplib/php-amqplib/blob/master/PhpAmqpLib/Channel/AMQPChannel.php#L1086
// string msg
// string exchange = ''
// string routing_key = ''
$channel->basic_publish($msg, '', 'hello');
// 一旦再起動しておく
$ sudo docker restart rabbit-server

// 送信, 再起動, 受信
$ php send.php
$ sudo docker restart rabbit-server
$ php receive.php
Hello World!

なるほどね~。

ちなみにの話

ちなみに Laravel のキュードライバとして RabbitMQ を使うライブラリも既にあり、これはもう入れるだけでいい。先人の知恵すごい。

vyuldashev/laravel-queue-rabbitmq: RabbitMQ driver for Laravel Queue

書いてある通りです、以上の説明は出来なくて。要するに QUEUE_DRIVER を変更し、 RabbitMQ 用の設定を幾つか書けば勝手にやってくれるのだそうだ。

ちなみに上記の durable 、メッセージの永続化はデフォルトで有効になっているようだ。

 

ちなみにちなみに、メッセージキューの実装を標準化しよう!という話があるっぽい。知らなかった。動きとしては将来的に PSR になったらいいな、くらいの強制していないけどぼちぼち運動してます!なもの。

queue-interop/queue-interop: Promoting the interoperability of MQs objects. Based on Java JMS

書き方、インタフェースが大きく分かれそうだし、複数対応を謳うようなライブラリもあるし、標準化されるといろいろ嬉しい予感がする。

前後の記事

Next:
Prev: