服务端:
package org.example;import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class NIOServer {static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) {try {// 打开一个选择器Selector selector = SelectorProvider.provider().openSelector();// 打开一个服务器套接字通道ServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.configureBlocking(false);// 绑定到指定端口serverChannel.socket().bind(new InetSocketAddress(8081));// 将服务器通道注册到选择器上,并指定关注的事件为接收连接 serverChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务器已启动,监听端口8080...");while (true) {// 选择器等待就绪的通道 selector.select();// 获取就绪的选键集合Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();keyIterator.remove();// 检查选键的状态并进行相应的处理if (key.isAcceptable()) {handleAccept(key);} else if (key.isReadable()) {processMsg(key);} else if (key.isWritable()) {// 通常这里不会处理写就绪,因为写操作通常由应用逻辑触发} else if (key.isConnectable()) {// 这是客户端连接的情况,对于服务器来说通常不会遇到 }}}} catch (IOException e) {e.printStackTrace();System.err.println("服务器发生异常,已关闭。");} finally {executorService.shutdown();}}private static void handleAccept(SelectionKey key) throws IOException {ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();SocketChannel clientChannel = serverChannel.accept();clientChannel.configureBlocking(false);// 注册到选择器并关注读事件 clientChannel.register(key.selector(), SelectionKey.OP_READ);System.out.println("接受到新的客户端连接:" + clientChannel.getRemoteAddress());}private static void sendData(SocketChannel socketChannel, String message) {ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());while (buffer.hasRemaining()) {try {socketChannel.write(buffer);} catch (IOException e) {throw new RuntimeException(e);}}}public static void processMsg(SelectionKey key) {SocketChannel clientChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(2);StringBuilder sb = new StringBuilder();try {while (true) {int bytesRead = clientChannel.read(buffer);if (bytesRead == -1) { // 客户端关闭连接 clientChannel.close();break;} else if (bytesRead == 0) { // 消息读取结束break;}buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);sb.append(new String(bytes, StandardCharsets.UTF_8));buffer.clear();}if (!clientChannel.isOpen()) return;executorService.submit(() -> {System.out.println("收到消息:" + sb);sendData(clientChannel, "abcdaaaaaaaaaaaaaaabbbbbbbbccccccccccccccccccccccccccccd");});} catch (Exception e) {// 发生读错误,关闭连接并打印错误信息System.err.println("读错误:" + e.getMessage());try {clientChannel.close();} catch (IOException ex) {ex.printStackTrace();}}} }
客户端:
package org.example;import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;public class NIOClient {public static void main(String[] args) {// 服务器的地址和端口String serverAddress = "localhost";int serverPort = 8081;try {// 打开一个套接字通道SocketChannel socketChannel = SocketChannel.open();// 配置为非阻塞模式socketChannel.configureBlocking(false);// 尝试连接到服务器if (socketChannel.connect(new InetSocketAddress(serverAddress, serverPort))) {// 连接成功,可以直接发送数据(但在非阻塞模式下,这通常不会发生)sendData(socketChannel, "Hello, Server!");} else {// 连接可能需要时间,等待连接完成(在实际应用中,这通常是在一个循环中完成的)while (!socketChannel.finishConnect()) {// 可以在这里执行其他任务,比如更新用户界面等// 但在这个简单的例子中,我们只是空转等待连接完成 }// 连接完成,发送数据for (int index = 1; index <= 5; index++) {sendData(socketChannel, "Hello, Server!");}}// 读取服务器的响应(在非阻塞模式下,这可能需要多次尝试)StringBuilder sb = new StringBuilder();ByteBuffer buffer = ByteBuffer.allocate(1024);while (socketChannel.read(buffer) <= 0) {// 如果没有数据可读,可以在这里执行其他任务// 但在这个简单的例子中,我们只是空转等待数据到来 }// 服务器发送过来的所有数据 buffer.flip();String response = new String(buffer.array(), 0, buffer.limit());sb.append(response);while (socketChannel.read(buffer) > 0) {buffer.flip();response = new String(buffer.array(), 0, buffer.limit());sb.append(response);}System.out.println("Received from server: " + sb);// 关闭连接 socketChannel.close();} catch (Exception e) {e.printStackTrace();System.err.println("Client failed to connect to server or read response.");}System.out.println("finish");}private static void sendData(SocketChannel socketChannel, String message) throws Exception {ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());while (buffer.hasRemaining()) {socketChannel.write(buffer);}} }
执行效果: