赞
踩
本篇一句话总结:Java实现分布式通信,可以基于Java API、开源框架和远程通信技术三种方式实现。
正文开始:
通过上一篇文章《分布式专题(1)- 计算机网络》我们知道了计算机之间之所以能够进行通信的原理。如果对计算机网络还不是很清楚的同学可以点过去看一下。不过小兵写上一篇时写得比较细,得要花上一点时间才能看完,如果已经知道大概内容了的小伙伴,小兵这里推荐另一篇博客用于快速复习。传送门:《计算机之间是如何进行通信的?》
咱们中国武学讲究内功心法和招式变术。招式变术是千变万化的,而内功心法则稳定而绵延不绝。内功心法的深度决定了可以学习的招式变术的上限高度。学习编程亦如此道:具体的技术是招式变术,而计算机原理和操作系统就是内功心法。习得内功心法,才能更好地掌握各种高阶招式。看我们的虚竹同学,一开始时是什么招术都不会的,就因为有无崖子七十年的内力,所以在天山童姥稍微指点下,就能把追杀他的人全部反杀,一跃成为江湖顶尖高手之一,可见内功的重要性。我们做开发的知道,技术更新的频率是如此之快,上半年用的东西可能下半年就有新的技术取代了,我们永远不可能一直走在技术的最前沿。但如果我们的内功足够深厚,只要稍加观摩,那基本上也能把它的原理猜得七七八八,掌握它自然就水到渠成了。
好了,题外话不多说,正文开始。我们知道,所谓分布式,无非就是“将一个系统拆分成多个子系统并散布到不同设备”的过程而已。在微服务的大潮之中, 我们把系统拆分成了多个服务,根据需要部署在多个机器上,这些服务非常灵活,可以随着访问量弹性扩展。本质上而言,实现一个分布式系统,最核心的部分无非有两点:
分布式系统并非灵丹妙药,解决问题的关键还是看你对问题本身的了解。通常我们需要使用分布式的常见理由是:
本篇要讲的是分布式应用中解决“如何连接”的问题,即Java是如何实现系统间的通信的。先上一张总图:
上图中,我们看到图片左边的【网络通信】,是由协议和网络IO组成。协议如TCP/IP等在上一篇文章中已经介绍过,多出的Multicast(组播)此处也不再延伸介绍,有需要的同学另外自行了解即可。上一篇文章在介绍传输层的TCP协议时,已经提到了“TCP提供全双工通信,会话双方都可以同时接收和发送数据。都设有接收缓存和发送缓存,用来临时存放双向通信的数据”。发送缓存也就是写缓存,接收缓存也就是读缓存。在客户端与服务器经过三次握手建立连接后,在二者之间就相当于打开了一条可以互相传送数据的道路,道路的两端就是各自的读写缓存和我们所说的套接字Socket,每一个socket都有一个输出流和一个输入流。这种跨越网络的数据IO流,就是我们说的网络IO。然后可以看到网络IO还分为了BIO、NIO和AIO,这个我们可以先不管,后面我会再细说。所以TCP连接差不多就是下图这个样子。
在了解了Socket和网络IO的含义之后,我们看回第一张图的右边,可以看到Java实现系统间的通信方式有基于Java API、基于开源框架、基于远程通信技术等。下面,我们用Java代码来一起实现一下这几种方式。
Socket:socket本身并不是协议,它是应用层与TCP/IP协议族通信的中间软件抽象层,是一组调用接口(TCP/IP网络的API函数)。可以看做是对TCP/IP协议的封装,它把复杂的TCP/IP协议族隐藏在Socket接口后面,它的出现只是使得程序员更方便地使用TCP/IP协议栈而已。
java.net 包中的 API 包含有网络编程相关的类和接口。java.net 包中能够找到对TCP协议、UDP协议、Multicast协议的支持。我们仍以基于TCP协议的网络编程为例。
在编程开始前,我们再次简单回顾一下计算机网络中的传输层和TCP协议。
(在看API的具体实现之前,思考一个有意思的问题:如果是交给你去实现客户端与服务器的通信,你会设计多少个对象?如何设计它们的关系?如何做到面向对象设计?多看,多想,多换位思考,如果是你的话,你怎么处理,这是对提高自己水平很有裨益的事,无论是做人还是做事。)
官方文档提到:以下步骤在两台计算机之间使用套接字建立TCP连接时会出现:
- 服务器实例化一个 ServerSocket 对象,表示通过服务器上的端口通信。
- 服务器调用 ServerSocket 类的 accept() 方法,该方法将一直等待,直到客户端连接到服务器上给定的端口。
- 服务器正在等待时,一个客户端实例化一个 Socket 对象,指定服务器名称和端口号来请求连接。
- Socket 类的构造函数试图将客户端连接到指定的服务器和端口号。如果通信被建立,则在客户端创建一个 Socket 对象能够与服务器进行通信。
- 在服务器端,accept() 方法返回服务器上一个新的 socket 引用,该 socket 连接到客户端的 socket。
- 连接建立后,通过使用 I/O 流在进行通信,每一个socket都有一个输出流和一个输入流,客户端的输出流连接到服务器端的输入流,而客户端的输入流连接到服务器端的输出流。
上述流程有空就多看几遍,我们后面讲的所有通信都是基于上述流程。各种技术和框架不过是对这些流程不断封装、抽象、扩展而已,但是主流程仍是不变的。
现在,打开我们的IDEA或者Eclipse,按照API中的实现步骤,一起来实现下面的小目标。
(终于要回到我们熟悉的代码部分了,Code Time Begin !)
小目标:对基于Java API的网络编程有初步的了解。具体需求如下:
1)从客户端把“Hello, I am xxx. Here is Client.”这条消息传送给服务端;
2)从服务端读取该消息,并给客户端返回响应消息:“Hello, xxx, nice to meet you! Here is Server.”
我们可以按照以下步骤实现上述需求:
第一步:建项目
我们先新建一个项目distributed,再建一个名为mysocket的包。为了以后方便添加Jar包,我们建的是maven项目。
第二步:建服务端类
然后建一个服务端类:HelloServer,代码如下:
- package socket;
-
- import java.io.*;
- import java.net.*;
-
- public class HelloServer {
- // 选择一个端口作为服务端端口
- private static int port = 8888;
-
- public static void main(String[] args) throws Exception {
- // 创建ServerSocket对象,相当于在服务端(本机)打开一个监听
- ServerSocket serverSocket = new ServerSocket(port);
- System.out.println("开始监听端口:" + port + "...");
- // accept方法阻塞等待客户端连接,连接成功后得到socket对象
- Socket socket = serverSocket.accept();
- // 获取服务端socket的输入流,客户端通过这个输入流给服务端传递消息
- DataInputStream in = new DataInputStream(socket.getInputStream());
- // 通过服务端socket的输入流,输出客户端发送过来的消息
- System.out.println("客户端消息:"+in.readUTF());
- // 获取服务端socket的输出流,服务端端通过这个输出流给客户端传递消息
- DataOutputStream out = new DataOutputStream(socket.getOutputStream());
- // 通过服务端socket的输出流,给客户端发送消息
- out.writeUTF("Hello,jvxb, nice to meet you!Here is Server。");
- // 关闭服务端socket。
- socket.close();
- // 关闭监听
- serverSocket.close();
- }
-
- }

第三步:建客户端类
然后建一个客户端类:HelloClient,代码如下:
- package socket;
-
- import java.io.*;
- import java.net.*;
-
- public class HelloClient {
-
- // 需连接的服务端IP或域名,此例中本机即为服务端。一般都是通过配置文件来设置。
- private static String serverName = "127.0.0.1";
- // 需连接的服务端端口
- private static int port = 8888;
-
- public static void main(String[] args) throws Exception {
- // 通过指定服务端IP、服务端端口,连接到服务端,连接成功后获得客户端socket.
- Socket clientSocket = new Socket(serverName, port);
- // 通过客户端socket,获得客户端输出流。
- DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
- // 通过客户端输出流,向服务端发送消息。
- out.writeUTF("Hello,I am jvxb! Here is Client.");
- // 通过客户端输出流,读取服务端发送过来的消息。
- DataInputStream in = new DataInputStream(clientSocket.getInputStream());
- // 输出服务端发送过来的消息
- System.out.println("服务器响应: " + in.readUTF());
- // 关闭客户端socket
- clientSocket.close();
- }
- }

第四步:测试
1)运行服务端类
2)运行客户端类
3)查看输出结果
可以看到结果如下:

通过以上的例子我们可以看到,只需要简单的几行Java代码,通过Java API我们就能够实现基于TCP协议的客户端/服务端通信。同理,通过DatagramSocket对象也能很快速地实现基于UDP协议的客户端/服务端通信,此处不再展开。当然,我们上面举的例子只是最基础的。一般来说服务端不会只与一个客户端连接,服务端需要监听多个客户端连接的话,就得让accept()方法在while中持续循环,所以服务端的代码一般都是配合多线程来使用,传统做法是一个客户端连接过来就开一个线程去单独处理,这种处理是比较简单容易实现,但很明显客户端连接一多,性能方面就跟不上了,因为光是线程的切换开销就挺大的,更不用说每个线程都会占用挺大的资源。那要怎么解决性能的问题呢?功力深厚者,可以自己去设计出自己的一套东西去解决,像小兵我这种水平未到家的,我觉得用人家东西也挺不错的。。比如我们可以直接使用Netty框架。
思考:如何利用socket传输对象?
可以看到有两个比较著名的网络通讯框架Mina和Netty。这两个开源框架都能提供高性能IO,两个框架也都是韩国人Trustin Lee的大作,Netty比Mina新,且性能更好,Mina据说已经停止维护了,所以咱们还是重点关注Netty。Netty是目前最流行的Java开源NIO框架,连咱们淘宝的Dubbo框架都用到了Netty,能抗得住亿万级流量的冲击,证明这个框架底色还是很足的,所以我们还是有必要学习一下Netty。看看同样是基于TCP协议、网络IO,相比核心 Java API它怎么就有着更好的吞吐量、较低的延时,资源消耗居然还更少,为什么Netty同学就能做到如此优秀?
在学习Netty之前,我们首先要了解几个概念,即BIO、NIO、AIO,及序列化。
同步:如果有多个任务或者事件要发生,这些任务或者事件必须逐个地进行,一个事件或者任务的执行会导致整个流程的暂时等待,这些事件没有办法并发地执行;
异步:如果有多个任务或者事件发生,这些事件可以并发地执行,一个事件或者任务的执行不会导致整个流程的暂时等待。
举个简单的例子,假如有一个任务包括两个子任务A和B,对于同步来说,当A在执行的过程中,B只有等待,直至A执行完毕,B才能执行;而对于异步就是A和B可以并发地执行,B不必等待A执行完毕之后再执行,这样就不会由于A的执行导致整个任务的暂时等待。
阻塞:当某个事件或者任务在执行过程中,它发出一个请求操作,但是由于该请求操作需要的条件不满足,那么就会一直在那等待,直至条件满足;
非阻塞:当某个事件或者任务在执行过程中,它发出一个请求操作,如果该请求操作需要的条件不满足,会立即返回一个标志信息告知条件不满足,不会一直在那等待。
这就是阻塞和非阻塞的区别。也就是说阻塞和非阻塞的区别关键在于当发出请求一个操作时,如果条件不满足,是会一直等待还是返回一个标志信息。举个简单的例子:假如我要读取一个文件中的内容,如果此时文件中没有内容可读,对于阻塞来说就是会一直在那等待,直至文件中有内容可读;而对于非阻塞来说,就会直接返回一个标志信息告知文件中暂时无内容可读。
同步和异步着重点在于多个任务的执行过程中,一个任务的执行是否会导致整个流程的暂时等待;
而阻塞和非阻塞着重点在于发出一个请求操作时,如果进行操作的条件不满足是否会返会一个标志信息告知条件不满足。
服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,处理完成后返回应答给客户端,也就是经典的请求-应答通信模型。但是随着客户端并发量上升,服务端的线程数膨胀,系统性能急剧下降,最终会导致系统不可用。这种模型无法满足高并发,高性能的场景。

