在PHP中实现消息队列有多种方式,以下是一些常见的实现方法:
示例:

```php
// 插入消息
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com', 'subject' => 'Hello']]);
$db->query("INSERT INTO queue (message) VALUES ('$message')");

// 处理消息
$result = $db->query("SELECT * FROM queue ORDER BY id ASC LIMIT 1");
$message = $result->fetch_assoc();
$task = json_decode($message['message'], true);
// 处理任务...
$db->query("DELETE FROM queue WHERE id = {$message['id']}");
```

注意:上例为了简洁直接把 `$message` 拼接进 SQL;`json_encode` 默认不会转义单引号,实际项目中应使用预处理语句(参数绑定)来写入消息。
使用 Redis 的 `LPUSH` 和 `BRPOP` 命令实现消息队列。示例:

```php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 生产者
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com', 'subject' => 'Hello']]);
$redis->lPush('queue', $message);

// 消费者
while (true) {
    $message = $redis->brPop('queue', 0);
    $task = json_decode($message[1], true);
    // 处理任务...
}
```
示例:

```php
// 使用php-amqplib库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 生产者
$channel->queue_declare('queue', false, false, false, false);
$message = new AMQPMessage(json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com', 'subject' => 'Hello']]));
$channel->basic_publish($message, '', 'queue');

// 消费者
$callback = function ($msg) {
    $task = json_decode($msg->body, true);
    // 处理任务...
    $msg->ack();
};
$channel->basic_consume('queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
```
示例:

```php
// 使用Pheanstalk库
require_once __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// 生产者
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com', 'subject' => 'Hello']]);
$pheanstalk->useTube('queue')->put($message);

// 消费者
$job = $pheanstalk->watch('queue')->ignore('default')->reserve();
$task = json_decode($job->getData(), true);
// 处理任务...
$pheanstalk->delete($job);
```
示例:

```php
// 使用php-rdkafka库
require_once __DIR__ . '/vendor/autoload.php';
use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\KafkaConsumer;

// 生产者
$conf = new Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$producer = new Producer($conf);
$topic = $producer->newTopic("queue");
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com', 'subject' => 'Hello']]);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);

// 消费者
$conf = new Conf();
$conf->set('group.id', 'myConsumerGroup');
$conf->set('metadata.broker.list', 'localhost:9092');
$consumer = new KafkaConsumer($conf);
$consumer->subscribe(['queue']);

while (true) {
    $message = $consumer->consume(120*1000);
    if ($message->err) {
        continue;
    }
    $task = json_decode($message->payload, true);
    // 处理任务...
}
```
示例:

```php
// 生产者
$client = new GearmanClient();
$client->addServer();
$client->doBackground('send_email', json_encode(['to' => 'user@example.com', 'subject' => 'Hello']));

// 消费者
$worker = new GearmanWorker();
$worker->addServer();
$worker->addFunction('send_email', function ($job) {
    $data = json_decode($job->workload(), true);
    // 处理任务...
});
while ($worker->work());
```
示例:

```php
// 使用php-zmq库
$context = new ZMQContext();

// 生产者
$producer = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$producer->connect("tcp://localhost:5555");
$message = json_encode(['task' => 'send_email', 'data' => ['to' => 'user@example.com', 'subject' => 'Hello']]);
$producer->send($message);

// 消费者
$consumer = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$consumer->bind("tcp://*:5555");
while (true) {
    $message = $consumer->recv();
    $task = json_decode($message, true);
    // 处理任务...
}
```
示例:

```php
$server = new Swoole\Server('127.0.0.1', 9501);

$server->on('receive', function ($server, $fd, $reactor_id, $data) {
    $task = json_decode($data, true);
    $server->task($task);
});

$server->on('task', function ($server, $task_id, $reactor_id, $data) {
    // 处理任务...
    $server->finish("Task $task_id finished");
});

$server->start();
```
示例:

```php
// 生产者
dispatch(new SendEmailJob(['to' => 'user@example.com', 'subject' => 'Hello']));
```

```bash
# 消费者
php artisan queue:work
```
示例:

```php
// 生产者
$bus->dispatch(new SendEmailMessage(['to' => 'user@example.com', 'subject' => 'Hello']));
```

```bash
# 消费者
php bin/console messenger:consume async
```
根据项目需求和场景选择合适的消息队列实现方式。