牛逼!自己动手从0实现一个分布式RPC框架,成功拿下阿里offer

交互设计

  写给大家的话

  最近我收到很多读者的来信,对如何学习分布式、如何进行项目实践和提高编程能力,存在很多疑问。

  分布式那么难,怎么学?为什么看了那么多书还是掌握不了?

  开源的框架比如Dubbo代码太多了,完全一头雾水,应该怎么学习?

  在学校没有项目经验,找工作在即,怎么把这块补起来?你还仅限于XXX管理系统么「面试官都疲劳了」?

  我在学校的时候,也有和大家一样的困惑。

  毕业去阿里工作了几年后,通过参与实际的项目开发,关于如何学习新知识、如何快速上手并应用有了一些体会,在这里给大家分享一下:

  一定要动手,动手写代码,实现demo,去debug,在调试的过程中学习。没有必要抱着大块头的书看完了以后再动手写代码,在实践中学习是最快速的方法。在学习框架的过程中,尽量先从框架的初始版本开始看,因为开源框架往往功能复杂,代码庞大,很容易劝退。比如学习linux内核,可以从早期版本开始看。「造轮子」。掌握知识最好的办法是去做项目、实现它。关于项目,很多推荐XXX管理系统的,我认为,此类XXX管理系统些在简历上目前大厂是一点竞争力都没有的,面试官都疲了,培训机构清一色的XXX管理系统,springboot全家桶。必须得差异化竞争。我在这里给大家推荐几个优秀的项目,后面我也会逐个实现给大家:自己实现spring ioc/aop、RPC框架、MQ框架、KV存储、分布式锁。这些项目和互联网大厂技术栈无缝结合,通过自己实现分布式组件「也就是大家平时说的造轮子」,为什么要造轮子?一方面是避免成为调包侠或CRUD工程师,另一方面是提高自己的技术深度,让自己的职业道路更宽。我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  这一系列文章我目前已经写了5篇,后面会在本陆续分享给大家,大家可以我的宫伀号【编程学习指南】追更。

  本文非常值得点赞+收藏,因为内容非常多,包含完整的代码实现,真的是手把手教你们怎么实现。要想让自己的简历让面试官眼前一亮,这些项目肯定是加分项。

  Github地址: (欢迎star)

  https://github.com/xiajunhust/tinywheel/tree/main/RPC%20framework

  分布式RPC框架,WHY?

  RPC是指远程过程调用(Remote Procedure Call)。可以使得我们在分布式环境下调用远程服务像调用本机服务一样方便。在分布式应用中使用非常广泛。

  有人会问:“有了开源的RPC框架,为什么要自己去实现?”

  RPC基本原理不难,但是在实际实现的过程中还是会遇到很多坑,涉及很多知识点:线程模型、通信协议设计、负载均衡、动态代理等。

  通过自己动手实现的方式一个简易的RPC框架,包含RPC的核心功能「麻雀虽小五脏俱全」,可以检验自己对知识的掌握情况,学会在实际中灵活运用,加深理解。当然了,生产环境中,建议大家还是用成熟的开源框架。

  RPC框架理论基础

  BRUCE JAY NELSON在其1984年的论文《Implementing Remote Procedure Calls》中描述到,当我们在程序中发起RPC调用时,会涉及5个模块:

  user:发起调用的应用模块,发起rpc调用 会和发起本地调用一样,不感知rpc底层逻辑。user-stub:负责调用请求的打包以及结果的解包。RPCRuntime:RPC运行时,负责处理远程网络交互,如网络包重传、加密等。server-stub:负责请求的解包以及结果的打包。server:真正提供服务处理的应用模块。

  这5部分的关系如下图所示:

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  主流开源RPC框架

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  (1)dubbo:阿里巴巴出品的RPC框架,经历了电商海量场景的考验,github 36.7star。java语言。

  官网:https://dubbo.apache.org/zh/

  github:https://github.com/apache/dubbo

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  (2)grpc:谷歌开源rpc框架,多种语言。github star 33.2k。

  官网:https://grpc.io/

  github:https://github.com/grpc/grpc

  (3)motan:新浪开源的rpc框架,仅java语言。

  github:https://github.com/weibocom/motan

  (4)spring cloud:Pivotal公司2014年对外开源的RPC框架,仅Java。

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  (5)brpc:百度开源的rpc框架,C++实现。

  github:https://github.com/apache/incubator-brpc

  RPC框架设计

  整体结构如下图,给大家展示了一个「麻雀虽小五脏俱全」的RPC框架,去除了管控平台等辅助功能。通过对核心功能进行设计和实现,理解整个RPC框架的设计原理。

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  涉及核心技术:

  注册中心:服务端将发布的服务注册到注册中心,调用端从注册中心订阅服务,获得服务的地址,才能发起调用。分布式环境不同服务器之间需要通过网络通信(RPC client)。网络通信必然涉及到编解码。避免每次寻址都需要调用注册中心,服务调用端还需要对服务信息进行缓存。动态代理:方便对客户端调用透明化。

  详细设计&技术实现

  01

  技术选型

  spring-boot,依赖管理,强大的配置化能力。可以方便制作RPC框架的starter,集成使用起来非常便捷。nettyzookeeperprotobuf

  02

  RPC调用流程分析

  一次RPC调用整个过程,到底发生了什么事情呢?如下通过序列图的方式展示了详细步骤:

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  03

  工程模块依赖

  代码模块分层如下:

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  util:基础工具类。model:基础领域模型。annotation:注解。提供注解功能,可以非常方便的发布RPC服务和引用RPC服务。registry:注册中心,给出了zk的实现。io:编码和解码实现。provider:服务提供者实现。consumer:服务消费者实现。

  代码包详细情况:

  04

  代码详细介绍

  采用spring-boot框架。将RPC框架实现为一个starter,方便集成使用。

  注解

  为了方便使用此RPC框架,我们通过定义注解,让使用者能直接通过一行注解进行服务的发布和引用。

  /** * RPC provider注解 */@Retention(RetentionPolicy.RUNTIME)//注解打在类上@Target(ElementType.TYPE)@Componentpublic @interface SimpleRpcProvider { Class<?> serviceInterface() default Object.class; String serviceVersion() default "1.0.0";}/** * RPC consumer * * @author summer * @version $Id: SimpleRpcProviderBean.java, v 0.1 2022年01月16日 11:53 AM summer Exp $ */@Retention(RetentionPolicy.RUNTIME)//注解打在属性上@Target(ElementType.FIELD)@Componentpublic @interface SimpleRpcConsumer { /** * 服务版本号 * @return */ String serviceVersion() default "1.0.0"; /** * 注册中心类型-默认zk * @return */ String registerType() default "zookeeper"; /** * 注册中心地址 * @return */ String registerAddress() default "127.0.0.1:2181";}

  注册中心

  常见的注册中心有很多种,比如zookepper、eureka、nacos、consul等。注册中心的原理不是本文的重点,因此不做详细描述。