既然BIO无法适应高并发情景,那么我们自然要想出一种方法来解决这种问题,那么NIO就出现了。
NIO中使用多路复用器Selector负责管理与客户端建立的多个连接,并监听注册到上面的一些事件,如有新连接接入、当前连接上有可读消息或可写消息。一旦事件被其监听到,就会调用对应的事件处理器来完成对事件的响应。

可以看出:NIO = Selector + Buffer + Channel
NIO的细节就不多讲了,这里只介绍下三件套:
总结起来就是:Selector线程就类似一个管理者(Master),管理了成千上万个管道,然后轮询哪个管道的数据已经准备好,通知CPU执行IO的读取或写入操作。
终于把NIO的相关概念讲完了!感觉再讲下去小兵都要变成老兵了 - -、
根据NIO的相关概念,可得编码实现NIO通信时将有如下步骤:
- 打开ServerSocketChannel,监听客户端连接
- 绑定监听端口,设置连接为非阻塞模式
- 创建Reactor线程,创建多路复用器并启动线程
- 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
- Selector轮询准备就绪的key
- Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
- 设置客户端链路为非阻塞模式
- 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
- 读取客户端消息到缓冲区,获取消息
- 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
- 读取服务端消息到缓冲区,获取消息
现在,打开我们的IDEA或者Eclipse,一起来实现下面的小目标。
(终于要回到我们熟悉的代码部分了,Code Time Begin !)
小目标:对基于NIO的网络编程有初步的了解。具体需求如下:
1)从客户端把“Hello, I am xxx. Here is Nio Client.”这条消息传送给服务端;
2)从服务端读取该消息,并给客户端返回响应消息:“Hello, xxx, nice to meet you! Here is Nio Server.”
我们可以按照以下步骤实现上述需求:
第一步:建项目
我们仍旧使用之前建的项目distributed,新建一个名为mynio的包。为了以后方便添加Jar包,我们的项目是maven项目。
第二步:建服务端类
然后建一个服务端类:HelloNioServer,代码如下:
- package mynio;
-
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
-
- public class HelloNioServer {
- // 选择一个端口作为服务端端口
- private static int port = 8888;
-
- public static void main(String[] args) throws Exception {
-
- //1.打开ServerSocketChannel,用于监听客户端连接通道,
- ServerSocketChannel ssChannel = ServerSocketChannel.open();
- //2.连接通道为非阻塞模式
- ssChannel.configureBlocking(false);
- //3.绑定监听端口
- ssChannel.socket().bind(new InetSocketAddress(port));
- //4.获取选择器,即创建多路复用器
- Selector selector = Selector.open();
- //5.将通道注册到选择器上,并指定监听事件。监控是接收状态,可以监听多个状态
- ssChannel.register(selector, SelectionKey.OP_ACCEPT);
- //6.轮询获取选择器上已经准备就绪的事件,
- while (true) {
- //这是一个阻塞方法,基于内核实现,会一直等待直到有数据可读,返回值是key的数量(可以有多个)
- System.out.println("服务端Selector等待注册事件发生...");
- selector.select();
-
- //获取当前选择器中所有注册的选择键
- Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
- while (iterator.hasNext()) {
- //获取准备就绪的事件
- SelectionKey key = iterator.next();
- // 删除已选的key,以防重复处理
- iterator.remove();
-
- //判断具体是什么事件,来进行相应处理
- if (key.isAcceptable()) {
- System.out.println("【服务端Acceptable事件发生:与客户端完成连接。】");
- //若接收就绪,则获取客户端连接
- SocketChannel channel = ssChannel.accept();
- //切换非阻塞模式
- channel.configureBlocking(false);
- //将该通道的读事件注册到选择器上
- channel.register(selector, SelectionKey.OP_READ);
- }
- if (key.isReadable()) {
- System.out.println("【服务端Readable事件发生:接收到客户端数据。】");
- //获取当前选择器上读就绪状态的通道
- SocketChannel channel = (SocketChannel) key.channel();
- //通过通道和缓冲区,获取客户端发送过来的数据
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- channel.read(buffer);
- String msg = new String(buffer.array()).trim();
- System.out.println("服务器收到客户端消息:" + msg);
- //通过通道和缓冲区,向客户端返回数据
- String sendMsg = "Hello, jvxb, nice to meet you! Here is Nio Server.";
- channel.write(ByteBuffer.wrap(sendMsg.getBytes()));
- }
-
- }
-
- }
- }
- }

