1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > java网络通信:异步非阻塞I/O (NIO)

java网络通信:异步非阻塞I/O (NIO)

时间:2019-12-26 17:33:58

相关推荐

java网络通信:异步非阻塞I/O (NIO)

首先是channel,是一个双向的全双工的通道,可同时读写,而输入输出流都是单工的,要么读要么写。Channel分为两大类,分别是用于网络数据的SelectableChannel和用于文件操作的FileChannel。

注意:在java NIO库中,所有的数据都是用缓冲区处理,常用的是ByteBuffer。

多路复用器Selector:

Selector会不断轮询注册在其上的Channel,如果某个Channel上又新的连接接入、读和写事件,这个Channel就处于就绪状态,通过SelectorKey可以获取就绪Channel的集合。底层使用了epoll()实现,没有最大连接句柄的限制。

服务端代码:

public class TimeServer {public static void main(String[] args) throws IOException {int port = 8080;MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();}}public class MultiplexerTimeServer implements Runnable {private Selector selector;private ServerSocketChannel servChannel;private volatile boolean stop;public MultiplexerTimeServer(int port) {try {selector = Selector.open();servChannel = ServerSocketChannel.open();servChannel.configureBlocking(false);//设置非阻塞模式servChannel.socket().bind(new InetSocketAddress(port), 1024);servChannel.register(selector, SelectionKey.OP_ACCEPT);//将Channel注册到selector,监听accept事件System.out.println("The time server is start in port : " + port);} catch (IOException e) {e.printStackTrace();System.exit(1);}}public void stop() {this.stop = true;}@Overridepublic void run() {while (!stop) {try {selector.select(1000);//每隔一秒轮询一次Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {//如果有新的客户端接入key = it.next();it.remove();try {handleInput(key);//处理请求} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源if (selector != null)try {selector.close();} catch (IOException e) {e.printStackTrace();}}private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 处理新接入的请求消息if (key.isAcceptable()) {// 获取客户端的连接通道ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept();sc.configureBlocking(false);//将新连接注册到selector,并监听读事件sc.register(selector, SelectionKey.OP_READ);}if (key.isReadable()) {//读取通道数据写入字节缓存SocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);//字节缓存写入字节数组String body = new String(bytes, "UTF-8");System.out.println("The time server receive order : " + body);String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)? new java.util.Date(System.currentTimeMillis()).toString(): "BAD ORDER";doWrite(sc, currentTime);//向socketchannel写入数据} else if (readBytes < 0) {// 对端链路关闭key.cancel();sc.close();} }}}private void doWrite(SocketChannel channel, String response)throws IOException {if (response != null && response.trim().length() > 0) {byte[] bytes = response.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer);}}}

客户端:

public class TimeClient {public static void main(String[] args) {int port = 8080;new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();}}public class TimeClientHandle implements Runnable {private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean stop;public TimeClientHandle(String host, int port) {this.host = host == null ? "127.0.0.1" : host;this.port = port;try {selector = Selector.open();socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);} catch (IOException e) {e.printStackTrace();System.exit(1);}}@Overridepublic void run() {try {doConnect();} catch (IOException e) {e.printStackTrace();System.exit(1);}while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Exception e) {e.printStackTrace();System.exit(1);}}// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源if (selector != null)try {selector.close();} catch (IOException e) {e.printStackTrace();}}private void doConnect() throws IOException {// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答if (!socketChannel.connect(new InetSocketAddress(host, port))) {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel);} else//没有直接连接成功,不代表失败,而是说明服务器还没有返回TCP握手的应答消息,所以注册OP_CONNECT事件,监听消息。 socketChannel.register(selector, SelectionKey.OP_CONNECT); }private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 判断是否连接成功SocketChannel sc = (SocketChannel) key.channel();if (key.isConnectable()) {//判断是否有连接事件,是的话,说明未连接if (sc.finishConnect()) {//再连接sc.register(selector, SelectionKey.OP_READ);doWrite(sc);} elseSystem.exit(1);// 连接失败,进程退出 }if (key.isReadable()) {ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String body = new String(bytes, "UTF-8");System.out.println("Now is : " + body);this.stop = true;} else if (readBytes < 0) {// 对端链路关闭key.cancel();sc.close();}}}}private void doWrite(SocketChannel sc) throws IOException {byte[] req = "QUERY TIME ORDER".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);writeBuffer.put(req);writeBuffer.flip();sc.write(writeBuffer);if (!writeBuffer.hasRemaining())System.out.println("Send order 2 server succeed.");}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。