我的宫伀号【编程学习指南】有各种学习资料,即可在菜单栏领取

  此处采用zookeeper的实现,有兴趣的童鞋可以自行进行其他实现,只需要实现一个子类即可。

  /** * 注册中心服务接口定义 */public interface ServiceRegistry { /** * 注册服务 * * @param serviceMetaConfig 服务元数据配置 * @throws Exception */ void register(ServiceMetaConfig serviceMetaConfig) throws Exception; /** * 取消注册服务 * * @param serviceMetaConfig 服务元数据配置 * @throws Exception */ void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception; /** * 服务发现 * * @param serviceName 服务名 * @return * @throws Exception */ ServiceMetaConfig discovery(String serviceName) throws Exception;}zk实现(采用curator):

  import com.summer.simplerpc.registry.cache.ServiceProviderCache;import com.summer.simplerpc.registry.model.ServiceMetaConfig;import com.summer.simplerpc.registry.ServiceRegistry;import com.summer.simplerpc.util.ServiceUtils;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.x.discovery.*;import org.apache.curator.x.discovery.details.JsonInstanceSerializer;import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;/** * 服务注册中心-zk实现 */public class ZkServiceRegistry implements ServiceRegistry { /** * zk base path */ private final static String ZK_BASE_PATH = "/simplerpc"; /** * serviceProvider锁 */ private final Object lock = new Object(); /** * zk framework client */ private CuratorFramework client; /** * 服务发现 */ private ServiceDiscovery<ServiceMetaConfig> serviceDiscovery; /** * serviceProvider缓存 */ private ServiceProviderCache serviceProviderCache; /** * 构造函数 * * @param address 地址 */ public ZkServiceRegistry(String address, ServiceProviderCache serviceProviderCache) throws Exception { this.client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3)); this.client.start(); this.serviceProviderCache = serviceProviderCache; JsonInstanceSerializer<ServiceMetaConfig> serializer = new JsonInstanceSerializer<>(ServiceMetaConfig.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaConfig.class) .client(client) .serializer(serializer) .basePath(ZK_BASE_PATH) .build(); serviceDiscovery.start(); } @Override public void register(ServiceMetaConfig serviceMetaConfig) throws Exception { ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder(); ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder .name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion())) .address(serviceMetaConfig.getAddress()) .port(serviceMetaConfig.getPort()) .payload(serviceMetaConfig) .uriSpec(new UriSpec("{scheme}://{address}:{port}")) .build(); serviceDiscovery.registerService(serviceInstance); } @Override public void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception { ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder(); ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder .name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion())) .address(serviceMetaConfig.getAddress()) .port(serviceMetaConfig.getPort()) .payload(serviceMetaConfig) .uriSpec(new UriSpec("{scheme}://{address}:{port}")) .build(); serviceDiscovery.unregisterService(serviceInstance); } @Override public ServiceMetaConfig discovery(String serviceName) throws Exception { //先读缓存 ServiceProvider<ServiceMetaConfig> serviceProvider = serviceProviderCache.queryCache(serviceName); //缓存miss,需要调serviceDiscovery if (serviceProvider == null) { synchronized (lock) { serviceProvider = serviceDiscovery.serviceProviderBuilder() .serviceName(serviceName) .providerStrategy(new RoundRobinStrategy<>()) .build(); serviceProvider.start(); //更新缓存 serviceProviderCache.updateCache(serviceName, serviceProvider); } } ServiceInstance<ServiceMetaConfig> serviceInstance = serviceProvider.getInstance(); return serviceInstance != null ? serviceInstance.getPayload() : null; }}核心领域模型和本地缓存:

  /** * 服务元数据配置领域模型 */@Datapublic class ServiceMetaConfig { /** * 服务名 */ private String name; /** * 服务版本 */ private String version; /** * 服务地址 */ private String address; /** * 服务端口 */ private Integer port;}/** * * @author summer * @version $Id: ServiceProviderCache.java, v 0.1 2022年01月16日 11:41 AM summer Exp $ */public interface ServiceProviderCache { /** * 查询缓存 * @param serviceName * @return */ ServiceProvider<ServiceMetaConfig> queryCache(String serviceName); /** * 更新缓存 * * @param serviceName 服务名 * @param serviceProvider 服务provider * @return */ void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider);}/** * 本地缓存实现 * * @author summer * @version $Id: ServiceProviderLocalCache.java, v 0.1 2022年01月16日 11:43 AM summer Exp $ */public class ServiceProviderLocalCache implements ServiceProviderCache { /** * 本地缓存map */ private Map<String, ServiceProvider<ServiceMetaConfig>> serviceProviderMap = new ConcurrentHashMap<>(); @Override public ServiceProvider<ServiceMetaConfig> queryCache(String serviceName) { return serviceProviderMap.get(serviceName); } @Override public void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider) { serviceProviderMap.put(serviceName, serviceProvider); }}

  服务提供方

  我前面提到过,在实际使用的时候会通过注解的方式来发布服务。那么,我们需要在bean初始化后去扫描带SimpleRpcProvider注解的bean,将服务注册到注册中心。另外,我们还需要在初始化后启动netty服务端。因此,我定义服务提供方bean实现SimpleRpcProviderBean,继承InitializingBean、BeanPostProcessor:

  在postProcessAfterInitialization方法中判断bean是否带SimpleRpcProvider注解,如果是则解析服务信息,注册到注册中心。在afterPropertiesSet方法中启动netty服务端。接收服务调用请求,通过动态代理执行实际调用import com.google.common.util.concurrent.ThreadFactoryBuilder;import com.summer.simplerpc.annotation.SimpleRpcProvider;import com.summer.simplerpc.io.RPCDecoder;import com.summer.simplerpc.io.RPCEncoder;import com.summer.simplerpc.registry.ServiceRegistry;import com.summer.simplerpc.registry.model.ServiceMetaConfig;import com.summer.simplerpc.util.ServiceUtils;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.BeansException;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.config.BeanPostProcessor;import java.util.Map;import java.util.concurrent.*;/** * rpc provider功能实现。 * * 负责扫描服务provider注解bean,注册服务到注册中心,启动netty监听。 * 提供RPC请求实际处理。 */@Slf4jpublic class SimpleRpcProviderBean implements InitializingBean, BeanPostProcessor { /** * 地址 */ private String address; /** * 服务注册中心 */ private ServiceRegistry serviceRegistry; /** * 服务提供bean的缓存map */ private Map<String, Object> providerBeanMap = new ConcurrentHashMap<>(64); /** * 处理实际rpc请求的线程池 */ private static ThreadPoolExecutor rpcThreadPoolExecutor; private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simplerpc-provider-pool-%d").build(); /** * netty相关 */ private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; /** * 构造函数 * * @param address 地址 * @param serviceRegistry 服务注册中心 */ public SimpleRpcProviderBean(String address, ServiceRegistry serviceRegistry) { this.address = address; this.serviceRegistry = serviceRegistry; } @Override public void afterPropertiesSet() throws Exception { //启动netty服务监听 new Thread(() -> { try { startNettyServer(); } catch (InterruptedException e) { log.error("startNettyServer exception,", e); } }).start(); } /** * 提交rpc处理任务 * * @param task 任务 */ public static void submit(Runnable task) { if (rpcThreadPoolExecutor == null) { synchronized (SimpleRpcProviderBean.class) { if (rpcThreadPoolExecutor == null) { rpcThreadPoolExecutor = new ThreadPoolExecutor(100, 100, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), threadFactory); } } } rpcThreadPoolExecutor.submit(task); } /** * 启动netty服务监听 * * @throws InterruptedException */ private void startNettyServer() throws InterruptedException { if (workerGroup != null && bossGroup != null) { return; } log.info("startNettyServer begin"); bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() .addLast(new LengthFieldBasedFrameDecoder(65535,0,4,0,0)) .addLast(new RPCDecoder()) .addLast(new RPCEncoder()) .addLast(new SimpleRpcProviderNettyHandler(providerBeanMap)) ; } }) .option(ChannelOption.SO_BACKLOG, 512) .childOption(ChannelOption.SO_KEEPALIVE, true); String[] array = address.split(":"); String host = array[0]; int port = Integer.parseInt(array[1]); //启动服务 ChannelFuture future = serverBootstrap.bind(host, port).sync(); log.info(String.format("startNettyServer,host=%s,port=%s", host, port)); future.channel().closeFuture().sync(); } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //获取bean上的注解 SimpleRpcProvider simpleRpcProvider = bean.getClass().getAnnotation(SimpleRpcProvider.class); if (simpleRpcProvider == null) { //无注解直接return原始的bean return bean; } //缓存保存 String serviceName = simpleRpcProvider.serviceInterface().getName(); String version = simpleRpcProvider.serviceVersion(); providerBeanMap.put(ServiceUtils.buildServiceKey(serviceName, version), bean); log.info("postProcessAfterInitialization find a simpleRpcProvider[" + serviceName + "," + version + "]"); //将服务注册到注册中心 String[] addressArray = address.split(ServiceUtils.SPLIT_CHAR); String host = addressArray[0]; String port = addressArray[1]; ServiceMetaConfig serviceMetaConfig = new ServiceMetaConfig(); serviceMetaConfig.setAddress(host); serviceMetaConfig.setName(serviceName); serviceMetaConfig.setVersion(version); serviceMetaConfig.setPort(Integer.parseInt(port)); try { serviceRegistry.register(serviceMetaConfig); log.info("register service success,serviceMetaConfig=" + serviceMetaConfig.toString()); } catch (Exception e) { log.error("register service fail,serviceMetaConfig=" + serviceMetaConfig.toString(), e); } return bean; }}netty ChannelPipeline设计:

  LengthFieldBasedFrameDecoder:解码器,解决自定义长度TCP粘包问题RPCDecoder:解码器,解析出RPC请求参数对象SimpleRpcProviderNettyHandler:实际的RPC请求处理逻辑,接收请求参数,返回RPC响应结果RPCEncoder:编码器,将RPC响应结果编码序列化,返回

  RPC核心逻辑处理handler-SimpleRpcProviderNettyHandler

  import com.summer.simplerpc.model.SimpleRpcRequest;import com.summer.simplerpc.model.SimpleRpcResponse;import com.summer.simplerpc.util.ServiceUtils;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import lombok.extern.slf4j.Slf4j;import org.springframework.cglib.reflect.FastClass;import java.util.Map;@Slf4jpublic class SimpleRpcProviderNettyHandler exts SimpleChannelInboundHandler<SimpleRpcRequest> { /** * 提供rpc服务的实例缓存map */ private Map<String, Object> handlerMap; /** * 构造函数 * * @param handlerMap */ public SimpleRpcProviderNettyHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcRequest simpleRpcRequest) throws Exception { SimpleRpcProviderBean.submit(() -> { log.debug("Receive rpc request {}", simpleRpcRequest.getBizNO()); SimpleRpcResponse simpleRpcResponse = new SimpleRpcResponse(); simpleRpcResponse.setBizNO(simpleRpcRequest.getBizNO()); try { Object result = doHandle(simpleRpcRequest); simpleRpcResponse.setData(result); } catch (Throwable throwable) { simpleRpcResponse.setMsg(throwable.toString()); log.error("handle rpc request error", throwable); } channelHandlerContext.writeAndFlush(simpleRpcResponse).addListener( (ChannelFutureListener) channelFuture -> log.info("return response for request " + simpleRpcRequest.getBizNO() + ",simpleRpcResponse=" + simpleRpcResponse)); }); } /** * 通过反射,执行实际的rpc请求 * @param simpleRpcRequest * @return */ private Object doHandle(SimpleRpcRequest simpleRpcRequest) throws Exception { String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion()); if (handlerMap == null

   handlerMap.get(key) == null) { log.error("doHandle,the provider {0} not exist,", simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion()); throw new RuntimeException("the provider not exist"); } log.info("doHandle,simpleRpcRequest=" + simpleRpcRequest.toString()); Object provider = handlerMap.get(key); //通过动态代理执行实际的调用 FastClass fastClass = FastClass.create(provider.getClass()); return fastClass.invoke(fastClass.getIndex(simpleRpcRequest.getMethodName(), simpleRpcRequest.getParamTypes()), provider, simpleRpcRequest.getParamValues()); }}前面我提到过,我实现的是一个框架,需要很方便被集成和使用,因此会实现为一个springboot的starter:

  import com.summer.simplerpc.model.RpcCommonProperty;import com.summer.simplerpc.registry.ServiceRegistry;import com.summer.simplerpc.registry.cache.ServiceProviderCache;import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;import com.summer.simplerpc.registry.zk.ZkServiceRegistry;import lombok.extern.slf4j.Slf4j;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@Slf4jpublic class SimplerRpcProviderAutoConfiguration { @Bean public SimpleRpcProviderBean initRpcProvider() throws Exception { RpcCommonProperty rpcCommonProperty = new RpcCommonProperty(); rpcCommonProperty.setServiceAddress("127.0.0.1:50001"); rpcCommonProperty.setRegistryAddress("127.0.0.1:2181"); log.info("===================SimplerRpcProviderAutoConfiguration init,rpcCommonProperty=" + rpcCommonProperty.toString()); ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache(); ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(rpcCommonProperty.getRegistryAddress(), serviceProviderCache); return new SimpleRpcProviderBean(rpcCommonProperty.getServiceAddress(), zkServiceRegistry); }}IO

  IO主要是序列化和反序列化,常见的序列化工具有很多,这里采用Hessian,对于不同序列化工具的详细比对这里不做赘述,后续单独开章节讲述。

  服务端和消费端分别会实现编码器和解码器,加入到netty的ChannelPipeline中,具体见服务端和消费端讲解。

  服务消费方

  使用此框架进行服务消费,同样是通过注解,将注解打在一个bean上,那么则完成了对一个服务的引用。可以像直接使用本地bean一样发起RPC调用。其他操作都由RPC框架来实现:

  扫描所有带SimpleRpcConsumer注解的bean重定义BeanDefinition,使用代理类重新注入spring容器发起RPC服务调用,从本地缓存或注册中心拿到远端服务详情,发起网络调用获取服务返回结果

  SimpleRpcConsumer注解

  import org.springframework.stereotype.Component;import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;/** * RPC consumer注解 */@Retention(RetentionPolicy.RUNTIME)//注解打在属性上@Target(ElementType.FIELD)@Componentpublic @interface SimpleRpcConsumer { /** * 服务版本号 * @return */ String serviceVersion() default "1.0.0"; /** * 注册中心类型-默认zk * @return */ String registerType() default "zookeeper"; /** * 注册中心地址 * @return */ String registerAddress() default "127.0.0.1:2181";}生成代理类的FactoryBean:

  import com.summer.simplerpc.registry.ServiceRegistry;import com.summer.simplerpc.registry.cache.ServiceProviderCache;import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;import com.summer.simplerpc.registry.zk.ZkServiceRegistry;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.FactoryBean;import java.lang.reflect.Proxy;/** * 生成rpc consumer代理bean的FactoryBean */@Slf4jpublic class SimpleRpcConsumerFactoryBean implements FactoryBean { /** * 调用的服务接口类 */ private Class<?> interfaceClass; /** * 服务版本号 */ private String serviceVersion; /** * 注册中心类型 */ private String registryType; /** * 注册中心地址 */ private String registryAddress; /** * 实际的bean */ private Object object; /** * init方法,通过动态代理生成bean * * @throws Exception */ public void init() throws Exception { ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache(); ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(registryAddress, serviceProviderCache); //动态代理 this.object = Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new SimpleRpcInvokeHandler<>(this.serviceVersion, zkServiceRegistry)); log.info("SimpleRpcConsumerFactoryBean getObject {}", interfaceClass.getName()); } /** * 返回创建的bean实例 * * @return * @throws Exception */ @Override public Object getObject() throws Exception { return this.object; } /** * 创建的bean实例的类型 * * @return */ @Override public Class<?> getObjectType() { return interfaceClass; } /** * 创建的bean实例的作用域 * * @return */ @Override public boolean isSingleton() { return true; } public void setInterfaceClass(Class<?> interfaceClass) { this.interfaceClass = interfaceClass; } public void setServiceVersion(String serviceVersion) { this.serviceVersion = serviceVersion; } public void setRegistryType(String registryType) { this.registryType = registryType; } public void setRegistryAddress(String registryAddress) { this.registryAddress = registryAddress; }}SimpleRpcInvokeHandler-执行实际网络调用的Handler:

  import com.summer.simplerpc.model.SimpleRpcRequest;import com.summer.simplerpc.model.SimpleRpcResponse;import com.summer.simplerpc.registry.ServiceRegistry;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.util.UUID;/** * RPC调用动态代理handler实现 */@Slf4jpublic class SimpleRpcInvokeHandler<T> implements InvocationHandler { /** * 服务版本号 */ private String serviceVersion; /** * 注册中心 */ private ServiceRegistry serviceRegistry; /** * 默认构造函数 */ public SimpleRpcInvokeHandler() { } public SimpleRpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) { this.serviceVersion = serviceVersion; this.serviceRegistry = serviceRegistry; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { SimpleRpcRequest simpleRpcRequest = new SimpleRpcRequest(); simpleRpcRequest.setBizNO(UUID.randomUUID().toString()); simpleRpcRequest.setClassName(method.getDeclaringClass().getName()); simpleRpcRequest.setServiceVersion(this.serviceVersion); simpleRpcRequest.setMethodName(method.getName()); simpleRpcRequest.setParamTypes(method.getParameterTypes()); simpleRpcRequest.setParamValues(args); log.info("begin simpleRpcRequest=" + simpleRpcRequest.toString()); SimpleRpcConsumerNettyHandler simpleRpcConsumerNettyHandler = new SimpleRpcConsumerNettyHandler(this.serviceRegistry); SimpleRpcResponse simpleRpcResponse = simpleRpcConsumerNettyHandler.sRpcRequest(simpleRpcRequest); log.info("result simpleRpcResponse=" + simpleRpcResponse); return simpleRpcResponse.getData(); }}由SimpleRpcConsumerNettyHandler发起netty网络调。

标签: 交互设计