第三步:建客户端类
然后建一个客户端类:HelloNioClient,代码如下:
- package mynio;
-
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
-
- public class HelloNioClient {
- // 需连接的服务端IP或域名,此例中服务器即为本机。
- private static String serverName = "127.0.0.1";
- // 需连接的服务端端口
- private static int port = 8888;
-
- public static void main(String[] args) throws Exception {
-
- //1.打开SocketChannel,用于与服务器建立连接通道
- SocketChannel schannel = SocketChannel.open();
- //2.连接通道设置为非阻塞模式
- schannel.configureBlocking(false);
- //3.根据目标服务器地址和端口与目标服务器建立连接。
- schannel.connect(new InetSocketAddress(serverName, port));
- //4.获取选择器,即创建客户端的多路复用器
- Selector selector = Selector.open();
- //5.将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
- schannel.register(selector, SelectionKey.OP_CONNECT);
- //6.轮询获取选择器上已经准备就绪的事件,
- while (true) {
- //这是一个阻塞方法,基于内核实现,会一直等待直到有数据可读,返回值是key的数量(可以有多个)
- System.out.println("客户端Selector等待注册事件发生...");
- selector.select();
-
- //获取当前选择器中所有注册的选择键
- Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
- while (iterator.hasNext()) {
- //获取准备就绪的事件
- SelectionKey key = iterator.next();
- // 删除已选的key,以防重复处理
- iterator.remove();
-
- //判断具体是什么事件,来进行相应处理
- if (key.isConnectable()) {
- SocketChannel channel = (SocketChannel) key.channel();
- // 如果正在连接,则完成连接
- if (channel.isConnectionPending()) {
- channel.finishConnect();
- }
- System.out.println("【客户端Connectable事件发生:与服务端完成连接。】");
- // 设置成非阻塞
- channel.configureBlocking(false);
-
- //连接上服务端时给服务端发送信息
- String sendMsg = "Hello, I am jvxb. Here is Nio Client.";
- channel.write(ByteBuffer.wrap(sendMsg.getBytes()));
- //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
- channel.register(selector, SelectionKey.OP_READ);
-
- // 获得了可读的事件
- } else if (key.isReadable()) {
- System.out.println("【客户端Readable事件发生:接收到服务端数据。】");
- //获取当前选择器上读就绪状态的通道
- SocketChannel channel = (SocketChannel) key.channel();
- //通过通道和缓冲区,获取服务端返回过来的数据
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- channel.read(buffer);
- String msg = new String(buffer.array()).trim();
- System.out.println("客户端收到服务器消息:" + msg);
- }
-
- }
-
- }
- }
-
- }

第四步:测试
1)运行服务端类
2)运行客户端类
3)查看输出结果
可以看到结果如下:

