当前位置:   article > 正文

使用Netty和ZK实现一个简单的RPC框架(服务端实现)_kzjrpc

kzjrpc

1.服务端

1.服务注册

  1. js复制代码@Configuration
  2. public class NettyServerStart {
  3. @Autowired
  4. private ServiceRegistrationScan registrationScan;
  5. @Autowired
  6. private NettyServer nettyServer;
  7. @PostConstruct
  8. public void start() {
  9. //服务注册
  10. registrationScan.scan();
  11. //服务器启动
  12. nettyServer.start();
  13. }
  14. }

@PostConstruct注解的作用是在依赖注入完成后,执行被注解的方法。

深入ServiceRegistrationScan

  1. java复制代码@Component
  2. public class ServiceRegistrationScan implements ApplicationContextAware {
  3. @Autowired
  4. private ServerZKit serverZKit;
  5. @Autowired
  6. private RpcServerConfiguration rpcServerConfiguration;
  7. //把spring容器注入进来因为会用到它去拿我们需要暴露服务的类
  8. private static ApplicationContext context;
  9. public void scan() {
  10. //创建一个根节点
  11. serverZKit.createRootNode();
  12. //拿到所有实现这个接口的类
  13. Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(MoXiuRpcServer.class);
  14. if (!beansWithAnnotation.isEmpty()) {
  15. //拿到当前服务器的IP地址
  16. String realIp = IpUtil.getRealIp();
  17. for (Object value : beansWithAnnotation.values()) {
  18. MoXiuRpcServer annotation = value.getClass().getAnnotation(MoXiuRpcServer.class);
  19. //拿到类似于这样的 com.moxiu.xxx
  20. Class aClass = annotation.interfaceName();
  21. String s = aClass.getName();
  22. //以这个接口创建一个结点
  23. serverZKit.createPersistentNode(s);
  24. String childName = s + "/" + realIp +":"+ rpcServerConfiguration.getRpcPort();
  25. //创建本机IP地址加端口
  26. serverZKit.createNode(childName);
  27. }
  28. }
  29. }
  30. @Override
  31. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  32. context = applicationContext;
  33. }
  34. public static <T> Class<T> getBean(String beanName) {
  35. return (Class<T>) context.getBean(beanName);
  36. }
  37. }

以上代码为注入spring容器拿到所有实现了@MoXiuRpcServer注解的实现类,把该类注册到ZK中。

这个接口是什么呢?

  1. java复制代码@Component
  2. @Documented
  3. @Target(ElementType.TYPE)
  4. @Retention(RetentionPolicy.RUNTIME)
  5. public @interface MoXiuRpcServer {
  6. Class interfaceName() default void.class; //实现类需要暴露的接口名字
  7. }

怎么使用呢

  1. java复制代码@MoXiuRpcServer(interfaceName = UserService.class)
  2. public class UserServiceImpl implements UserService {
  3. @Override
  4. public String getUser() {
  5. return null;
  6. }
  7. }

因为客户端需要根据接口生成代理对象,所以这里需要你指定他到底是那个接口的实现类。

2.编写Netty服务器

Netty服务器采用主从Reactor模式及主Reactor只负责建立连接,从Reactor会负责到客户端数据的读写等业务处理。

  1. java复制代码@Component
  2. public class NettyServer implements ServerStart {
  3. @Autowired
  4. private RpcServerConfiguration rpcServerConfiguration;
  5. @Override
  6. public void start() {
  7. //主Reactor 只负责连接建立所以只要1个线程就行
  8. EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boosGroup"));
  9. //从Reactor 负责连接数据读写 传0默认为当前cpu核数*2个线程
  10. EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("workerGroup"));
  11. //业务线程池 负责业务处理可以不要
  12. EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(8, new DefaultThreadFactory("businessGroup"));
  13. ServerBootstrap bootstrap = new ServerBootstrap();
  14. MxRequestHandle mxRequestHandle = new MxRequestHandle();
  15. bootstrap.group(bossGroup, workerGroup)
  16. .channel(NioServerSocketChannel.class)
  17. .option(ChannelOption.SO_BACKLOG, 1024)
  18. .option(ChannelOption.TCP_NODELAY, true)
  19. .option(ChannelOption.SO_KEEPALIVE, true)
  20. .childHandler(new ChannelInitializer<SocketChannel>() {
  21. @Override
  22. protected void initChannel(SocketChannel nioSocketChannel) throws Exception {
  23. ChannelPipeline pipeline = nioSocketChannel.pipeline();
  24. //编码
  25. pipeline.addLast("OneEncoding", new LengthFieldPrepender(4));
  26. pipeline.addLast("MxResponseEncode", new MxRpcResponseEncode());
  27. //解码
  28. pipeline.addLast("OneDecode", new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
  29. pipeline.addLast("MxRequestDecode", new MxRpcRequestDecode());
  30. pipeline.addLast(eventExecutors, "MxRpcRequest", mxRequestHandle);
  31. }
  32. });
  33. try {
  34. ChannelFuture sync = bootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
  35. sync.channel().closeFuture().sync();
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. } finally {
  39. bossGroup.shutdownGracefully();
  40. workerGroup.shutdownGracefully();
  41. eventExecutors.shutdownGracefully();
  42. }
  43. }
  44. }

