赞
踩
- js复制代码@Configuration
- public class NettyServerStart {
- @Autowired
- private ServiceRegistrationScan registrationScan;
-
- @Autowired
- private NettyServer nettyServer;
-
- @PostConstruct
- public void start() {
- //服务注册
- registrationScan.scan();
- //服务器启动
- nettyServer.start();
- }
- }

@PostConstruct注解的作用是在依赖注入完成后,执行被注解的方法。
深入ServiceRegistrationScan
- java复制代码@Component
- public class ServiceRegistrationScan implements ApplicationContextAware {
- @Autowired
- private ServerZKit serverZKit;
-
- @Autowired
- private RpcServerConfiguration rpcServerConfiguration;
-
- //把spring容器注入进来因为会用到它去拿我们需要暴露服务的类
- private static ApplicationContext context;
-
- public void scan() {
- //创建一个根节点
- serverZKit.createRootNode();
- //拿到所有实现这个接口的类
- Map<String, Object> beansWithAnnotation = context.getBeansWithAnnotation(MoXiuRpcServer.class);
- if (!beansWithAnnotation.isEmpty()) {
- //拿到当前服务器的IP地址
- String realIp = IpUtil.getRealIp();
- for (Object value : beansWithAnnotation.values()) {
- MoXiuRpcServer annotation = value.getClass().getAnnotation(MoXiuRpcServer.class);
- //拿到类似于这样的 com.moxiu.xxx
- Class aClass = annotation.interfaceName();
- String s = aClass.getName();
- //以这个接口创建一个结点
- serverZKit.createPersistentNode(s);
- String childName = s + "/" + realIp +":"+ rpcServerConfiguration.getRpcPort();
- //创建本机IP地址加端口
- serverZKit.createNode(childName);
- }
- }
- }
-
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- context = applicationContext;
- }
-
- public static <T> Class<T> getBean(String beanName) {
- return (Class<T>) context.getBean(beanName);
- }
-
- }

以上代码为注入spring容器拿到所有实现了@MoXiuRpcServer注解的实现类,把该类注册到ZK中。
这个接口是什么呢?
- java复制代码@Component
- @Documented
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- public @interface MoXiuRpcServer {
- Class interfaceName() default void.class; //实现类需要暴露的接口名字
- }
怎么使用呢
- java复制代码@MoXiuRpcServer(interfaceName = UserService.class)
- public class UserServiceImpl implements UserService {
-
- @Override
- public String getUser() {
- return null;
- }
- }
因为客户端需要根据接口生成代理对象,所以这里需要你指定他到底是那个接口的实现类。
Netty服务器采用主从Reactor模式及主Reactor只负责建立连接,从Reactor会负责到客户端数据的读写等业务处理。
- java复制代码@Component
- public class NettyServer implements ServerStart {
-
- @Autowired
- private RpcServerConfiguration rpcServerConfiguration;
-
- @Override
- public void start() {
- //主Reactor 只负责连接建立所以只要1个线程就行
- EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boosGroup"));
- //从Reactor 负责连接数据读写 传0默认为当前cpu核数*2个线程
- EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("workerGroup"));
- //业务线程池 负责业务处理可以不要
- EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(8, new DefaultThreadFactory("businessGroup"));
- ServerBootstrap bootstrap = new ServerBootstrap();
- MxRequestHandle mxRequestHandle = new MxRequestHandle();
- bootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel nioSocketChannel) throws Exception {
- ChannelPipeline pipeline = nioSocketChannel.pipeline();
- //编码
- pipeline.addLast("OneEncoding", new LengthFieldPrepender(4));
- pipeline.addLast("MxResponseEncode", new MxRpcResponseEncode());
- //解码
- pipeline.addLast("OneDecode", new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
- pipeline.addLast("MxRequestDecode", new MxRpcRequestDecode());
- pipeline.addLast(eventExecutors, "MxRpcRequest", mxRequestHandle);
- }
- });
-
- try {
- ChannelFuture sync = bootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
- sync.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- eventExecutors.shutdownGracefully();
- }
- }
- }

这里详解各个编码器和解码器
这里给出一个流程图
数据写出就不画了很懒
MxRquest对象 客户端请求数据 服务端拿到数据后可以根据该信息去查找对应的方法然后调用封装给客户端
- java复制代码@Data
- @Builder
- public class MxRequest {
- //请求id
- private String requestId;
- //请求的类
- private String className;
- //请求的方法名
- private String methodName;
- //参数类型
- private Class<?>[] parameterTypes;
- //参数
- private Object[] parameters;
-
- }
MxReponse对象 响应对象
- java复制代码@Data
- @Builder
- public class MxResponse {
- private String requestId;
- private Object result;
- }
MxRpcResponseEncode 把MxResponse转化为ByteBuf
- java复制代码public class MxRpcResponseEncode extends MessageToByteEncoder<MxResponse> {
-
- @Override
- protected void encode(ChannelHandlerContext channelHandlerContext, MxResponse mxResponse, ByteBuf byteBuf) throws Exception {
- byteBuf.writeBytes(ProtostuffUtils.serialize(mxResponse));
- }
- }
MxRpcRequestDecode 把客户端发过来的ByteBuf转化为MxRequest对象给业务处理器去处理
- java复制代码public class MxRpcRequestDecode extends ByteToMessageDecoder {
-
-
- @Override
- protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
- byte[] bytes = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(bytes);
- MxRequest deserialize = ProtostuffUtils.deserialize(bytes, MxRequest.class);
- list.add(deserialize);
- }
- }
序列化采用的是Protostuff速度快也不用去用Proto生成的对象,不好用我就用封装的了。
业务处理handler
- java复制代码@ChannelHandler.Sharable
- public class MxRequestHandle extends SimpleChannelInboundHandler<MxRequest> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, MxRequest mxRequest) throws Exception {
- String requestId = mxRequest.getRequestId();
- String className = mxRequest.getClassName();
- Object[] parameters = mxRequest.getParameters();
- Class<?>[] parameterTypes = mxRequest.getParameterTypes();
- String methodName = mxRequest.getMethodName();
- //通过类名找到这个类
- Class bean = ServiceRegistrationScan.getBean(className);
- //拿到调用的方法
- Method method = bean.getMethod(methodName, parameterTypes);
- //调用方法
- Object invoke = method.invoke(bean, parameters);
- //封装返回值
- MxResponse build = MxResponse.builder().requestId(requestId).result(invoke).build();
- //写出
- channelHandlerContext.channel().writeAndFlush(build);
- }
- }

可见该类已经注册到ZK中。 后续可带来客户端调试,大家也发现了这和dubbo的很像大概一致,后续带来dubbo源码解析。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。