加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

通过JAVA NIO实现Socket服务器与客户端功能

发布时间:2020-12-15 03:18:32 所属栏目:Java 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 package niocommunicate; import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.Can

以下代码由PHP站长网 52php.cn收集自互联网

现在PHP站长网小编把它分享给大家,仅供参考

package niocommunicate;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class Server {
 
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,10,500,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(20));
 
    private static Map<Integer,SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
 
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
 
    /**
     * 创建非阻塞服务器绑定5555端口
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector,SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }
 
    /**
     * 关闭服务器
     */
    private void close() {
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 启动选择器监听客户端事件
     */
    private void start() {
        threadPool.execute(new Runnable() {
 
            @Override
            public void run() {
                try {
                    while (true) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                if (selectedKey.isReadable()) {
 
                                    if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                                        selectionKeyMap.put(selectedKey.hashCode(),selectedKey);
                                        threadPool.execute(new ReadClientSocketHandler(selectedKey));
                                    }
 
                                } else if (selectedKey.isWritable()) {
                                    Object responseMessage = selectedKey.attachment();
                                    SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
                                    selectedKey.interestOps(SelectionKey.OP_READ);
                                    if (responseMessage != null) {
                                        threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,responseMessage));
                                    }
                                } else if (selectedKey.isAcceptable()) {
                                    ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                                    SocketChannel clientSocket = ssc.accept();
                                    if (clientSocket != null) {
                                        clientSocket.configureBlocking(false);
                                        clientSocket.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    }
                                }
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                selectionKeyMap.remove(selectedKey.hashCode());
                            }
                        }
 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
 
        });
    }
 
    /**
     * 响应数据给客户端线程
     * @author haoguo
     *
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;
 
        WriteClientSocketHandler(SocketChannel client,Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }
 
        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                client.write(ByteBuffer.wrap(responseByteData));
                System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    client.close();
                } catch (IOException e1) {
 
                    e1.printStackTrace();
                }
            }
        }
    }
 
    /**
     * 读客户端发送数据线程
     * @author haoguo
     *
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
 
        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
        }
 
        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len = -1;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data,data.length + len);
                    System.arraycopy(tmp.array(),data,data.length - len,len);
                    tmp.rewind();
                }
                if (data.length == 0) {
                    return;
                }
                System.out.println("接收到客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + new String(data) + "]");
                // dosomthing
                byte[] response = "response".getBytes();
                client.register(selector,SelectionKey.OP_WRITE,response);
            } catch (IOException e) {
 
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                try {
                    SelectionKey selectionKey = client.keyFor(selector);
                    selectionKey.cancel();
                    client.close();
                } catch (IOException e1) {
 
                    e1.printStackTrace();
                }
            } finally {
                selectionKeyMap.remove(selectionKey.hashCode());
            }
        }
    }
 
    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

package niocommunicate;
 
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
 
public class Client {
    SocketChannel client;
    Selector selctor = getSelector();
 
    private volatile boolean run = true;
 
    private List<Object> messageQueue = new LinkedList<>();
 
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
 
    public Client() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            client.connect(new InetSocketAddress(InetAddress.getLocalHost(),5555));
            client.register(selctor,SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Runnable() {
 
            @Override
            public void run() {
                while (run) {
                    try {
                        if (selctor.select(20) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selctor.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            if (selectionKey.isConnectable()) {
                                SocketChannel sc = (SocketChannel) selectionKey.channel();
                                sc.finishConnect();
                                sc.register(selctor,SelectionKey.OP_READ);
                            } else if (selectionKey.isWritable()) {
                                selectionKey.interestOps(SelectionKey.OP_READ);
                                Object requestMessage = selectionKey.attachment();
                                SocketChannel writeSocketChannel = (SocketChannel) selectionKey.channel();
                                byte[] requestByteData = null;
                                if (requestMessage instanceof byte[]) {
                                    requestByteData = (byte[]) requestMessage;
                                } else if (requestMessage instanceof String) {
                                    requestByteData = ((String) requestMessage).getBytes();
                                    System.out.println("client send Message:[" + requestMessage + "]");
                                } else {
                                    System.out.println("unsupport send Message Type" + requestMessage.getClass());
                                }
                                System.out.println("requestMessage:" + requestMessage);
                                if (requestByteData != null && requestByteData.length > 0) {
                                    try {
                                        writeSocketChannel.write(ByteBuffer.wrap(requestByteData));
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
                            } else if (selectionKey.isReadable()) {
                                SocketChannel readSocketChannel = (SocketChannel) selectionKey.channel();
                                ByteBuffer tmp = ByteBuffer.allocate(1024);
                                int len = -1;
                                byte[] data = new byte[0];
                                if ((len = readSocketChannel.read(tmp)) > 0) {
                                    data = Arrays.copyOf(data,data.length + len);
                                    System.arraycopy(tmp.array(),len);
                                    tmp.rewind();
                                }
                                if (data.length > 0) {
                                    System.out.println("客户端接收到数据:[" + new String(data) + "]");
                                }
                            }
                        }
                    } catch (IOException e1) {
                        e1.printStackTrace();
                        close();
                    }
                }
            }
        }).start();
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
 
            e.printStackTrace();
        }
    }
 
    public void close() {
        try {
            SelectionKey selectionKey = client.keyFor(selctor);
            selectionKey.cancel();
            client.close();
            run = false;
        } catch (IOException e) {
 
            e.printStackTrace();
        }
    }
 
    public void writeData(String data) {
        messageQueue.add(data);
        while (messageQueue.size() > 0) {
            Object firstSendData = messageQueue.remove(0);
            try {
                client.register(selctor,firstSendData);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
 
        Client client = new Client();
        long t1 = System.currentTimeMillis();
        for (int i = 10; i < 200; i++) {
            client.writeData(i + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei"
                    + "nimddddddddddsssssssssssssssssssssssssssssssssssscccccccccccccccccccccccc"
                    + "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccdddddddddddd"
                    + "dddddddddddddddddwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwwaaaaaaaaaaaaaa"
                    + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaddddddddddddddddddddddddddddddd"
                    + "ddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddrrrr"
                    + "jjjjjjjjjjjjjjjjjjjjjjjjjjjjrrrrrrrrrrrrrrrrrrrrrrrrrrrkkkkkkkkkkkkkkkkkkkk"
                    + "kkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkkjjjjkkkkkklllllllllllllllllllllllllll"
                    + "lllllllldddddddddddddmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmddaei" + i);
        }
        long t2 = System.currentTimeMillis();
        System.out.println("总共耗时:" + (t2 - t1) + "ms");
        client.close();
    }
}

package niocommunicate;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class Server {
 
    private Selector selector = getSelector();
    private ServerSocketChannel ss = null;
    private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,new ArrayBlockingQueue<Runnable>(20));
 
    private Map<Integer,SelectionKey> selectionKeyMap = new ConcurrentHashMap<>();
    private Map<Integer,List<Object>> responseMessageQueue = new ConcurrentHashMap<>();
    private volatile boolean run = true;
    private volatile boolean isClose = false;
 
    public Selector getSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
 
    /**
     * 创建非阻塞服务器绑定5555端口
     */
    public Server() {
        try {
            ss = ServerSocketChannel.open();
            ss.bind(new InetSocketAddress(5555));
            ss.configureBlocking(false);
            if (selector == null) {
                selector = Selector.open();
            }
            ss.register(selector,SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }
 
    public boolean isClose() {
        return isClose;
    }
 
    /**
     * 关闭服务器
     */
    private void close() {
        run = false;
        isClose = true;
        threadPool.shutdown();
        try {
            if (ss != null) {
                ss.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 启动选择器监听客户端事件
     */
    private void start() {
        threadPool.execute(new Runnable() {
 
            @Override
            public void run() {
                try {
                    while (run) {
                        if (selector.select(10) == 0) {
                            continue;
                        }
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectedKey = iterator.next();
                            iterator.remove();
                            try {
                                if (selectedKey.isReadable()) {
 
                                    if (selectionKeyMap.get(selectedKey.hashCode()) != selectedKey) {
                                        selectionKeyMap.put(selectedKey.hashCode(),selectedKey);
                                        threadPool.execute(new ReadClientSocketHandler(selectedKey));
                                    }
 
                                } else if (selectedKey.isWritable()) {
                                    SocketChannel serverSocketChannel = (SocketChannel) selectedKey.channel();
                                    selectedKey.interestOps(SelectionKey.OP_READ);
                                    List<Object> list = responseMessageQueue.get(selectedKey.hashCode());
                                    if (list == null) {
                                        list = new LinkedList<Object>();
                                        responseMessageQueue.put(selectedKey.hashCode(),list);
                                    }
                                    while (list.size() > 0) {
                                        Object responseMessage = list.remove(0);
                                        if (responseMessage != null) {
                                            threadPool.execute(new WriteClientSocketHandler(serverSocketChannel,responseMessage));
                                        }
                                    }
                                } else if (selectedKey.isAcceptable()) {
                                    ServerSocketChannel ssc = (ServerSocketChannel) selectedKey.channel();
                                    SocketChannel clientSocket = ssc.accept();
                                    if (clientSocket != null) {
                                        clientSocket.configureBlocking(false);
                                        clientSocket.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    }
                                }
                            } catch (CancelledKeyException cc) {
                                selectedKey.cancel();
                                int hashCode = selectedKey.hashCode();
                                selectionKeyMap.remove(hashCode);
                                responseMessageQueue.remove(hashCode);
                            }
                        }
 
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    close();
                }
            }
 
        });
    }
 
    /**
     * 响应数据给客户端线程
     *
     * @author haoguo
     *
     */
    private class WriteClientSocketHandler implements Runnable {
        SocketChannel client;
        Object respnoseMessage;
 
        WriteClientSocketHandler(SocketChannel client,Object respnoseMessage) {
            this.client = client;
            this.respnoseMessage = respnoseMessage;
        }
 
        @Override
        public void run() {
            byte[] responseByteData = null;
            String logResponseString = "";
            if (respnoseMessage instanceof byte[]) {
                responseByteData = (byte[]) respnoseMessage;
                logResponseString = new String(responseByteData);
            } else if (respnoseMessage instanceof String) {
                logResponseString = (String) respnoseMessage;
                responseByteData = logResponseString.getBytes();
            }
            if (responseByteData == null || responseByteData.length == 0) {
                System.out.println("响应的数据为空");
                return;
            }
            try {
                client.write(ByteBuffer.wrap(responseByteData));
                System.out.println("server响应客户端[" + client.keyFor(selector).hashCode() + "]数据 :[" + logResponseString
                        + "]");
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    SelectionKey selectionKey = client.keyFor(selector);
                    if (selectionKey != null) {
                        selectionKey.cancel();
                        int hashCode = selectionKey.hashCode();
                        responseMessageQueue.remove(hashCode);
                    }
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
 
    /**
     * 读客户端发送数据线程
     *
     * @author haoguo
     *
     */
    private class ReadClientSocketHandler implements Runnable {
        private SocketChannel client;
        private ByteBuffer tmp = ByteBuffer.allocate(1024);
        private SelectionKey selectionKey;
        int hashCode;
 
        ReadClientSocketHandler(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            this.client = (SocketChannel) selectionKey.channel();
            this.hashCode = selectionKey.hashCode();
        }
 
        @Override
        public void run() {
            try {
                tmp.clear();
                byte[] data = new byte[0];
                int len = -1;
                while ((len = client.read(tmp)) > 0) {
                    data = Arrays.copyOf(data,len);
                    tmp.rewind();
                }
                if (data.length == 0) {
                    return;
                }
                String readData = new String(data);
                System.out.println("接收到客户端[" + hashCode + "]数据 :[" + readData.substring(0,3) + "]");
                // dosomthing
                byte[] response = ("response" + readData.substring(0,3)).getBytes();
                List<Object> list = responseMessageQueue.get(hashCode);
                list.add(response);
                client.register(selector,SelectionKey.OP_WRITE);
                // client.register(selector,response);
            } catch (IOException e) {
                System.out.println("客户端[" + selectionKey.hashCode() + "]关闭了连接");
                try {
                    SelectionKey selectionKey = client.keyFor(selector);
                    if (selectionKey != null) {
                        selectionKey.cancel();
                    }
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            } finally {
                selectionKeyMap.remove(hashCode);
            }
        }
    }
 
    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

以上内容由PHP站长网【52php.cn】收集整理供大家参考研究

如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读