这里详解各个编码器和解码器

  1. OneEncoding 为二次编码器,new LengthFieldPrepender(4)为发送的每一条数据前面添加一个固定4个字节表示为数据长度,解决粘包,黏包问题,
  2. MxResponseEncode为一次编码器,把MxResponse转化为ByteBuf交给一次编码器进行封装。
  3. OneDecode为一次解码器,把封装为自动长度4个字节的数据后面的数据读出来交给MxRequestDecode二次解码器。
  4. MxRequestDecode为二次解码器,把ByteBuf对象转化为MxRequest对象然后交给MxRpcRequest业务处理器处理。

这里给出一个流程图

数据写出就不画了很懒

MxRquest对象 客户端请求数据 服务端拿到数据后可以根据该信息去查找对应的方法然后调用封装给客户端

  1. java复制代码@Data
  2. @Builder
  3. public class MxRequest {
  4. //请求id
  5. private String requestId;
  6. //请求的类
  7. private String className;
  8. //请求的方法名
  9. private String methodName;
  10. //参数类型
  11. private Class<?>[] parameterTypes;
  12. //参数
  13. private Object[] parameters;
  14. }

MxReponse对象 响应对象

  1. java复制代码@Data
  2. @Builder
  3. public class MxResponse {
  4. private String requestId;
  5. private Object result;
  6. }

MxRpcResponseEncode 把MxResponse转化为ByteBuf

  1. java复制代码public class MxRpcResponseEncode extends MessageToByteEncoder<MxResponse> {
  2. @Override
  3. protected void encode(ChannelHandlerContext channelHandlerContext, MxResponse mxResponse, ByteBuf byteBuf) throws Exception {
  4. byteBuf.writeBytes(ProtostuffUtils.serialize(mxResponse));
  5. }
  6. }

MxRpcRequestDecode 把客户端发过来的ByteBuf转化为MxRequest对象给业务处理器去处理

  1. java复制代码public class MxRpcRequestDecode extends ByteToMessageDecoder {
  2. @Override
  3. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
  4. byte[] bytes = new byte[byteBuf.readableBytes()];
  5. byteBuf.readBytes(bytes);
  6. MxRequest deserialize = ProtostuffUtils.deserialize(bytes, MxRequest.class);
  7. list.add(deserialize);
  8. }
  9. }

序列化采用的是Protostuff速度快也不用去用Proto生成的对象,不好用我就用封装的了。

业务处理handler

  1. java复制代码@ChannelHandler.Sharable
  2. public class MxRequestHandle extends SimpleChannelInboundHandler<MxRequest> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext channelHandlerContext, MxRequest mxRequest) throws Exception {
  5. String requestId = mxRequest.getRequestId();
  6. String className = mxRequest.getClassName();
  7. Object[] parameters = mxRequest.getParameters();
  8. Class<?>[] parameterTypes = mxRequest.getParameterTypes();
  9. String methodName = mxRequest.getMethodName();
  10. //通过类名找到这个类
  11. Class bean = ServiceRegistrationScan.getBean(className);
  12. //拿到调用的方法
  13. Method method = bean.getMethod(methodName, parameterTypes);
  14. //调用方法
  15. Object invoke = method.invoke(bean, parameters);
  16. //封装返回值
  17. MxResponse build = MxResponse.builder().requestId(requestId).result(invoke).build();
  18. //写出
  19. channelHandlerContext.channel().writeAndFlush(build);
  20. }
  21. }

3.测试服务注册功能

可见该类已经注册到ZK中。 后续可带来客户端调试,大家也发现了这和dubbo的很像大概一致,后续带来dubbo源码解析。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/854830
推荐阅读
相关标签
  

闽ICP备14008679号