Java基础知识——JavaIO

05月07日 收藏 0 评论 1 java开发

Java基础知识——JavaIO

转载声明:文章来源https://blog.csdn.net/weixin_45680007/article/details/104099713

一、IO的基本分类

data = socket.read();socket本意为套接字,为ip+端口,我理解为一个数据载体
(BIO)阻塞io:进程系统调用获取数据后,没有等到数据,则陷入阻塞状态。等到IO获取数据后,进程才就绪状态
(NIO not block io)非阻塞io:进程系统调用获取数据后,没有等到数据,则立即返回。循环往复
(NIO new io)多路复用io.:非阻塞的延伸,进程和IO进程不再是一对一而是一对多的关系,会一次性循环
同步非阻塞模型:不再创建线程去IO,而是需要是发出请求给acceptor,acceptor不断去轮询多个 socket 的状态,只有当 socket 真正有读写事件时,才真正调用实际的 IO 读写操作。因为在多路复用 IO 模型中,只需要使用一个线程就可以管理多个 socket,系统不需要建立新的进程或者线程,并且只有在真正有 socket 读写事件进行时,才会使用 IO 资源,所以它大大减少了资源占用率(select,poll,epoll)

(AIO)异步 IO 模型
在异步 IO 模型中,用户线程发起 read 操作,立刻就可以开始去做其它的事。然后,内核线程会等待数据准备完成,然后内核线程将数据拷贝到用户线程的缓冲区(磁盘等位置),当这一切都完成之后,内核会给用户线程发送一个信号,告诉它 read 操作完成

多路复用IO和AIO的区别:多路复用IO用户线程需要主动去读取数据,而AIO内核线程已经帮用户读区数据,用户直接使用
所以同步与异步的区别在于:谁去读取数据或者叫是否经历阻塞

注意和Java调用的区别:
Java调用:同步调用,异步调用,
回调:A调用B的同时传入函数指针,B会调用A的函数指针

二、BIO

Javaio包:

BIO:blockIO,Java的基本IO都是阻塞IO
经典模式:
Acceptor模式:即每当一个client线程访问,服务器创建一个线程来回答。
优化:依靠线程池模式


① 服务器端的Server是一个线程,线程中执行一个死循环来阻塞的监听客户端的连接请求和通信。
② 当客户端向服务器端发送一个连接请求后,服务器端的Server会接受客户端的请求,ServerSocket.accept()从阻塞中返回,得到一个与客户端连接相对于的Socket
③ 构建一个handler,将Socket传入该handler。创建一个线程并启动该线程,在线程中执行handler,这样与客户端的所有的通信以及数据处理都在该线程中执行。当该客户端和服务器端完成通信关闭连接后,线程就会被销毁
④ 然后Server继续执行accept()操作等待新的连接请求

三、Java nio :new io

NIO 主要有:Channel(通道),Buffer(缓冲区), Selector。
传统 IO 基于字节流和字符流进行操作,而 NIO 基于 Channel 和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区 中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开, 数据到达)。因此,单个线程可以监听多个数据通道。

Channel:“通道”。Channel 和 IO 中的 Stream(流)是差不多一个等级的。只不过 Stream 是单向的而 Channel 是双向 的,既可以用来进行读操作,又可以用来进行写操作。面向缓冲区

Buffer:缓冲区,实际上是一个容器,是一个连续数组。Channel 提供从文件、 网络读取数据的渠道,但是读取或写入的数据都必须经由
buffer
所有的缓冲区类型都继承于抽象类Buffer

1)使用NIO读取数据

在前面我们说过,任何时候读取数据,都不是直接从通道读取,而是从通道读取到缓冲区。所以使用NIO读取数据可以分为下面三个步骤:
FileInputStream fin = new FileInputStream(“c:\test.txt”);
从FileInputStream获取Channel ——FileChannel fc = fin.getChannel();
创建Buffer ——ByteBuffer buffer = ByteBuffer.allocate(1024);
文本将数据从Channel写入到Buffer中——fc.read(buffer);
用户从Buffer中直接读数据——byte b = buffer.get();

2)使用NIO写入数据

FileOutputStream fout = new FileOutputStream( “e:\test.txt” );
从FileOutputStream获取Channel——FileChannel fc = fout.getChannel();
创建Buffer—— ByteBuffer buffer = ByteBuffer.allocate( 1024 );
用户将数据直接写入Buffer——buffer.put( message[i] );
文本从Channel中读入Buffer数据—— fc.write( buffer );
综上所述,用户都是直接与Buffer接触。而文本才利用channel与Buffer接触

