NIO
non-blocking io 非阻塞io
三大组件
channel & buffer
channel有点类似于stream,他就是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输出,channel比stream更为底层
常见的channel有
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
buffer则用来缓冲读写数据,常见的buff有
- ByteBuffer
-
- MapperByteBuffer
-
- DirectByteBuffer
-
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Selector选择器
selector 单从字面意思不好理解,需要结合服务器的设计演化来理解他的用途
多线程版设计
多线程版缺点:
- 内存占用高
- 线程上下文切换成本高
- 只适合连接少数的场景
线程池版
线程池版的缺点:
- 阻塞模式下,线程只能处理一个socket连接
- 仅适合短连接场景
selector版设计
selector的作用就是配合一个线程来管理多个Channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但是流量低的场景(low traffic)
调用selector的select()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理
ByteBuffer 初识
使用的主要依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.110.Final</version>
</dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.11.0</version>
</dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>20.0</version>
</dependency>
有一段普通的文本文件data.txt,内容为:
1234567890abc
使用FileChannel来读取文件内容
public static void main(String[] args) {//FileChannel//1.输入输出流 2.RandomAccessFiletry(FileChannel channel = new FileInputStream("data.txt").getChannel()){//准备缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);//从channel读取数据,向buffer写入数据channel.read(buffer);//打印buffer内容,获取buffer里的内容并打印//切换至读模式buffer.flip();//是否还有剩余的数据,有就一直读取while (buffer.hasRemaining()) {byte b = buffer.get();//空参,每次一个字节System.out.println((char) b);}}catch (Exception e){e.printStackTrace();}
}
结果显示是
为什么没有abc呢?因为设置的是一次读取10个字节,也只读取了一次,所以后面的就没有读到。而我们不能让一次读取的字节大小随着文件的大小变化,应该是多次读取才对。因此:需要改进,channel读取数据的返回值是-1时,表示已经没有数据可读了,所以我们可以在我们加个while-true循环,一直读取,直到读完。注意,向buffer里面写入数据之后(也就是buffer为读模式),需要重新将buffer设置为写模式,buffer.clear();
public static void main(String[] args) {//FileChannel//1.输入输出流 2.RandomAccessFiletry(FileChannel channel = new FileInputStream("data.txt").getChannel()){//准备缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);while(true){//从channel读取数据,向buffer写入数据int read = channel.read(buffer);//返回值是-1表示读完了log.debug("读取到的字节数 : {}", read);if(read == -1){break;}//打印buffer内容,获取buffer里的内容并打印//切换至读模式buffer.flip();//是否还有剩余的数据,有就一直读取while (buffer.hasRemaining()) {byte b = buffer.get();//空参,每次一个字节log.debug("读取到的字节 : {}",(char) b);}// 切换为写模式buffer.clear();}}catch (Exception e){e.printStackTrace();}}
buffer使用姿势
- 向buffer写入数据,例如调用channel.read(buffer)
- 调用filp()切换至读模式
- 从buffer读取数据,例如调用buffer.get()
- 调用clear()或compact切换至写模式
- 重复1-4步骤
ByteBuffer 结构
ByteBuffer有以下重要属性
- capacity 容量
- position 位置
- limit 写入限制
调试工具类
public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];
static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}
int i;
// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append(" ");}HEXPADDING[i] = buf.toString();}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}
// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}
// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}
// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}
/*** 打印所有内容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}
/*** 打印可读取内容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append(" +-------------------------------------------------+" +NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");
// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}
// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");
// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}
dump.append(NEWLINE +"+--------+-------------------------------------------------+----------------+");}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}
public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);}
}
重要例子:
public class TestByteBufferReadWrite {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put((byte) 0x61);//'a'ByteBufferUtil.debugAll(buffer);buffer.put(new byte[]{(byte) 0x62, (byte) 0x63, (byte) 0x64, (byte) 0x65});ByteBufferUtil.debugAll(buffer);//此时调用get,因为此时position在后一个索引上,所以是根本没有数据的,为0 因此需要切换读模式才能将position置为开头来读取数据//buffer.get();//切换成读模式buffer.flip();byte b = buffer.get();System.out.println(b);//’0x61‘十进制的’97‘ByteBufferUtil.debugAll(buffer);}
}
结果:
额外提一下
//使用另一个方法切换成写模式,这个是将剩余数据挤到开头
buffer.compact();
ByteBufferUtil.debugAll(buffer);
会发现最后一个数据还是65,并没有清零,但是注意看position的位置是在索引4,所以下一次往buffer里写数据之后就会继续写,就会把这个65给覆盖掉,所以不清零也没关系。
ByteBuffer 常见方法
分配空间
可以使用allocate方法为ByteBuffer分配空间,其他buffer也有该方法
public static void main(String[] args) {//分配后容量是固定的,不能超过。ByteBuffer buffer = ByteBuffer.allocate(10);System.out.println(ByteBuffer.allocate(16).getClass());System.out.println(ByteBuffer.allocateDirect(16).getClass());/** class java.nio.HeapByteBuffer -使用的是java堆内存,读写效率较低,受垃圾回收的影响,万一内存不够,需哟垃圾回收,有用的数据就要备份拷贝迁移,就需要时间影响效率* class java.nio.DirectByteBuffer -使用的是直接内存,读写效率高(少一次数据拷贝),使用的是系统内存,不会受GC影响,但是分配的效率低。假如用完不关闭什么的,可能还会造成内存泄漏。*/
}
向Buffer写入数据
有两种方法
- 调用channel的read方法
- 调用buffer自己的put方法
int readBytes = channel.read(buf);
和
buf.put((byte) 127);
从Buffer里读取数据
同样有两种办法
- 调用channel的write方法
- 调用buffer自己的get方法
int writeBytes = channel.write(buf);
和
byte b = buf.get();
get方法会让position 读取指针向后走,如果想重复读取数据
- 可以调用rewind方法将position重新置为0
- 或者调用get(int i)方法获取索引 i 的内容,它不会移动读指针
mark & reset方法:mark 做一个标记,记录position的位置,reset 是将 position 重置到mark 的位置
ByteBuffer与字符串之间的转换
public static void main(String[] args) {//1.字符串转为ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);buffer.put("hello".getBytes());debugAll(buffer);//buffer还是写模式//2.CharsetByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");debugAll(buffer2);//buffer自动切换成读模式了//3.wrapByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());debugAll(buffer3);//类似的 buffer转字符串String string = StandardCharsets.UTF_8.decode(buffer2).toString();System.out.println(string);
}
因为第一种方法转换完之后还是写模式,所以用decode转成字符串就会有问题,出现乱码。所以为了正确读取,第一个方法需要用filp()方法切换成读模式
Scattering Reads
分散读取,有一个文本文件 words.txt
onetwothree
使用如下方法读取,可以将数据填充至多个buffer
public static void main(String[] args) {try(FileChannel channel = new RandomAccessFile("words.txt", "rw").getChannel()){ByteBuffer buffer = ByteBuffer.allocate(3);ByteBuffer buffer2 = ByteBuffer.allocate(3);ByteBuffer buffer3 = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{buffer, buffer2, buffer3});buffer.flip();buffer2.flip();buffer3.flip();debugAll(buffer);debugAll(buffer2);debugAll(buffer3);} catch (Exception e) {e.printStackTrace();}
}
集中写
public static void main(String[] args) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("world");ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("你好");try(FileChannel channel = new RandomAccessFile("words2.txt","rw").getChannel()){channel.write(new ByteBuffer[]{buffer,buffer2,buffer3});}catch (Exception e){e.printStackTrace();}
}
解决黏包和半包
说白了就是发送多条信息,buffer的长度有限,假如一个buffer包含多条信息,那就是黏包,假如一条消息被分在了多个buffer里面,那就是半包。我们要做的就是正确获取黏包和半包里面的信息。
public static void main(String[] args) {ByteBuffer source = ByteBuffer.allocate(32);source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split(source);source.put("w are you\n".getBytes());split(source);
}private static void split(ByteBuffer source) {source.flip();//切换读模式for (int i=0; i<source.limit(); i++) {//找到一条完整的消息if(source.get(i) == '\n'){//把这条消息存入新的ByteBufferint length = i+1-source.position();//这里计算了消息的长度ByteBuffer target = ByteBuffer.allocate(length);//从 source 读,向 target 写for (int j=0; j<length; j++){byte b = source.get();target.put(b);}debugAll(target);}}source.compact();//切换写模式,注意不能clear,还有消息没读完,要压缩继续写
}
文件编程
FileChannel
FileChannel只能工作在非阻塞模式下面,不能和selector一起使用
获取
不能直接打开FIleChannel,必须通过FileInputSream,或者FileOutputSetream ,或者RandomAccessFile来获取FileChannel
- 通过FileInputSream获取的channel只能读
- 通过FileOutputSetream 获取的channel只能写
- 通过RandomAccessFile 是否能读写,根据构造时指定的读写模式相关(“r”,“w”)
读取
会从channel读取数据填充到ByteBuffer中,返回的值,表示读到了多少字节,-1表示到达了文件的末尾
int read = channel.read(buffer);
写入
在while中调用 write方法,是因为 write方法并不能保证一次将buffer中的内容全部写入channel中
public void test4(){try (FileChannel channel = new RandomAccessFile("data.txt", "rw").getChannel()) {ByteBuffer b = ByteBuffer.allocate(10);b.put((byte) 'a'); // 存入数据b.put((byte) 'b'); // 存入数据b.put((byte) 'c'); // 存入数据b.flip(); // 切换为读模式// 在while中调用 write方法,是因为 write方法并不能保证一次将buffer中的内容全部写入channel中while (b.hasRemaining()){// 写入数据channel.write(b);}} catch (IOException e) {}
}
关闭
channel必须关闭,不过调用了FileInoutStream,FileOutputStream,或者RandomAccessFile的close方法会间接的调用channle的close方法
位置
channel.position 是 Java NIO 中用于获取通道(Channel)当前的位置的方法。通道的位置表示从数据源(如文件或网络连接)的开头到当前位置之间的字节数。
@Test
@DisplayName("测试channel.position()方法")
public void test5(){try (FileChannel channel = new RandomAccessFile("data.txt", "r").getChannel()) {// 获取当前通道的位置ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put((byte) 'a');buffer.put((byte) 'a');buffer.put((byte) 'a');// 切换为读模式buffer.flip();channel.read(buffer);long position = channel.position();logger.error("Current position: {}", position);// 在文件中移动位置,假设移动到文件的开头channel.position(0);// 再次获取当前通道的位置position = channel.position();logger.error("Current position: {}", position);} catch (IOException e) {e.printStackTrace();}
}
大小
使用size方法获取文件的大小
强制写入
操作系统处于性能的考虑,会将数据缓存,不是立刻写入磁盘,库调用force(true)方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
两个Channel传输数据
public static void main(String[] args) {String FROM = "data.txt";//项目路径下String TO = "to.txt";long start = System.nanoTime();try(FileChannel from = new FileInputStream(FROM).getChannel();FileChannel to = new FileOutputStream(TO).getChannel()) {//效率高,底层会用操作系统的零拷贝进行优化,传输一次最多传2g,解决可以多传几次from.transferTo(0, from.size(), to);}catch (Exception e){e.printStackTrace();}long end = System.nanoTime();System.out.println("传输用时:"+(end - start)/1000_000.0);}
Path
Files
遍历目录
也就是对Files.walkFileTree的使用,用到了访问者模式
public static void main(String[] args) throws IOException {Files.walkFileTree(Paths.get("D:\\IdeaProjects\\JUC-study\\files"),new SimpleFileVisitor<Path>(){//遍历前的处理@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes basicFileAttributes) throws IOException {System.out.println(dir);return super.preVisitDirectory(dir, basicFileAttributes);}//遍历时对文件的处理@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes basicFileAttributes) throws IOException {System.out.println(file);return super.visitFile(file, basicFileAttributes);}});
}
遍历删除
public static void main(String[] args) throws IOException {Files.walkFileTree(Paths.get("D:\\IdeaProjects\\JUC-study\\files"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes basicFileAttributes) throws IOException {System.out.println("进入:"+dir);return super.preVisitDirectory(dir, basicFileAttributes);}@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes basicFileAttributes) throws IOException {System.out.println(file);Files.delete(file);return super.visitFile(file, basicFileAttributes);}@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException e) throws IOException {System.out.println("退出:"+dir);Files.delete(dir);return super.postVisitDirectory(dir, e);}});
}
主要就是重写的几个方法,一个是进入目录前,然后是文件,然后是退出目录后。