转载声明:文章来源https://www.cnblogs.com/wangyongwen/p/11337420.html
多线程(11) — NIO
Java NIO是new IO的简称,是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于Java标准IO的操作机制,严格来说,NIO与并发并无直接关系,但是使用NIO技术可以大大提高线程的使用效率。Java NIO设计的基础内容有通道(Channel)、缓冲区(Buffer)、Selector(选择器)。下面说说这几个内容
一)通道(Channel)
Channel:Channel是一对象,可以通过它读取和写入数据。可以把它看做是IO中的流,不同的是:
Channel是双向的,既可以读又可以写,而流是单向的
Channel可以进行异步的读写
对Channel的读写必须通过buffer对象
正如上面提到的,所有数据都通过Buffer对象处理,所以不会将字节写入到Channel中,而是将数据写入到Buffer中;不会从Channel中读取字节,而是将数据从Channel读入Buffer,再从Buffer获取这个字节。Channel可以比流更好地反映出底层操作系统的真实情况。特别是在Unix模型中,底层操作系统通常都是双向的。在Java NIO中的Channel主要有如下几种类型:
FileChannel:从文件读取数据的
DatagramChannel:读写UDP网络协议数据
SocketChannel:读写TCP网络协议数据
ServerSocketChannel:可以监听TCP连接
二)缓冲区(Buffer)
Buffer是一对象,它包含一些要写入或者读到的Stream对象。应用程序不能直接对 Channel 进行读写操作,而必须通过 Buffer 来进行,即 Channel 是通过 Buffer 来读写数据的。在NIO中,所有的数据都是用Buffer处理的,它是NIO读写数据的中转池。Buffer实质上是一个数组,通常是一个字节数据,但也可以是其他类型的数组。但一个缓冲区不仅仅是一个数组,重要的是它提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。使用 Buffer 读写数据一般遵循以下四个步骤:
1.写入数据到 Buffer;
2.调用 flip() 方法;
3.从 Buffer 中读取数据;
4.调用 clear() 方法或者 compact() 方法。
当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用 clear() 或 compact() 方法。clear() 方法会清空整个缓冲区。compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。Buffer主要有如下几种:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
CopyFile执行三个基本的操作:创建一个Buffer,然后从源文件读取数据到缓冲区,然后再将缓冲区写入目标文件。
public static void copyFileUseNIO(String src,String dst) throws IOException{
//声明源文件和目标文件
FileInputStream fi=new FileInputStream(new File(src));
FileOutputStream fo=new FileOutputStream(new File(dst));
//获得传输通道channel
FileChannel inChannel=fi.getChannel();
FileChannel outChannel=fo.getChannel();
//获得容器buffer
ByteBuffer buffer=ByteBuffer.allocate(1024);
while(true){
//判断是否读完文件
int eof =inChannel.read(buffer);
if(eof==-1){
break;
}
//重设一下buffer的position=0,limit=position
buffer.flip();
//开始写
outChannel.write(buffer);
//写完要重置buffer,重设position=0,limit=capacity
buffer.clear();
}
inChannel.close();
outChannel.close();
fi.close();
fo.close();
}
三)Selector(选择器对象)
Selector是一个对象,它可以注册到很多个Channel上,监听各个Channel上发生的事件,并且能够根据事件情况决定Channel读写。这样,通过一个线程管理多个Channel,就可以处理大量网络连接了。有了Selector,我们就可以利用一个线程来处理所有的channels。线程之间的切换对操作系统来说代价是很高的,并且每个线程也会占用一定的系统资源。所以,对系统来说使用的线程越少越好。Selector 就是注册对各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。Selector selector = Selector.open();
为了能让Channel和Selector配合使用,我们需要把Channel注册到Selector上。通过调用 channel.register()方法来实现注册:channel.configureBlocking(false);
SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
注意,注册的Channel 必须设置成异步模式 才可以,否则异步IO就无法工作,这就意味着我们不能把一个FileChannel注册到Selector,因为FileChannel没有异步模式,但是网络编程中的SocketChannel是可以的。
register()的调用的返回值是一个SelectionKey,代表这个通道在此 Selector 上注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。
SelectionKey中包含如下属性:
(1)interestSet
把Channel注册到Selector来监听感兴趣的事件,interestSet就是你要选择的感兴趣的事件的集合。可以通过SelectionKey对象来读写interest set:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
通过上面例子可以看到,我们可以通过用 & 和 SelectionKey 中的常量做运算,从SelectionKey中找到我们感兴趣的事件。
(2)readySet
readySet 是通道已经准备就绪进行操作的集合。在一次选Selection之后,你应该会首先访问这个readySet。Selection将在下一小节进行解释。可以这样访问ready集合,也可以用像检测interest集合那样的方法,来检测Channel中什么事件或操作已经就绪:int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
(3)Channel 和 Selector
我们可以通过SelectionKey获得Selector和注册的Channel:Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
(4)Attach一个对象
可以将一个对象或者更多信息attach 到SelectionKey上,这样就能识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或包含聚集数据对象。使用方法如下:selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
还可以在用register()方法向Selector注册Channel的时候附加对象。如:SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
NIO多路复用
主要步骤和元素:
首先,通过 Selector.open() 创建一个 Selector,作为类似调度员的角色。
然后,创建一个 ServerSocketChannel,并且向 Selector 注册,通过指定 SelectionKey.OP_ACCEPT,告诉调度员,它关注的是新的连接请求。
注意,为什么我们要明确配置非阻塞模式呢?这是因为阻塞模式下,注册操作是不允许的,会抛出 IllegalBlockingModeException 异常。
Selector 阻塞在 select 操作,当有 Channel 发生接入请求,就会被唤醒。
在具体的方法中,通过 SocketChannel 和 Buffer 进行数据操作
IO 都是同步阻塞模式,所以需要多线程以实现多任务处理。而 NIO 则是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高
下面用NIO设计一个Echo服务器:
首先定义一个Selector和线程池private Selector selector;
private ExecutorService tp = Executors.newCachedThreadPool();
selector处理所有的网络连接,tp线程池处理每一个客户端请求。为了统计服务器线程在客户端花费的时间,还需要定义一个时间统计有关的变量,用于统计在某一个Socket上花费的时间,time_stat的key为Socket,value为时间戳:public static Map
下面来看一下NIO服务器的核心代码,startServer()方法用于启动NIO Server。
private void startServer() throws IOException{
this.selector = SelectorProvider.provider().openSelector();
ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务端SocketChannel
ssc.configureBlocking(false); // 设置为非阻塞模式
InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(),8000);// 使用8000端口
ssc.socket().bind(isa);
SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); // 将ServerSocketChannel绑定到Selector上,感兴趣的时间为Accept
for(;;){ // 主要任务是等待-分发网络消息
this.selector.select(); // 阻塞方法,如果当前没有准备好的的数据,就会等待,如果有的话返回已经准备好的SelectionKey数量
Set<SelectionKey> readyKeys = this.selector.selectedKeys(); // 获取准备好的SelectionKey
Iterator<SelectionKey> i = readyKeys.iterator();
long e = 0;
while(i.hasNext()){
SelectionKey sk = i.next();
i.remove();// 处理一个删除一个,不然可能重复处理
if(sk.isAcceptable()){
doAccept(sk);
}else if(sk.isValid() && sk.isReadable()){// 判断是否可以读
if(!time_stat.containsKey(((SocketChannel) sk.channel()).socket())){
time_stat.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis());
}
doRead(sk);
}else if(sk.isValid() && sk.isWritable()){ // 判断是否可以写
doWrite(sk);
e = System.currentTimeMillis();
long b = time_stat.remove(((SocketChannel) sk.channel()).socket());
System.out.println("spend: "+(b-e)+"ms");
}
}
}
}
在了解服务端整体框架后,下面从具体的方法中看看几个主要方法的使用:
private void doAccept(SelectionKey sk) {
ServerSocketChannel server = (ServerSocketChannel) sk.channel();
SocketChannel clientChannel;
try {
clientChannel = server.accept();
clientChannel.configureBlocking(false);// 非阻塞
SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);//将Channel注册到Selector上,并告诉Selector对读感兴趣,Channel准备好读时给线程一个通知
EchoClient ec = new EchoClient();
clientKey.attach(ec);// 客户端实例作为附件,附加到表示这个连接的SelectionKey上,可以在整个连接过程共享ec
InetAddress clientAddress = clientChannel.socket().getInetAddress();
System.out.println("Accepted connection from "+clientAddress.getHostAddress());
} catch (Exception e) {}
}
EchoClient封装一个队列,保存在需要恢复给这个客户端所有信息上,这样再进行回复,只要outq对象中弹出元素即可。
public class EchoClient {
private LinkedList<ByteBuffer> outq;
public EchoClient() {
this.outq = new LinkedList<ByteBuffer>();
}
public LinkedList<ByteBuffer> getOutq() {
return outq;
}
public void enqueue(ByteBuffer bb) {
this.outq.addFirst(bb);
}
}
下面看看doRead()方法的实现。
private void doRead(SelectionKey sk) {
SocketChannel c = (SocketChannel) sk.channel();
ByteBuffer bb = ByteBuffer.allocate(8192);
int len;
try {
len = c.read(bb);// 存放读取的数据
if(len<0){
disconnect(sk);
return;
}
} catch (Exception e) {
System.out.println("Failed to read from client!");
e.printStackTrace();
disconnect(sk);
return;
}
bb.flip();
tp.execute(new HandleMsg(sk,bb)); // 线程池处理数据
}
HandleMsg的实现很简单:
public class HandleMsg implements Runnable{
SelectionKey sk;
ByteBuffer bb;
public HandleMsg(SelectionKey sk,ByteBuffer bb){
this.sk = sk;
this.bb = bb;
}
@Override
public void run() {
EchoClient ec = (EchoClient) sk.attachment();
ec.enqueue(bb);// 将收到的数据压入队列,业务逻辑也可以在这个地方处理了
sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
selector.wakeup();// 强迫Selector立即返回
}
}
doWrite()代码如下,这个方法拿到的sk和doread()方法拿到的是同一个,通过这个sk可以操作共享的EchoClient
private void doWrite(SelectionKey sk) {
SocketChannel c = (SocketChannel) sk.channel();
EchoClient ec = (EchoClient) sk.attachment();
LinkedList<ByteBuffer> outq = ec.getOutq();
ByteBuffer bb = outq.getLast();// 列表顶部元素,写回客户端
try {
int len = c.write(bb);
if(len == -1){
disconnect(sk);
return;
}
if(bb.remaining()== 0){
outq.removeLast();// 缓冲区已经完成写,删除它
}
} catch (Exception e) {
System.out.println("Failed to write to client.");
e.printStackTrace();
disconnect(sk);
return;
}
if(outq.size()==0){
sk.interestOps(SelectionKey.OP_READ);
}
}
下面用NIO设计一个客户端
首先初始化Selector和Channel
private Selector selector;
public void init(String ip,int port) throws IOException{
SocketChannel s = SocketChannel.open();
s.configureBlocking(false);
this.selector = SelectorProvider.provider().openSelector();
s.connect(new InetSocketAddress(ip,port));// 并不定连接成功,需要finishConnect()确认
s.register(selector, SelectionKey.OP_CONNECT);
}
程序的工作执行逻辑,主要两件事,一个是链接就绪的Connect,一个是刻度的read()事件:
public void working() throws IOException{
while(true){
if(!this.selector.isOpen()){
break;
}
this.selector.select();
Iterator<SelectionKey> i = this.selector.selectedKeys().iterator();
while(i.hasNext()){
SelectionKey key = i.next();
i.remove();
if(key.isConnectable()){
connect(key);// 判断有没有完成连接,没有的话使用finishConnect()方法完成连接,并向Channel中写入数据及感兴趣的事情
}else if(key.isReadable()){
read(key);
}
}
}
}
下面是read事件
private void read(SelectionKey key) throws IOException {
SocketChannel c = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(100);
c.read(buffer);
byte[] bs = buffer.array();
String msg = new String(bs).trim();
System.out.println("客户端收到信息:"+msg);
c.close();
key.selector().close();
}
帖子还没人回复快来抢沙发