Selector 类是 NIO 的核心类,Selector 能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的响应处理。用一个单线程就可以管理多个通道,也就是管理多个连接。

优点:

1:BIO面向流,NIO面向缓冲区
Java IO 面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。
数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。

2:IO 的各种流是阻塞的。这意味着,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
NIO 的非阻塞模式, 使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。

这里我理解为如果服务器和客户端使用socket,必须同时读和写,所以导致线程一直等待。而如果是buffer,则可以暂时放入buffer,等需要的时候才读写,不需要服务器和客户端同步

(1)select==>时间复杂度O(n)
返回的是所有存放socket标志位fd的数据结构
它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流
(2)poll==>时间复杂度O(n)
没有最大连接数的限制,原因是它是基于链表来存储的.
(3)epoll==>时间复杂度O(1)
返回的是具体某个handler标识
空闲的Socket调用回调函数,会把哪个流发生了怎样的I/O事件通知我们。所以我们说epoll实际上是事件驱动(每个事件关联上fd)的,此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1))

四、Reactor(反应器模式)

理解为NIO是Java提供的API,而其设计思想是Reactor模式
同步非阻塞的I/O多路复用机制

单线程:

Reactor(也就是Initiation Dispatcher)监听accept事件,收到事件进行转发。
如果是连接建立的事件,则Reactor监听后给acceptor,由acceptor创建handler处理后续事件,并且会注册连接所关注的READ事件以及对应的READ事件处理器注册(或者WRITE)到Reactor中。
如果不是建立连接事件(简历连接后的读写事件),则Reactor会分发调用Handler来响应。
handler会完成read->业务处理->send的完整业务流程。

缺点:
1:Reactor阻塞,则整个线程阻塞
2:事件accept()、read()、write()以及connect()和非IO操作都在一个线程上完成

多线程:
Handler只负责响应事件,不做具体业务处理,通过Read读取数据后,会分发给后面的Worker线程池进行业务处理。
Worker线程池会分配独立的线程完成真正的业务处理,Handler收到响应结果后通过send将响应结果返回给Client。


缺点:所有的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作

主从线程:


mainReactor可以只有一个(多个CPU就有多个),但subReactor一般会有多个
mainReactor主要是监听连接请求并将accept事件给acceptor,acceptor连接后将读写请求传给subReactor线程池分配的subReactor线程。subReactor线程识别后将请求给handler

select/poll/epoll

linux中一切都是文件,网络连接都是以文件描述符fd表示
一般有两个管理线程,一个是socket处理线程,一般是接受连接,然后转化为对应的fd或者设置回调函数(epoll)。另一个是用于遍历fd查看是否有数据。

select

select的参数:最大文件描述符+1,读文件集合,写文件集合,异常集合,超时时间
bitmap(rset):1024位,需要监听的置1.不需要监听的0。实际
如1,2,5,7,9的文件描述符被占用,则bitmap为0110010101—

执行过程:
1:将bitmap(rset)从用户态拷贝到内核态,内核态进行判断,阻塞
如果有数据,则FD(rset)置位,说明有数据来了,select返回不再阻塞
2:遍历FD(rset),判断哪一位置位,进行读取

缺点:
1:bitmap最大为1024——利用poll即链表
2:rset不可重用,有新的文件描述符则重新创建
3:拷贝过程仍然有开销
4:需要遍历rset

poll

poll:采用pollfd
pollfd{fd,events在意的事件,revents是否有读数据}
同样的将pollfd从用户态拷贝到内核态,如果有数据则revents置位
返回后,将revents归位即可
1:解决1
2:pollfds可重用

epoll

利用epfd,实际是红黑树+双向链表
红黑树存储以后的socket,双向链表存储准备就绪的事件

事件驱动:所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。

1:用户态和内核态共享内存,堆外内存mmap
2:如果有数据,将fd放入epfd前面重排,如果有多个,则都放在前面。
拥有返回值count,一个触发事件则++

对比select的好处
1:O(1)的时间复杂度,不需要遍历bitmap
2:没有文件描述符的长度限制
3:不需要来回拷贝,使用mmap

