Skip to content

Commit

Permalink
Sixth tutorial in php.
Browse files Browse the repository at this point in the history
  • Loading branch information
majek committed Sep 14, 2011
1 parent 7268ecd commit c8b9e1e
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 2 deletions.
49 changes: 49 additions & 0 deletions php/rpc_client.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

require_once(__DIR__ . '/lib/php-amqplib/amqp.inc');

class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;

function __construct() {
$this->connection = new AMQPConnection(
'localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
list($this->callback_queue, ,) = $this->channel->queue_declare(
"", false, false, true, false);
$this->channel->basic_consume(
$this->callback_queue, '', false, false, false, false,
array($this, 'on_response'));
}
public function on_response($rep) {
if($rep->properties['correlation_id'] == $this->corr_id) {
$this->response = $rep->body;
}
}

public function call($n) {
$this->response = null;
$this->corr_id = uniqid();

$msg = new AMQPMessage(
'' + $n,
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

?>
44 changes: 44 additions & 0 deletions php/rpc_server.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

require_once(__DIR__ . '/lib/php-amqplib/amqp.inc');

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {
if ($n == 0)
return 0;
if ($n == 1)
return 1;
return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";

$msg = new AMQPMessage(
'' + fib($n),
array('correlation_id' => $req->properties['correlation_id'])
);

$req->delivery_info['channel']->basic_publish(
$msg, '', $req->properties['reply_to']);
$req->delivery_info['channel']->basic_ack(
$req->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

?>
4 changes: 2 additions & 2 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ def skip(cwd_cmd, to_skip):
['php']),
'%(arg2)s'),
'tut6': (skip(gen('rpc_client', java='RPCClient', dotnet='RPCClient'),
['erlang', 'php']),
['erlang']),
skip(gen('rpc_server', java='RPCServer', dotnet='RPCServer'),
['erlang', 'php']),
['erlang']),
'fib[(]30[)]'),
}

Expand Down

0 comments on commit c8b9e1e

Please sign in to comment.