基于管道的即时通讯(java nio)

2023-09-23 10 0

此项目实现原理

service只需往管道(数据池)中发送数据,等池中有了数据,它会自动找到你。你不必关心数据如何发送与接收,只需关注自己的业务处理。
如下图

优点:

基于管道的实现中,消息的发送或接收只需写入管道或从管道读取,而不用关注如何通过Channel发送,这样就实现了service层与socket的解耦。
依赖于广播而不依赖于回调函数,再结合nio的异步非阻塞,真正实现线程的零等待。

缺点:

数据发送的结果很难通过回调函数获取(或者根本不能),只能通过广播实现。

相关类介绍

ClientMessagePool、ServerMessagePoll:管道(数据池)
内部实现是一个链表队列(LinkedBlockingQueue):添加数据对应入队,读取数据对应取出队头元素。

Service
业务逻辑处理类,必须实现IMessageService接口,并向MessageObserver注册。
MessageObserver
内部有一个IMessageService的链表,保存各个实现IMessageService接口的Service,与Service构成观察者模式。
会有一个线程专门监测MessagePool,一旦有数据,就交给MessageObserver;MessageObserver根据特定的消息类型推送给指定的service。
SocketChannel
封装了SocketChannel的类,相当于一个客户端。它从管道(ClientMessagePool)中读取数据,一旦有数据,就将数据写入SocketChannel发送出去。
Selector
接收管道的注册,并根据分发条件向指定的SocketChannel推送数据,也会根据过滤条件过滤数据。

代码实现

管道代码实现
package com.pool;import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Holder of the two process-wide message queues ("pipes") used by this
 * prototype.
 *
 * <p>Both queues are {@link LinkedBlockingQueue} instances, so individual
 * {@code add}/{@code poll} calls are safe to make from several threads. The
 * fields are {@code final} so that no caller can swap a queue out from under
 * code that already holds a reference to it.
 */
public class MessagePool {

    /**
     * Queue used on the client side; {@code ClientMessagePool} delegates to it.
     * (The misspelled name is kept because it is part of the public interface.)
     */
    public static final Queue<String> clintmessageQueue = new LinkedBlockingQueue<String>();

    /** Queue intended for the server side (presumably used by the server pool implementation). */
    public static final Queue<String> serverMessageQueue = new LinkedBlockingQueue<String>();
}

接口

package com.pool;

/**
 * Abstraction of a message pipe: callers push messages in and take them out
 * without knowing which queue backs the pool.
 */
public interface IMessagePool {

    /**
     * Adds a message to the pool.
     *
     * @param message the message to enqueue
     */
    void addMessage(String message);

    /**
     * Takes the next message out of the pool.
     *
     * @return the next message (implementations decide what an empty pool yields)
     */
    String pollMessage();

    /**
     * Reports the fill state of the pool.
     *
     * <p>NOTE(review): ClientMessagePool returns {@code true} when messages
     * ARE queued, and the socket classes call this method with that meaning.
     * Confirm the intended contract before relying on the method name.
     *
     * @return implementation-defined fill-state flag, see note above
     */
    boolean isEmpty();
}

实现类

package com.pool.impl;import com.pool.IMessagePool;
import com.pool.MessagePool;public class ClientMessagePool implements IMessagePool {@Overridepublic  void addMessage(String message) {MessagePool.clintmessageQueue.add(message);}@Overridepublic  String pollMessage() {return MessagePool.clintmessageQueue.poll();}@Overridepublic boolean isEmpty() {if(MessagePool.clintmessageQueue.size()>0)return true;elsereturn false;}
}

客户端

