上一章我们讲解了最适合入门的RabbitMQ+PHP教程(六)Topic exchange(主题交换机)使用,我们通过实际的执行代码,可以很清晰的了解整体的匹配规则以及操作流程。那么这一章节讲解的是rabbitmq的RPC模式。

前提条件

RabbitMQ 已在标准端口(5672)上的localhost上安装并运行。如果您使用不同的主机,端口或凭据,则需要调整连接设置。如果您再本教程中遇到问题,可以随时通过邮件来联系我们。

RPC 定义

要使用RPC那么我们就要搞清楚这个RPC到底是个什么东西,原理是什么?(RPC) Remote Procedure Call Protocol 远程过程调用协议,一般公稍微大一些的公司都是一个项目有多个系统构成,比如电商中的库存系统,商品系统,订单系统等等,不同的项目开发组维护不同的系统,每个系统有运行在不同的机器上。但是往往机器之间需要互相调用一些数据,由于不在同一台机器可以直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。那么现在RPC的协议很多,Java RMI , WebApi等等。

官方对于PRC的说明

尽管RPC在计算中是一种非常常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,会出现问题。这样的混淆导致系统不可预测,并增加了调试的不必要的复杂性。错误使用RPC可以导致不可维护的意大利面条代码,而不是简化软件。
考虑到这一点,请考虑以下建议:
1.确保明显哪个函数调用是本地的,哪个是远程的。
2.记录您的系统。使组件之间的依赖关系变得清晰。
3.处理错误案例。当RPC服务器长时间停机时,客户端应该如何反应?
4.如有疑问,请避免使用RPC。如果可以,您应该使用异步管道 - 而不是类似RPC的阻塞,将结果异步推送到下一个计算阶段。

功能说明

本文使用RabbitMQ实现RPC的调用方式,我们需要使用回调队列(Callback queue)。
(1)Callback queue,通过Rabbitmq进行rpc很简单,客户端发送消息,服务端响应消息,但是为了接受响应,我们需要发送带有请求的回调队列地址。

打开网易新闻 查看精彩图片

(2) Message attribute

AMQP 0-9-1协议预定义了一组带有消息的14个属性。大多数属性很少使用,但以下情况除外:
1.delivery_mode:标记消息传递模式,2-消息持久化,其他值-瞬态。
2.content_type:内容类型,用于描述编码的mime-type. 例如经常为该属性设置JSON编码。
3.reply_to:应答,通用的回调队列名称,
4.correlation_id:关联ID,方便RPC相应与请求关联。

(3) RPC处理流程

  1. 1. RPC客户端启动后,创建一个匿名、独占的、回调的队列
  2. 2. RPC客户端设置消息的2个属性:replyTo(回调队列名字)和correlationId(标记请求ID),然后将消息发送到队列rpc_queue

3.请求被发送到rpc_queue队列中

  1. 1. RPC服务监听队列rpc_queue队列中的消息请求。rpc服务器端处理之后将结果封装成消息发送到replyTo指定的回调队列中,并且此消息带上correlationId
  2. 2. RPC客户端在队列replyTo上监听消息,当收到消息后,它会判断收到消息的correlationId。如果值和自己之前发送的一样,则这个值就是RPC的处理结果

(4) Correlation Id ???
若果按照正常的来说我们为每一个RPC创建一个回调队列的话,这个效率是非常低效的。那么我们可以选择为每一个客户端创建一个回调队列。但是如果队列收到一条回复消息,那么却不不清楚响应属于哪个请求来源,这是就需要使用correlationId属性了。我们要为每个请求设置唯一的值。然后,在回调队列中获取消息,查看这个属性,关联response和request就是基于这个属性值的。如果我们看到一个未知的correlationId属性值的消息,可以放心的无视它——它不是我们发送的请求。你可能问道,为什么要忽略回调队列中未知的信息,而不是当作一个失败?这是由于在服务器端竞争条件的导致的。虽然不太可能,但是如果RPC服务器在发送给我们结果后,发送请求反馈前就挂掉了,这有可能会发送未知correlationId属性值的消息。如果发生了这种情况,重启RPC服务器将会重新处理该请求。这就是为什么在客户端必须很好的处理重复响应,RPC应该是幂等的。

RPC client code (客户端代码)

主要业务逻辑如下:

  1. 1. 配置连接工厂
  2. 2. 建立TCP连接
  3. 3. 在TCP连接的基础上创建通道
  4. 4. 定义临时队列replyQueueName,声明唯一标志本次请求corrId,并将replyQueueName和corrId配置要发送的消息队列中
  5. 5. 使用默认的交换机发送消息到队列rpc_queue中
  6. 6. 使用阻塞队列BlockingQueue阻塞当前进程
  7. 7. 收到请求后,将请求放入BlockingQueue中,主线程被唤醒,打印返回内容

打开网易新闻 查看精彩图片

完整代码
RPC客户端代码:https://www.phpassn.com/article/108.html

RPC Service code (服务端代码)

  1. 1. 配置连接工厂
  2. 2. 建立TCP连接
  3. 3. 在TCP连接的基础上创建通道
  4. 4. 声明一个rpc_queue队列
  5. 5. 设置同时最多只能获取一个消息
  6. 6. 在rpc_queue队列在等待消息
  7. 7. 收到消息后,调用回调对象对消息进行处理,向此消息的replyTo队列中发送处理并带上correlationId
  8. 8. 使用wait-notify实现主线程和消息处理回调对象进行同步

打开网易新闻 查看精彩图片

完整代码
RPC服务端代码: https://www.phpassn.com/article/108.html

测试结果:

打开网易新闻 查看精彩图片
打开网易新闻 查看精彩图片
打开网易新闻 查看精彩图片

总结:RPC的远程过程调用到这里已经结束,应该算是比较清晰一点了,说明白了其实就是客户端与服务端之间的异步交互,通过识别码回调给客户端时,创建一个匿名独占队列,通过这个队列把数据传给客户端。下一章节rabbitmq延时队列实现

github链接:https://github.com/orchid-lyy/rabbitmq-study

打开网易新闻 查看精彩图片