一、前言
通過NIO編寫簡單版聊天室,客戶端通過控制臺輸入發送消息到其他客戶端。注意:并未處理粘包半包問題。
二、邏輯簡述
服務器:
1)創建服務器NIO通道,綁定端口并啟動服務器
2)開啟非阻塞模式
3)創建選擇器、并把通道注冊到選擇器上,關心的事件為新連接
4)循環監聽選擇器的事件,
5)監聽到新連接事件:5.1) 建立連接、創建客戶端通道5.2)客戶端通道設置非阻塞5.3)客戶端注冊到選擇器上,關心的事件為讀
6)監聽到讀 事件6.1)獲取到發送數據的客戶端通道6.2)把通道數據寫入到一個緩沖區中6.3)打印數據6.4)發送給其他注冊在選擇器上的客戶端,排除自己
客戶器:
1)創建客戶端通道,連接服務器 ip和端口
2)創建選擇器,注冊客戶端通道到選擇器上,關心的事件為讀
3)開啟一個線程 循環監聽選擇器事件
4)監聽到讀事件后4.1)從通道中把數據讀到緩沖區中4.2)打印數據
5)主線程循環用scanner 來監聽控制臺輸入5.1)有輸入后 發送給服務器
三、代碼
服務器:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;/***/
public class GroupChatServer {private int port = 8888;private ServerSocketChannel serverSocketChannel;private Selector selector;public GroupChatServer() throws IOException {serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(port));//創建選擇器selector = Selector.open();//通道注冊到選擇器上,關心的事件為 OP_ACCEPT:新連接serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("server is ok");}public void listener() throws IOException {for (; ; ) {if (selector.select() == 0) {continue;}//監聽到時間Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();if (selectionKey.isAcceptable()) {//新連接事件newConnection();}if (selectionKey.isReadable()) {//客戶端消息事件clientMsg(selectionKey);}iterator.remove();}}}/*** 客戶端消息處理*/private void clientMsg(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();try {//通道數據讀取到 byteBuffer緩沖區socketChannel.read(byteBuffer);//創建一個數組用于接受 緩沖區的本次寫入的數據。byte[] bytes = new byte[byteBuffer.limit()];//轉換模式 寫->讀byteBuffer.flip();//獲取數據到 bytes 中 從位置0開始到limit結束byteBuffer.get(bytes, 0, byteBuffer.limit());String msg = socketChannel.getRemoteAddress() + "說:" + new String(bytes, "utf-8");//倒帶這個緩沖區。位置設置為零,標記為-1.這樣下次寫入數據會從0開始寫。但是如果下次的數據比這次少。那么使用 byteBuffer.array方法返回的byte數組數據會包含上一次的部分數據//例如 上次寫入了 11111 倒帶后 下次寫入了 22 讀取出來 卻是 22111byteBuffer.rewind();System.out.println(msg);//發送給其他客戶端sendOuterClient(msg, socketChannel);} catch (Exception e) {System.out.println(socketChannel.getRemoteAddress() + ":下線了");socketChannel.close();}}/*** 發送給其他客戶端** @param msg 要發送的消息* @param socketChannel 要排除的客戶端* @throws IOException*/private void sendOuterClient(String msg, SocketChannel socketChannel) throws IOException {//獲取selector上注冊的全部通道集合Set<SelectionKey> keys = selector.keys();for (SelectionKey key : keys) {SelectableChannel channel = key.channel();//判斷通道是客戶端通道(因為服務器的通道也注冊在該選擇器上),并且排除發送人的通道if (channel instanceof SocketChannel && !channel.equals(socketChannel)) {try {((SocketChannel) channel).write(ByteBuffer.wrap(msg.getBytes()));} catch (Exception e) {channel.close();System.out.println(((SocketChannel) channel).getRemoteAddress() + ":已下線");}}}}/*** 新連接處理方法* @throws IOException*/private void newConnection() throws IOException {//連接獲取SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();//設置非阻塞socketChannel.configureBlocking(false);//注冊到選擇器上,關心的事件是讀,并附帶一個ByteBuffer對象socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));System.out.println(socketChannel.getRemoteAddress() + " 上線了");}public static void main(String[] args) throws IOException {GroupChatServer groupChatServer = new GroupChatServer();//啟動監聽groupChatServer.listener();}
}
客戶端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;/** */
public class GroupChatClient {private Selector selector;private SocketChannel socketChannel;public GroupChatClient(String host, int port) throws IOException {socketChannel = SocketChannel.open(new InetSocketAddress(host, port));socketChannel.configureBlocking(false);selector = Selector.open();//注冊事件,關心讀事件socketChannel.register(selector, SelectionKey.OP_READ);System.out.println("我是:" + socketChannel.getLocalAddress());}/*** 讀消息*/private void read() {try {if(selector.select() == 0){//沒有事件,returnreturn;}Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey selectionKey = iterator.next();if(selectionKey.isReadable()){//判斷是 讀 事件SocketChannel socketChannel = (SocketChannel)selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//讀取數據到 byteBuffer 緩沖區socketChannel.read(byteBuffer);//打印數據System.out.println(new String(byteBuffer.array()));}iterator.remove();}} catch (IOException e) {e.printStackTrace();}}/*** 發送數據* @param msg 消息* @throws IOException*/private void send(String msg) throws IOException {socketChannel.write(ByteBuffer.wrap(new String(msg.getBytes(),"utf-8").getBytes()));}public static void main(String[] args) throws IOException {//創建客戶端 指定 ip端口GroupChatClient groupChatClient = new GroupChatClient("127.0.0.1",8888);//啟動一個線程來讀取數據new Thread(()->{while (true){groupChatClient.read();}}).start();//Scanner 發送數據Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()){String s = scanner.nextLine();//發送數據groupChatClient.send(s);}}
}