Java NIO 实现网络通信
954 total views, 3 views today
Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍然是一知半解。
即使代码跑起来,也不见得有多懂这些概念,因为只是肤浅的尝试,但肤浅的尝试胜过于纸上谈兵,至少迈出了第一步,后续深入,可能要等到真的有实际应用时,才会深入研究。先贴示例代码。
一个典型的服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
public class MyServer { private Selector selector; private ServerSocketChannel serverChannel; public void start() throws Exception { int port = 9527; // 创建选择器 selector = Selector.open(); // 打开监听通道 serverChannel = ServerSocketChannel.open(); // 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 serverChannel.configureBlocking(false);// 开启非阻塞模式 // 绑定端口 backlog设为1024 serverChannel.socket().bind(new InetSocketAddress(port), 1024); // 监听客户端连接请求 serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器已启动,端口号:" + port); while (true) { // 无论是否有读写事件发生,selector每隔1s被唤醒一次 selector.select(1000); // 阻塞,只有当至少一个注册的事件发生的时候才会继续. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); handleInput(key); } } } private void handleInput(SelectionKey key) throws Exception { if (key.isValid()) { // 处理新接入的请求消息 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 通过ServerSocketChannel的accept创建SocketChannel实例 // 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立 SocketChannel sc = ssc.accept(); // 设置为非阻塞的 sc.configureBlocking(false); // 注册为读 sc.register(selector, SelectionKey.OP_READ); } // 读消息 if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); // 创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); // 读取到字节,对字节进行编解码 if (readBytes > 0) { // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作 buffer.flip(); // 根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; // 将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String input = new String(bytes, "UTF-8"); System.out.println("服务器收到消息:" + input); // 发送应答消息 doWrite(sc, LocalTime.now().toString()); } } else if (key.isWritable()) { ByteBuffer sendbuffer = ByteBuffer.allocate(1024); sendbuffer.clear(); SocketChannel sc = (SocketChannel) key.channel(); sc.write(sendbuffer); } } } // 异步发送应答消息 private void doWrite(SocketChannel channel, String response) throws IOException { // 将消息编码为字节数组 byte[] bytes = response.getBytes(); // 根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 将字节数组复制到缓冲区 writeBuffer.put(bytes); // flip操作 writeBuffer.flip(); // 发送缓冲区的字节数组 channel.write(writeBuffer); } } |
1 2 3 4 5 6 7 8 |
public class Main { public static void main(String[] args) throws Exception { MyServer myserver = new MyServer(); myserver.start(); } } |
关键代码:
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
把 channel 注册到 selector 上,如此,一个 selector 可以管理多个 channel。
selector.select(1000);
该方法功能就是阻塞直到该选择器中的通道所关注的事件就绪,最多阻塞 1000 毫秒,使得程序可以继续往下运行。
while(true){
while (it.hasNext()) {}
}
这样的循环结构,使得服务器不断的轮询是否有请求事件发生,如果有发生,则会获得这个请求中的 channel,并且往这个 channel 中读写数据。
handleInput 方法中,就是处理对应的请求。
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class MyClient { private Selector selector; private SocketChannel socketChannel; public void start() throws Exception { int port = 9527; String host = "127.0.0.1"; // 创建选择器 selector = Selector.open(); // 打开监听通道 socketChannel = SocketChannel.open(); // 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 socketChannel.configureBlocking(false);// 开启非阻塞模式 socketChannel.connect(new InetSocketAddress(host, port)); // 等待100毫秒直到连接上服务器 while (!socketChannel.finishConnect()) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true) { try { // 无论是否有读写事件发生,selector每隔1s被唤醒一次 selector.select(1000); // 阻塞,只有当至少一个注册的事件发生的时候才会继续. // selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); // 读消息 if (key.isReadable()) { // 创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); // 读取到字节,对字节进行编解码 if (readBytes > 0) { // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作 buffer.flip(); // 根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; // 将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String result = new String(bytes, "UTF-8"); System.out.println("客户端收到消息:" + result); } } } } // 异步发送消息 private void doWrite(SocketChannel channel, String request) throws IOException { // 将消息编码为字节数组 byte[] bytes = request.getBytes(); // 根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 将字节数组复制到缓冲区 writeBuffer.put(bytes); // flip操作 writeBuffer.flip(); // 发送缓冲区的字节数组 channel.write(writeBuffer); } public void sendMsg(String msg) throws Exception { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel, msg); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public class Main { public static void main(String[] args) throws Exception { final MyClient myclient = new MyClient(); FutureTask<Integer> Task2 = new FutureTask<>(() -> { myclient.start(); return 0; });// 用FutureTask包裹 Thread Thread2 = new Thread(Task2);// 用Thread包裹 Thread2.start(); Thread.sleep(1000); myclient.sendMsg("time"); } } |
客户端代码和服务器端代码,原理是类似的。
上述例子中,先启动服务器端代码,然后启动客户端代码,就能跑起来。例子中,客户端发送任意字符到服务器端,服务器返回当前时间给客户端。
原创文章,转载请注明出处!http://www.javathings.top/java-nio实现网络通信/