package com.socket;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;import org.apache.commons.lang.ArrayUtils;import com.pool.IMessagePool;
import com.pool.impl.ClientMessagePool;
import com.util.PackageUtil;public class MySocket {private SocketChannel mSocketChannel;private SelectionKey key;public static String CHARSET = "utf-8";public static String ADDRESS = "127.0.0.1";public static int HOST = 34521;protected Selector mSelector;protected IMessagePool messagePool = new ClientMessagePool();;ByteBuffer buffer;public MySocket() {try {mSelector = Selector.open();initSocketChannel();initBassiness();} catch (Exception e) {e.printStackTrace();} finally {try {key.channel().close();} catch (IOException e) {e.printStackTrace();}}}/*** 业务逻辑* * @throws Exception*/private void initBassiness() throws Exception {while (true) {checkWriteable();// 瞬时检测if (mSelector.select(100) > 0) {Iterator<SelectionKey> keys = mSelector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();if (key.isReadable()) {dispose4Readable(key);}if (key.isValid() && key.isWritable()) {dispose4Writable(key);}keys.remove();}}}}/*** 可读请求* * @param key* @throws Exception*/protected void dispose4Readable(SelectionKey key) throws Exception {SocketChannel mSocketChannel = ((SocketChannel) key.channel());buffer = ByteBuffer.allocate(1024);mSocketChannel.read(buffer);buffer.flip();this.unPacket(buffer.array(), key);}/*** 可写请求* * @param key* @throws Exception*/protected void dispose4Writable(SelectionKey key) throws Exception {SocketChannel mSocketChannel = ((SocketChannel) key.channel());int value = 0;do{value = mSocketChannel.write(buffer);}while(value!=0);key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);key.interestOps(SelectionKey.OP_READ);}/*** 解包* * @param buf* @return*/public byte[] unPacket(byte[] buf, SelectionKey key) {int len = buf.length;// 37int i;for (i = 0; i < len; i++) {if (len < i + PackageUtil.PACKAGEHEADERLENGTH+ PackageUtil.PACKAGESAVEDATALENGTH) {break;}String tmp = new String(ArrayUtils.subarray(buf, i, i+ PackageUtil.PACKAGEHEADERLENGTH));if (tmp.equals(PackageUtil.PACKAGEHEADER)) {int messageLength = 
PackageUtil.byte2Int(ArrayUtils.subarray(buf, i + PackageUtil.PACKAGEHEADERLENGTH, i+ PackageUtil.PACKAGEHEADERLENGTH+ PackageUtil.PACKAGESAVEDATALENGTH));if (len < i + PackageUtil.PACKAGEHEADERLENGTH+ PackageUtil.PACKAGESAVEDATALENGTH + messageLength) {break;}byte[] data = ArrayUtils.subarray(buf, i+ PackageUtil.PACKAGEHEADERLENGTH+ PackageUtil.PACKAGESAVEDATALENGTH, i+ PackageUtil.PACKAGEHEADERLENGTH+ PackageUtil.PACKAGESAVEDATALENGTH + messageLength);String message = new String(data);System.out.println(message);
//				Filter.filterRead(message, key, messagePool);i += PackageUtil.PACKAGEHEADERLENGTH+ PackageUtil.PACKAGESAVEDATALENGTH + messageLength - 1;}}if (i == len) {return new byte[0];}return ArrayUtils.subarray(buf, i, buf.length);}void initSocketChannel() throws Exception {mSocketChannel = SocketChannel.open();mSocketChannel.connect(new InetSocketAddress(ADDRESS, HOST));mSocketChannel.configureBlocking(false);key = mSocketChannel.register(mSelector, SelectionKey.OP_CONNECT|SelectionKey.OP_READ);}void checkWriteable() {if (messagePool.isEmpty()) {String values = messagePool.pollMessage();System.out.println("                                   "+values);buffer = ByteBuffer.wrap(PackageUtil.packet(values.getBytes()));key.interestOps(SelectionKey.OP_WRITE);}}
}

服务器

package com.socket;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;import org.apache.commons.lang.ArrayUtils;import com.filter.Filter;
import com.pool.IMessagePool;
import com.pool.impl.ServerMessagePoll;
import com.util.PackageUtil;
import java.io.Closeable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * Non-blocking NIO server.
 *
 * <p>The constructor binds to {@link #ADDRESS}:{@link #HOST} and then runs the
 * event loop on the calling thread until a fatal error occurs. Incoming
 * frames are un-framed and passed to {@code Filter.filterRead}; pooled
 * messages are routed by {@code Filter.filterWrite}, which selects the
 * recipient key, and are then written to that recipient.
 *
 * <p>Limitations: only one outbound frame is in flight at a time, and a frame
 * split across two reads is not reassembled.
 */
public class MyServerSocket {

    private ServerSocketChannel mServerSocketChannel;
    private static MyServerSocket serverSocket;
    public static String CHARSET = "utf-8";
    public static String ADDRESS = "127.0.0.1";
    /** TCP port to listen on (the field name is historical; this is a port number). */
    public static int HOST = 34521;
    protected Selector mSelector;
    protected IMessagePool messagePool = new ServerMessagePoll();
    /** Outbound frame currently being written; {@code null} or fully drained when idle. */
    ByteBuffer buffer;

    /**
     * Opens the selector and the listening socket, then runs the event loop.
     * Any fatal failure is printed and every channel is closed before
     * returning.
     */
    private MyServerSocket() throws Exception {
        try {
            mSelector = Selector.open();
            initSocketChannel();
            initBassiness();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The selector is null if Selector.open() itself failed.
            if (mSelector != null && mSelector.isOpen()) {
                for (SelectionKey registered : mSelector.keys()) {
                    closeQuietly(registered.channel());
                }
            }
            // Covers the case where binding failed before registration.
            closeQuietly(mServerSocketChannel);
            closeQuietly(mSelector);
        }
    }

    /**
     * Event loop: queue the next outgoing message, then service whatever the
     * selector reports as ready.
     *
     * @throws Exception if selecting or accepting fails
     */
    private void initBassiness() throws Exception {
        while (true) {
            checkWriteable();
            // Short timeout (as in the client) so pooled messages are noticed
            // even when no socket event arrives.
            if (mSelector.select(100) > 0) {
                Iterator<SelectionKey> ready = mSelector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey readyKey = ready.next();
                    ready.remove();
                    try {
                        if (readyKey.isValid() && readyKey.isAcceptable()) {
                            dispose4Acceptable(readyKey);
                        }
                        if (readyKey.isValid() && readyKey.isReadable()) {
                            dispose4Readable(readyKey);
                        }
                        if (readyKey.isValid() && readyKey.isWritable()) {
                            dispose4Writable(readyKey);
                        }
                    } catch (IOException e) {
                        if (!(readyKey.channel() instanceof SocketChannel)) {
                            // A failure of the listening socket is fatal.
                            throw e;
                        }
                        // A broken client connection must not stop the server.
                        e.printStackTrace();
                        closeQuietly(readyKey.channel());
                    }
                }
            }
        }
    }

    /**
     * Handles a readable key: reads the available bytes and un-frames them.
     * Closes the connection when the client has disconnected.
     *
     * @param key the key whose channel is readable
     * @throws Exception if the read fails
     */
    protected void dispose4Readable(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();
        // A dedicated buffer: the 'buffer' field may hold an unsent outbound
        // frame destined for another client.
        ByteBuffer inbound = ByteBuffer.allocate(1024);
        int read = client.read(inbound);
        if (read < 0) {
            // End of stream: closing the channel also cancels its key.
            client.close();
            return;
        }
        if (read == 0) {
            return;
        }
        // Hand over only the bytes actually received, not the whole backing array.
        this.unPacket(ArrayUtils.subarray(inbound.array(), 0, read), key);
    }

    /**
     * Handles a writable key: writes as much of the pending frame as the
     * socket accepts; once the frame is sent the key goes back to read-only
     * interest.
     *
     * @param key the key whose channel is writable
     * @throws Exception if the write fails
     */
    protected void dispose4Writable(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();
        if (buffer != null) {
            while (buffer.hasRemaining()) {
                if (client.write(buffer) == 0) {
                    // Socket send buffer is full; keep OP_WRITE and resume later.
                    return;
                }
            }
        }
        // Restore OP_READ explicitly: the routing filter may have replaced the
        // interest set with OP_WRITE only.
        key.interestOps((key.interestOps() & ~SelectionKey.OP_WRITE) | SelectionKey.OP_READ);
    }

    /**
     * Extracts every complete frame found in {@code buf} and passes each
     * payload to {@code Filter.filterRead}.
     *
     * @param buf the received bytes
     * @param key the key the bytes were read from
     * @return the bytes after the last complete frame (possibly empty)
     */
    private byte[] unPacket(byte[] buf, SelectionKey key) {
        final int prefixLength = PackageUtil.PACKAGEHEADERLENGTH + PackageUtil.PACKAGESAVEDATALENGTH;
        final int len = buf.length;
        int i;
        for (i = 0; i < len; i++) {
            if (len < i + prefixLength) {
                break;
            }
            // The header is always encoded as UTF-8 by PackageUtil.packet.
            String marker = new String(
                    ArrayUtils.subarray(buf, i, i + PackageUtil.PACKAGEHEADERLENGTH),
                    StandardCharsets.UTF_8);
            if (!marker.equals(PackageUtil.PACKAGEHEADER)) {
                continue;
            }
            int messageLength = PackageUtil.byte2Int(
                    ArrayUtils.subarray(buf, i + PackageUtil.PACKAGEHEADERLENGTH, i + prefixLength));
            if (messageLength < 0) {
                // Corrupt or hostile length field; keep scanning instead of moving backwards.
                continue;
            }
            // Written as a subtraction so that a huge length cannot overflow the sum.
            if (len - i - prefixLength < messageLength) {
                break;
            }
            byte[] data = ArrayUtils.subarray(buf, i + prefixLength, i + prefixLength + messageLength);
            String message = new String(data, Charset.forName(CHARSET));
            System.out.println("server read message" + message);
            Filter.filterRead(message, key, messagePool);
            // -1 because the loop increment adds the last step.
            i += prefixLength + messageLength - 1;
        }
        if (i == len) {
            return new byte[0];
        }
        return ArrayUtils.subarray(buf, i, buf.length);
    }

    /**
     * Creates a server; the call blocks inside the constructor while the
     * server is running.
     *
     * <p>NOTE(review): the static {@code serverSocket} field is never
     * assigned, so every call creates a new server. It is deliberately not
     * cached here: the constructor only returns after the event loop has
     * died, so a cached instance would be a server whose channels are closed.
     *
     * @return the server instance after its event loop has ended
     * @throws Exception declared by the constructor
     */
    public static MyServerSocket newInstence() throws Exception {
        if (serverSocket == null) {
            return new MyServerSocket();
        }
        return serverSocket;
    }

    /**
     * Opens the non-blocking listening socket and registers it for accepts.
     *
     * @throws Exception if the socket cannot be opened or bound
     */
    void initSocketChannel() throws Exception {
        mServerSocketChannel = ServerSocketChannel.open();
        mServerSocketChannel.configureBlocking(false);
        mServerSocketChannel.bind(new InetSocketAddress(ADDRESS, HOST));
        mServerSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Accepts a pending connection and registers it for reads.
     *
     * @param key the key of the listening socket
     * @throws Exception if accepting or registering fails
     */
    void dispose4Acceptable(SelectionKey key) throws Exception {
        SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
        if (client == null) {
            // Non-blocking accept: the connection is no longer pending.
            return;
        }
        client.configureBlocking(false);
        client.register(mSelector, SelectionKey.OP_READ);
    }

    /**
     * Takes pooled messages until one can be routed to a connected recipient,
     * and frames that one for sending. Does nothing while a previous frame is
     * still in flight.
     */
    void checkWriteable() {
        if (hasPendingWrite()) {
            return;
        }
        // Poll directly and test for null rather than calling isEmpty(), whose
        // meaning is inverted in the client pool. This assumes the server
        // pool's pollMessage() returns null when nothing is queued -- TODO confirm.
        String value;
        while ((value = messagePool.pollMessage()) != null) {
            String result;
            try {
                result = Filter.filterWrite(value, mSelector);
            } catch (RuntimeException e) {
                // A malformed message must not bring the server down.
                e.printStackTrace();
                continue;
            }
            if (result != null) {
                System.out.println("server:" + result);
                buffer = ByteBuffer.wrap(PackageUtil.packet(result.getBytes(Charset.forName(CHARSET))));
                return;
            }
            // No recipient found: drop it and try the next pooled message.
        }
    }

    /**
     * Tells whether an unsent frame exists and some live connection is still
     * waiting to write it. If the recipient has gone away the stale frame is
     * reported as not pending so that the queue cannot stall.
     */
    private boolean hasPendingWrite() {
        if (buffer == null || !buffer.hasRemaining()) {
            return false;
        }
        for (SelectionKey candidate : mSelector.keys()) {
            if (candidate.isValid() && (candidate.interestOps() & SelectionKey.OP_WRITE) != 0) {
                return true;
            }
        }
        return false;
    }

    /** Closes {@code resource} if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

过滤器

package com.filter;import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;import com.model.BaseModule;
import com.model.Chat;
import com.model.User;
import com.pool.IMessagePool;
import com.util.StringUtil;public class Filter {private static final String LOGIN = "login";private static BaseModule modul = new BaseModule();private static SelectionKey selectionKey=null;private static Selector selector = null;/*** TODO 线程启动* * @param message* @return*/public static void filterRead(String message, SelectionKey key,IMessagePool messagePool) {selectionKey = key;try {BaseModule filterModul = (BaseModule) StringUtil.string2Bean(modul,message);if(filterType(filterModul.getType())){if(filterValue(filterModul.getMessage())){
//					messagePool.addMessage(message);}else{}}else{messagePool.addMessage(message);}} catch (Exception e) {return;}}public static String filterWrite(String message,Selector mSelector){selector = mSelector;return filter(message);}private static String filter(String message){BaseModule filterModul = (BaseModule) StringUtil.string2Bean(modul,message);Chat chat = (Chat) StringUtil.string2Bean(new Chat(),filterModul.getMessage());Set<SelectionKey> keys=selector.keys();for(SelectionKey key:keys){String  markString=key.attachment()!=null?key.attachment().toString():null;if(markString!=null && markString.equals(chat.getTo())){key.interestOps(SelectionKey.OP_WRITE);return chat.getMessage();}}return null;}/*** 过滤类型* @param value* @return*/private static boolean filterType(String value) {if (LOGIN.equals(value)) {return true;}return false;}/*** 过滤内容* @param value* @return*/private static boolean filterValue(String value) {return filterLogin(value);}private static boolean filterLogin(String value) {User user = (User) StringUtil.string2Bean(new User(), value);if (user.getUserName() != null) {selectionKey.attach(user.getUserName());return true;}return false;}}
service接口
package com.service;

/**
 * Callback contract for business-logic services that consume messages.
 */
public interface IMessageService {

    /**
     * Handles one message.
     *
     * @param message the message text to process
     */
    void doMessage(String message);
}

util

package com.util;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang.ArrayUtils;

/**
 * Frame codec for the wire protocol.
 *
 * <p>A frame is: the UTF-8 bytes of {@link #PACKAGEHEADER}, a 4-byte
 * big-endian payload length, then the payload.
 */
public class PackageUtil {

    /** Marker that starts every frame. */
    public static final String PACKAGEHEADER = "↨-↨";
    /** Length in bytes of {@link #PACKAGEHEADER} when encoded as UTF-8. */
    public static final int PACKAGEHEADERLENGTH = 7;
    /** Number of bytes used to store the payload length. */
    public static final int PACKAGESAVEDATALENGTH = 4;

    /**
     * Wraps a payload into a frame.
     *
     * @param pkg the payload bytes
     * @return header + 4-byte big-endian length + payload; never {@code null}
     * @throws NullPointerException if {@code pkg} is {@code null}
     */
    public static byte[] packet(byte[] pkg) {
        int payloadLength = pkg.length;
        byte[] lengthBytes = new byte[PACKAGESAVEDATALENGTH];
        for (int i = 0; i < lengthBytes.length; i++) {
            // Most significant byte first.
            lengthBytes[i] = (byte) ((payloadLength >> (8 * (3 - i))) & 0xFF);
        }
        // StandardCharsets cannot fail, so there is no error path returning null.
        byte[] header = PACKAGEHEADER.getBytes(StandardCharsets.UTF_8);
        byte[] frame = ArrayUtils.addAll(header, lengthBytes);
        return ArrayUtils.addAll(frame, pkg);
    }

    /**
     * Converts big-endian bytes to an {@code int}.
     *
     * <p>{@code b[0]} is the most significant byte. An array shorter than
     * four bytes supplies only the high-order bytes.
     *
     * @param b up to four bytes
     * @return the decoded value
     * @throws IllegalArgumentException if {@code b} has more than four bytes
     */
    public static int byte2Int(byte[] b) {
        if (b.length > PACKAGESAVEDATALENGTH) {
            // More bytes would produce negative shift distances and a wrong value.
            throw new IllegalArgumentException(
                    "expected at most " + PACKAGESAVEDATALENGTH + " bytes but got " + b.length);
        }
        int value = 0;
        for (int i = 0; i < b.length; i++) {
            value |= (b[i] & 0xFF) << (8 * (3 - i));
        }
        return value;
    }

    /**
     * Unused entry point, kept for compatibility.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
    }
}
StringUtil
package com.util;

import com.google.gson.Gson;

/**
 * JSON conversion helpers backed by one shared {@link Gson} instance.
 */
public class StringUtil {

    private static Gson json = new Gson();

    /**
     * Parses JSON text into a bean.
     *
     * @param clazz   an instance whose runtime class is the type to create
     * @param message the JSON text
     * @return the object produced by Gson for that class
     */
    public static Object string2Bean(Object clazz, String message) {
        Class<?> targetType = clazz.getClass();
        return json.fromJson(message, targetType);
    }

    /**
     * Serializes a bean to JSON text.
     *
     * @param clazz the bean to serialize
     * @return the JSON text
     */
    public static String bean2Json(Object clazz) {
        String text = json.toJson(clazz);
        return text;
    }
}

module

package com.model;
/**
 * Default wrapper: a message type together with the message content.
 *
 * @author Administrator
 */
public class BaseModule {

    String type;
    String message;

    /** @return the message type */
    public String getType() {
        return this.type;
    }

    /** @return the message content */
    public String getMessage() {
        return this.message;
    }

    /** @param type the message type */
    public void setType(String type) {
        this.type = type;
    }

    /** @param message the message content */
    public void setMessage(String message) {
        this.message = message;
    }
}

以上只是一个原型,后面会对具体细节的实现以及原理进行讲解。

代码编程
赞赏

相关文章

SparkCore之RDD持久化
SparkStraming之Checkpoint
SparkStreaming之persist缓存
SparkStreaming之foreachRDD的正确使用姿势
SparkStreamingReceiverDirect方式的消息处理方式对比和代码示意
SparkStreaming的普通transform算子和一些特殊的output算子使用