epoll有EPOLLLT和EPOLLET两种触发模式
LT水平触发:要这个fd还有数据可读,每次 epoll_wait都会返回它的事件也就是放入双向链表
ET边缘触发:触发后,如果没有读完,下一次不会触发。只有新事件到来才会触发。系统不会充斥大量你不关心的就绪文件描述符

Preactor

异步非阻塞AIO

事件处理器(Handler)事件分离器(Proactor)
1:应用程序初始化一个异步读取操作,然后注册相应的事件处理器,此时事件处理器不关注读取就绪事件,而是关注读取完成事件
2:事件处理器提供(存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数)等信息给操作系统
3:操作系统完成任务,并将读取的内容放入用户传递过来的缓存区中
4:事件分离器捕获到读取完成事件后,激活应用程序注册的事件处理器,事件处理器直接从缓存区读取数据,而不需要进行实际的读取操作。

Netty

异步事件驱动的 NIO 框架 采用Reactor(反应器模式)
NioServer /NioClient
Reactor Thread
IoHandler

NIO高效的原因:
零拷贝 Zero-copy
并不是零次拷贝,而是因为内核和用户缓冲区之间,没有数据是重复的
网络编程:

File file = new File("index.html");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
byte[] arr = new byte[(int) file.length()];
raf.read(arr);
Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);
```正常流程:
read:磁盘->页缓存(内核空间)->用户空间 用户态-内核态-用户态
write:用户空间->Socket缓存(内核空间)->IO 用户态-内核态-用户态
传统的拷贝次数:4次
第一次:read :DMA将数据从磁盘到内核缓冲区。
第二次:内核缓冲区的数据拷贝到用户缓冲区
第三次:write:用户缓冲区到 Socket 缓冲区
第四次: Socket 缓冲区到网络协议引擎
上下文切换次数:4次
read两次,write两次

mmap 优化:内存映射,将文件映射到内核缓冲区,用户空间可以共享内核空间的数据
引入了mmap
mmap:内存映射文件。将用户空间(文件)映射到内核空间中(具体来说是页缓存中)。在实现上也是可行的,因为从虚拟地址上0-3G是用户,3-4G是内核,只要有个映射就好。
所以mmap+write代替传统的是read+write方法,少了一次CPU拷贝
mmap+write:
mmap:磁盘->页缓存 用户态-内核态-用户态
write:页缓存 ->Socket缓存->IO 用户态-内核态-用户态


sendFile:数据根本不经过用户态,直接从内核缓冲区进入到 Socket Buffer
拷贝次数:3次 同上(都是在第一次内核态中完成)
上下文切换次数:2次
SendFile代替mmap+write,减少一次用户内核切换
磁盘->页缓存-> Socket缓存->IO 用户态-内核态-用户态

JVM可以使用的内存分外2种:堆内存和堆外内存(直接内存)
代码中能直接操作本地内存的方式有2种:使用未公开的Unsafe和NIO包下ByteBuffer。
Unsafe申请的内存不受GC控制(所以可能导致OOM)
Direct Memory是受GC控制的,例如ByteBuffer bb = ByteBuffer.allocateDirect(1024)
JDK中使用DirectByteBuffer对象来表示堆外内存,每个DirectByteBuffer对象在初始化时,都会创建一个对应的Cleaner对象,用于保存堆外内存的元信息(开始地址、大小和容量等)
主动回收:只要从DirectByteBuffer里取出那个sun.misc.Cleaner,然后调用它的clean()就行;
GC回收:GC回收DirectByteBuffer时,会调用.Cleaner的clean()

堆外内存是核心态,堆内内存是用户态
正常存储到磁盘或者从磁盘到堆时:
磁盘——内核态内存——堆内内存(Naive堆(本地方法堆),因为是Naive函数控制读写流)
原因是操作系统把内存中的数据写入磁盘或网络时,要求数据所在的内存区域不能变动,但是JVM的GC机制会对内存进行整理,导致数据内存地址发生变化,所以无奈,JDK只能先拷贝到堆外内存(不受GC影响),然后把这个地址发给操作系统。==磁盘读取较慢==


非直接缓冲区(堆内缓冲区)和直接缓冲区别(堆外缓冲区):
非直接:四次拷贝
直接:相当于mmap,三次拷贝
使用Native函数库直接分配堆外内存,然后通过一个存储在Java堆中的DirectByteBuffer对象作为这块内存的引用进行操作,在垃圾回收时并不能用程序区控制堆外内存的回收,因为不属于虚拟机,只能是垃圾回收机制按需对堆内的DirectByteBuffer对象进行回收,回收后将与直接缓存区失去联系,也就意味着直接缓冲区被回收。

内存池:
JVM和JIT使得对象的分配和回收效率高,但是buffer却仍然效率低
针对于:缓冲区Buffer
PooledByteBuf采用二叉树来实现一个内存池==我理解内存池大小不同,所以需要进行排序==

串行无锁设计:==感觉就是selector的实现==
NioEventLoop中封装了一个线程即IO线程
拥有一个队列,存储其他读写进程需要执行的任务,队列的设计是线程安全的
简单来说就是,有多个读写进程,选取一个读写进程封装为NioEventLoop,从而在NioEventLoop执行其他读写进程的操作。
多个生产者单个消费者Mpsc

线程通过下面操作判断是应该主动读写还是应该封装。
它就是拿当前线程和之前创建NioEventLoop时绑定的那个IO线程进行判断, 如果是一样的, 说明此线程就是绑定的IO线程, 可以执行读写操作, 如果不一样, 那么说明是其他线程, 就要把读写操作封装成任务放在队列中, 由绑定的那个IO线程去执行
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191217093316526.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTY4MDAwNw==,size_16,color_FFFFFF,t_70)

ChannelPipeline


![在这里插入图片描述](https://img-blog.csdnimg.cn/20200228210127226.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTY4MDAwNw==,size_16,color_FFFFFF,t_70)
Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
ChannelHandler,核心处理业务就在这里,用于处理业务请求。
ChannelHandlerContext,用于传输业务数据。
ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。



网络编程

JavaIO:阻塞型
服务器:
1: ServerSocket serverSocket = new ServerSocket(8000);socket绑定端口
2:Socket socket = serverSocket.accept();
3: InputStream inputStream = socket.getInputStream();

public class IOServer {
public static void main(String[] args) throws Exception {

ServerSocket serverSocket = new ServerSocket(8000);

// (1) 接收新连接线程
new Thread(() -> {
while (true) {
try {
// (1) 阻塞方法获取新的连接
Socket socket = serverSocket.accept();

// (2) 每一个新的连接都创建一个线程,负责读取数据
new Thread(() -> {
try {
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
// (3) 按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println(new String(data, 0, len));
}
}
} catch (IOException e) {
}
}).start();

} catch (IOException e) {
}
}
}).start();
}
}

客户端:
1:Socket socket = new Socket("127.0.0.1", 8000);绑定端口
2: socket.getOutputStream().write()写数据

public class IOClient {
public static void main(String[] args) {
new Thread(() -> {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
try {
socket.getOutputStream().write((new Date() + ": hello world").getBytes());
socket.getOutputStream().flush();
Thread.sleep(2000);
} catch (Exception e) {
}
}
} catch (IOException e) {
}
}).start();
}
}


JavaNIO:
服务器:

public class NIOServer {
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();

new Thread(() -> {
try {
// 对应IO编程中服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

while (true) {
// 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();

while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();

if (key.isAcceptable()) {
try {
// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}

}
}
}
} catch (IOException ignored) {
}

}).start();

new Thread(() -> {
try {
while (true) {
// (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
if (clientSelector.select(1) > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();

while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();

if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 读取数据以块为单位批量读取
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
}
}

1:
serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接是否有数据可读
利用Selector
2:
在serverSelector中:Selector
如果serverSelector.select(1) > 0即有新连接
Set< SelectionKey> set = serverSelector.selectedKeys();
将clientChannel.register(clientSelector, SelectionKey.OP_READ);绑定

3:
在clientSelector中:
如果serverSelector.select(1) > 0即有新连接
同上利用keyIterator.next();查看哪一个key.isReadable()。这是select,O(n)时间复杂度

Netty:

private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();//用于处理I/O相关的读写操作,或者执行Task

服务端启动的时候,创建了两个NioEventLoopGroup,它们实际是两个独立的Reactor线程池。
一个用于接收客户端的TCP连接,另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。
bossGroup:接收客户端TCP连接,初始化Channel参数
workerGroup:
异步读取通信对端的数据报,发送读事件到ChannelPipeline;
异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;


C 1条回复 评论
Amusi

测试真的是坑,啥都要会,一个项目要覆盖到方方面面,先是功能,再是自动化,每日构建,再是性能,再是安全,哎,小公司要你会的全,大厂要你的会的精还全,哎,真是太难了

发表于 2022-05-25 23:00:00
0 0