什么是序列化?
我们知道,Java是面向对象的语言,对象中会有各种属性,在一次程序运行中,我们很容易通过getXxx()方法获取到这些属性的值。但是如果不做处理,当对象被垃圾回收销毁或下一次程序运行时我们无法再次获取到它的对象状态,那如何保存这些对象状态呢?方法很多,比如将值保存到数据库之类的,即使用时再从数据库中读取并set值给对象。Java给我们提供的一种比较好的保存对象状态的机制,那就是序列化。所以简单说序列化就是为了保存在内存中的各种对象的状态,并且可以把保存的对象状态再读出来。对象序列化是一个用于将对象状态转换为字节流的过程,可以将其保存到磁盘文件中或通过网络发送到任何其他程序;从字节流创建对象的相反的过程称为反序列化。
什么情况下需要序列化?
如何实现序列化?
对象实现Java提供的Serializeable接口,对象即可以进行序列化。下面我们直接看在Java代码中是怎么实现的。
1)我们新建一个User对象,有username、password两个属性,对外提供set/get方法和重写toString()方法。
- package myserialize;
-
- import java.io.Serializable;
-
- public class User implements Serializable {
- private String username;
- private String password;
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- @Override
- public String toString() {
- return "User[username : " + username + ", password : " + password + "]";
- }
- }

2)将User对象序列化和反序列化
- package myserialize;
-
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
-
- public class MySerializeTest {
- public static void main(String[] args) throws Exception {
- //序列化测试,保存对象到磁盘
- serializeTest();
- //反序列化测试,从磁盘读取对象
- deSerializeTest();
- }
-
- //序列化测试
- private static void serializeTest() throws Exception {
- //初始化对象
- User user = new User();
- user.setUsername("jvxb");
- user.setPassword("123456");
- //保存对象到磁盘。此处我们保留到桌面,方便查看和删除
- FileOutputStream fs = new FileOutputStream("C:\\Users\\ASUS\\Desktop\\user.ser");
- ObjectOutputStream os = new ObjectOutputStream(fs);
- os.writeObject(user);
- os.close();
- System.out.println("对象序列化成功:" + user.toString());
- }
-
- //反序列化
- private static void deSerializeTest() throws Exception {
- //从磁盘读取对象。之前我们保留到了桌面。
- FileInputStream fis = new FileInputStream("C:\\Users\\ASUS\\Desktop\\user.ser");
- ObjectInputStream os = new ObjectInputStream(fis);
- User user = (User) os.readObject();
- os.close();
- System.out.println("对象反序列化成功:" + user.toString());
- }
-
- }

3)测试并查看结果
可以看到在桌面我们看到了一个user.ser文件,它是一个字节流文件,有一些乱码,里面的内容如下:

然后在我们的控制台,输出如下,表示序列化和反序列化成功:

练习题:利用socket编程,传输对象。
常用的序列化方式有什么?
Java提供原生的序列化方式存在耗时较长、文件过大导致传输效率比较低,且不能跨语言对接的缺点。为了实现跨语言对接,Java对象常用的跨语言序列化反序列化主要有三种:一是xml形式;二是json形式;三是二进制字节流形式。Json形式主要有JackJson(Spring默认),FastJson(阿里开发),Gson(谷歌)等技术,二进制字节流形式有Hessian(caucho公司开发)ProtoBuf(谷歌开发,Netty使用)等。本篇介绍序列化主要是对象在网络上传输的情景,比如通过套接字socket传输对象,所以此处介绍的是以Json的形式序列化。如FastJson中:
通过String objJson = JSON.toJSONString(Object object); 可以实现序列化,
通过Object object = JSON.parse(objJson); 可以实现反序列化。
序列化的变迁过程:由于Java原生的序列化不能跨语言对接,以至于在后来的很长一段时间,基于XML格式编码的对象序列化机制成为了主流,一方面解决了多语言兼容问题,另一方面比二进制的序列化方式更容易理解。以至于基于XML的SOAP协议及对应的WebService框架在很长一段时间内成为各个主流开发语言的必备的技术。
再到后来,基于JSON的简单文本格式编码的HTTP REST接口又基本上取代了复杂的Web Service接口,成为分布式架构中远程通信的首要选择。但是JSON序列化存储占用的空间大、性能低等问题,同时移动客户端应用需要更高效的传输数据来提升用户体验。在这种情况下与语言无关并且高效的二进制编码协议就成为了大家追求的热点技术之一。首先诞生的一个开源的二进制序列化框架-MessagePack。它比google的Protocol Buffers出现得还要早。
关于序列化的总结?
1、在java中,只要一个类实现了java.io.Serializable接口,那么它就可以被序列化
2、通过ObjectOutputStream和ObjectInputStream将对象进行序列化和反序列化
3、对象是否允许被反序列化,不仅仅是取决于对象的代码是否一致,同时还有一个重要的因素(serialVersionUID)
4、要将从父类继承的属性序列化,那么父类也必须实现Serializable接口
5、静态变量不会被序列化
6、transient关键字,表示变量将不被序列化处理,如用 transient 修饰密码属性,在反序列化后该属性值为初始值null。
7、通过序列化操作可实现深度克隆
终于说到Netty了,我们先看一下百度百科上对Netty的描述:
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
据说Netty的源码写得相当棒,但本文不打算深入Netty的源码去解读,这里只讲一下Netty的基本使用,需要深入探究的童鞋参考其它资料资料即可。下面我们一起来用Netty来实现下面的小目标。
(终于要回到我们熟悉的代码部分了,Code Time Begin !)
小目标:对基于Netty的网络编程有初步的了解。具体需求如下:
1)从客户端把“Hello, I am xxx. Here is Client.”这条消息传送给服务端;
2)从服务端读取该消息,并给客户端返回响应消息:“Hello, xxx, nice to meet you! Here is Server.”
我们可以按照以下步骤实现上述需求:
第一步:建项目
我们仍旧使用之前建的项目distributed,新建一个名为mynetty的包。maven中引入netty的依赖(此处使用netty4.1.6)。
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.6.Final</version>
- </dependency>
第二步:建服务端类
然后建一个服务端类:HelloNettyServer,代码如下:
- package mynetty;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.util.CharsetUtil;
-
- public class HelloNettyServer {
- // 选择一个端口作为服务端端口
- private static int port = 8888;
-
- public static void main(String[] args) throws Exception {
-
- //1. 创建一个线程组:接收客户端连接
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- //2. 创建一个线程组:处理网络操作
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- //3. 创建服务器端类,来配置参数
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup) //4.设置两个线程组
- .channel(NioServerSocketChannel.class) //5.使用NioServerSocketChannel作为服务器端通道的实现
- .option(ChannelOption.SO_BACKLOG, 128) //6.设置线程队列中等待连接的个数
- .childOption(ChannelOption.SO_KEEPALIVE, true) //7.保持活动连接状态
- .childHandler(new ChannelInitializer<SocketChannel>() { //8. 创建一个通道初始化对象
- public void initChannel(SocketChannel sc) { //9. 往Pipeline链中添加自定义的handler类
- sc.pipeline().addLast(new NettyServerHandler());
- }
- });
-
- ChannelFuture cf = serverBootstrap.bind(port).sync(); //10. 绑定端口 bind方法是异步的 sync方法是同步阻塞的
- System.out.println("服务端开始监听端口:" + port + "...");
-
- //11. 关闭通道,关闭线程组
- cf.channel().closeFuture().sync(); //异步
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
-
- }
-
- class NettyServerHandler extends ChannelInboundHandlerAdapter {
-
- //读取数据事件
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- System.out.println("Server:" + ctx);
- ByteBuf buf = (ByteBuf) msg;
- System.out.println("客户端发来的消息:" + buf.toString(CharsetUtil.UTF_8));
- }
-
- //数据读取完毕事件
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,jvxb, nice to meet you!Here is Server。", CharsetUtil.UTF_8));
- }
-
- //异常发生事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
- ctx.close();
- }
-
-
- }

第三步:建客户端类
然后建一个客户端类:HelloNettyClient,代码如下:
- package mynetty;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.util.CharsetUtil;
-
- public class HelloNettyClient {
- // 需连接的服务端IP或域名
- private static String serverName = "127.0.0.1";
- // 需连接的服务端端口
- private static int port = 8888;
-
- public static void main(String[] args) throws Exception {
-
- //1. 创建一个线程组
- EventLoopGroup group = new NioEventLoopGroup();
- //2. 创建客户端的启动类,完成相关配置
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group) //3. 设置线程组
- .channel(NioSocketChannel.class) //4. 设置客户端通道的实现类
- .handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new NettyClientHandler()); //6.往Pipeline链中添加自定义的handler
- }
- });
-
- //7.启动客户端去连接服务器端 connect方法是异步的 sync方法是同步阻塞的
- ChannelFuture cf = bootstrap.connect(serverName, port).sync();
- System.out.println("客户端连接服务器成功...");
-
- //8.关闭连接(异步非阻塞)
- cf.channel().closeFuture().sync();
-
- }
- }
-
- class NettyClientHandler extends ChannelInboundHandlerAdapter {
-
- //通道就绪事件
- public void channelActive(ChannelHandlerContext ctx) {
- System.out.println("Client:" + ctx);
- ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,I am jvxb! Here is Client.", CharsetUtil.UTF_8));
- }
-
- //读取数据事件
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ByteBuf buf = (ByteBuf) msg;
- System.out.println("服务器端发来的消息:" + buf.toString(CharsetUtil.UTF_8));
- }
-
- }

第四步:测试
1)运行服务端类
2)运行客户端类
3)查看输出结果
可以看到结果如下:

根据上面Netty的实例我们可以看到,服务端和客户端代码的主要逻辑都是,通过创建启动类(Bootstrap、ServerBootstrap)来配置相关参数,如配置线程组(EventLoopGroup)、配置通道的类型(NioSocketChannel、NioServerSocketChannel)等,且在配置中可以指定事件的处理类Handler(ChannelInboundHandler的实现类),当channel中发生事件后相关逻辑都在对应的Handler中处理。Netty的编程逻辑是很清晰的,除了Netty本身提供的配置类外,我们也可以很方便地根据自己的需求配置自己的实现类来使用,扩展起来也很方便。
下述引用自链接:Netty工作原理架构图 - 简书
server端工作原理如下图:
server端启动时绑定本地某个端口,将自己的NioServerSocketChannel注册到某个boss NioEventLoop的selector上。
server端包含1个boss NioEventLoopGroup和1个worker NioEventLoopGroup,NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop,每个NioEventLoop包含1个selector和1个事件循环线程。
每个boss NioEventLoop循环执行的任务包含3步:
每个worker NioEventLoop循环执行的任务包含3步:
client端工作原理如下图:
client端启动时connect到server,建立NioSocketChannel,并注册到某个NioEventLoop的selector上。
client端只包含1个NioEventLoopGroup,每个NioEventLoop循环执行的任务包含3步:
最后小结一下Netty:
对于分布式应用中解决“如何连接”的问题,在上面我们讲了两种方式,一种是基于Java API,一种是基于开源框架。但是基于Java API的socket编程实在是太底层了,尤其是服务器端,又要处理连接,又得能够处理高并发的访问请求,没有强悍的、一流的编程能力根本无法驾驭,根本做不到高并发情况下的可靠和高效。接着我们讲到了基于开源框架实现系统通信,讲到了Netty。知道了Netty能够快速开发高性能、高可靠性的网络服务器和客户端程序。上面两种方式都能够实现系统间的通信,但总感觉还是有些麻烦,很多时候我们是不需要了解底层网络技术的,有什么办法能让我们使用分布式系统中的各个服务就像本地调用一个普通的方法那样简便呢?这样开发起来多方便快捷啊!基于以上的需求和想法,于是有了RPC的概念。
RPC(Remote Procedure Call):远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想。
RPC是一种思想,不是指某种具体的技术。 它的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。常见 RPC 技术和框架有:RMI、Spring RMI、WebService、HTTP、Hessian、Dubbo等。
本篇只介绍一下RMI和Hessian的使用,Dubbo阿里集团开源的一个极为出名的 RPC 框架,在很多互联网公司和企业应用中广泛使用,在后续文章中会比较详细地介绍,其它的一些技术此处不再展开,可自行了解。
RMI(remote method invocation,远程方法调用) , 可以认为是RPC的Java版本。RMI使用的是JRMP(Java Remote Messageing Protocol), JRMP是专门为Java定制的通信协议,所以它是纯Java的分布式解决方案。
现在,打开我们的IDEA或者Eclipse,一起来实现下面的小目标。
(终于要回到我们熟悉的代码部分了,Code Time Begin !)
小目标:对基于远程通信技术(RMI技术)的网络编程有初步的了解。具体需求如下:
1)从客户端把“Hello, I am xxx. Here is RMI Client.”这条消息传送给服务端;
2)从服务端读取该消息,并给客户端返回响应消息:“Hello, xxx, nice to meet you! Here is RMI Server.”
我们可以按照以下步骤实现上述需求:
第一步:创建远程接口, 并且继承java.rmi.Remote接口
- package myrmi;
-
- import java.rmi.Remote;
- import java.rmi.RemoteException;
-
- public interface IHelloRmiService extends Remote {
- /**
- * 接收请求消息,并响应。
- */
- String helloRmi(String requestMsg) throws RemoteException;
- }
第二步:实现远程接口,并且继承:UnicastRemoteObject
- package myrmi;
-
- import java.rmi.RemoteException;
- import java.rmi.server.UnicastRemoteObject;
-
- public class HelloRmiServiceImpl extends UnicastRemoteObject implements IHelloRmiService {
-
- private static final long serialVersionUID = 1L;
-
- public HelloRmiServiceImpl() throws RemoteException {
- }
-
- public String helloRmi(String requestMsg) {
- System.out.println("服务端收到客户端消息:" + requestMsg);
- String responseMsg = "Hello, jvxb, nice to meet you! Here is RMI Server.";
- return responseMsg;
- }
- }

第三步:创建服务器程序: LocateRegistry.createRegistry方法注册远程对象
- package myrmi;
-
- import java.rmi.Naming;
- import java.rmi.registry.LocateRegistry;
-
- public class HelloRmiServer {
- //客户端调用远程对象,注意RMI路径与接口必须与server配置一致
- private static String RMI_NAME = "rmi://127.0.0.1:8888/helloRmiService";
-
- public static void main(String[] args) throws Exception {
- IHelloRmiService helloRmiService = new HelloRmiServiceImpl();
- //注冊通讯端口
- LocateRegistry.createRegistry(8888);
- //注冊通讯路径
- Naming.rebind(RMI_NAME, helloRmiService);
- System.out.println("服务端启动8888端口监听...");
- }
-
- }

第四步:创建客户端程序,通过Naming.lookup方法使用远程对象
- package myrmi;
-
- import java.rmi.Naming;
-
- public class HelloRmiClient {
- //客户端调用远程对象,注意RMI路径与接口必须与server配置一致
- private static String RMI_NAME = "rmi://127.0.0.1:8888/helloRmiService";
-
- public static void main(String[] args) throws Exception {
- //调用远程对象,注意RMI路径与接口必须与server配置一致
- IHelloRmiService helloRmiService = (IHelloRmiService) Naming.lookup(RMI_NAME);
- String responseMsg = helloRmiService.helloRmi("Hello, I am jvxb. Here is RMI Client.");
- System.out.println("客户端收到服务端消息:" + responseMsg);
- }
-
- }

测试并查看结果:

(RMI的本质就是实现在方法在不同JVM之间的调用,通过在两个JVM中各开一个Stub(客户端)和Skeleton(服务端),二者通过socket通信来实现参数和返回值的传递。)
RMI其它的点就不多讲了,了解一下即可。实际上用的也不太多。
Hessian是一个轻量级的RPC框架,它基于HTTP协议传输,使用Hessian二进制序列化,对于数据包比较大的情况比较友好。Hessian能够简单、快捷地提供RMI的功能。下面看它的工作原理和编程实例。
现在,打开我们的IDEA或者Eclipse,一起来实现下面的小目标。
(终于要回到我们熟悉的代码部分了,Code Time Begin !)
小目标:对基于远程通信技术(Hessian技术)的网络编程有初步的了解。具体需求如下:
1)从客户端把“Hello, I am xxx. Here is Hessian Client.”这条消息传送给服务端;
2)从服务端读取该消息,并给客户端返回响应消息:“Hello, xxx, nice to meet you! Here is Hessian Server.”
我们可以按照以下步骤实现上述需求:
第一步:下载hessian的依赖jar包
- <dependency>
- <groupId>com.caucho</groupId>
- <artifactId>hessian</artifactId>
- <version>4.0.38</version>
- </dependency>
第二步:服务端的配置和服务类的编写
- package com.jvxb.test.basePro.controller;
-
- public interface HelloHessianService {
- String helloHessian(String requestMsg);
- }
- package com.jvxb.test.basePro.controller;
-
- import org.springframework.stereotype.Service;
-
- @Service("HelloHessianService")
- public class HelloHessianServiceImpl implements HelloHessianService {
- @Override
- public String helloHessian(String requestMsg) {
- System.out.println("服务端收到客户端信息:" + requestMsg);
- return "Hello, jvxb, nice to meet you! Here is Hessian Server.";
- }
- }
- package com.jvxb.test.basePro.controller;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.remoting.caucho.HessianServiceExporter;
-
- @Configuration
- public class HessianServerConfig {
-
- @Autowired
- private HelloHessianService helloHessianService;
-
- //发布服务
- @Bean(name = "/HelloHessianService")
- public HessianServiceExporter accountService() {
- HessianServiceExporter exporter = new HessianServiceExporter();
- exporter.setService(helloHessianService);
- exporter.setServiceInterface(HelloHessianService.class);
- return exporter;
- }
- }

第三步:客户端的调用
- package com.jvxb.test.basePro2.controller;
-
- public interface HelloHessianService {
- String helloHessian(String requestMsg);
- }
- package com.jvxb.test.basePro2.controller;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.remoting.caucho.HessianProxyFactoryBean;
-
- @Configuration
- public class HessianClientConfig {
-
- @Bean
- public HessianProxyFactoryBean helloClient() {
- HessianProxyFactoryBean factory = new HessianProxyFactoryBean();
- factory.setServiceUrl("http://localhost:8080/HelloHessianService");
- factory.setServiceInterface(HelloHessianService.class);
- return factory;
- }
- }

因为我测试时两个项目在同一台机器上,所以我的服务端端口是8080,客户端端口是8081。
第四步:测试及查看结果
为了方便测试及查看结果,我们在客户端添加一个方法进行验证。
- @RestController
- public class TestController {
- @Autowired
- private HelloHessianService helloHessianService;
-
- @RequestMapping("test")
- public Object test() {
- System.out.println(helloHessianService.helloHessian("Hello, I am jvxb. Here is Hessian Client."));
- return "hessiaan call success";
- }
-
- }
然后1)运行服务端,2)运行客户端,3)访问客户端的test接口,即访问 localhost:8081/test

可以看到Hessian使用起来是非常的方便的,根本不用多少代码。只需要在服务端通过HessianServiceExporter将需要提供的服务类发布,然后在客户端配置相同的接口,然后由HessianProxyFactoryBean对象将该接口指定到服务端的服务地址,就能够通过服务端来具体实现客户端的服务接口,很快速地提供了RMI功能。
还是看回我们这张图:
针对分布式应用中系统拆分后“如何连接”的问题,这篇文章详细写了三种基于Java实现系统间的通信的方式。即基于Java API、基于开源框架和基于远程通信技术。对这三种方式,本篇都做出了基本介绍并给出socket编程、Nio编程、Netty编程、RMI编程、Hession编程这五种技术的编程示例。希望通过这些介绍和描述,能让你对分布式通信有多一点的了解。如果你能有一丝收获,那不妨给小兵点个赞,也不枉小兵辛苦凑出这篇6万字的文了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。