Okio 源码学习

Table of Contents

该项目的github地址: https://github.com/square/okio

简介

Okio是Square公司推出的Java IO库, 这个库本来Square公司的著名开源项目OkHttp的一部分, 后来被提取出来作为单独的一个开源项目发布.

本文是Okio源码的阅读笔记, 主要介绍该开源项目的内部实现原理,使用的数据结构和设计模式.

核心数据结构

常言道 "程序=数据结构+算法", Okio的核心之一就是数据结构的设计. 这部分主要"片段式"的介绍 Okio的各数据结构的实现.

Source and Sink

IO系统就是读写系统, 更直接的说就是二进制流的流动, 所以需要一个"流"的源头和目的. Source即代表Okio 数据流的来源, 即JDK中的InputStream. 而Sink则代表目的地.即JDK中的outputStream.

Source源码

Source的实现比较简单, 就是一个继承了Closeable的接口, 只定义了 read()/timeout()/close() 三个函数.

read()函数的功能是讲最多byteCount个bytes写入到sink中, sink是Buffer类型的变量, Buffer也是Okio的核心数据结构, 后面会讲到.

public interface Source extends Closeable {
  /**
   * Removes at least 1, and up to {@code byteCount} bytes from this and appends
   * them to {@code sink}. Returns the number of bytes read, or -1 if this
   * source is exhausted.
   */
  long read(Buffer sink, long byteCount) throws IOException;

  /** Returns the timeout for this source. */
  Timeout timeout();

  /**
   * Closes this source and releases the resources held by this source. It is an
   * error to read a closed source. It is safe to close a source more than once.
   */
  @Override void close() throws IOException;
}

Sink源码

Sink与Source类似, 也是一个接口, 继承了Closeable和Flushable两个接口, 同时定义(重写)了 write()/flush()/timeout()/close()四个函数.

write()也接受一个Buffer参数, 与Source的read()不同的是: 在read()中,source是作为一个"目的"使用, 而在write()中是作为"源"存在, 即将source中的内容写入到Sink中.

public interface Sink extends Closeable, Flushable {
  /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
  void write(Buffer source, long byteCount) throws IOException;

  /** Pushes all buffered bytes to their final destination. */
  @Override void flush() throws IOException;

  /** Returns the timeout for this sink. */
  Timeout timeout();

  /**
   * Pushes all buffered bytes to their final destination and releases the
   * resources held by this sink. It is an error to write a closed sink. It is
   * safe to close a sink more than once.
   */
  @Override void close() throws IOException;
}

BufferedSource and BufferedSink

在Okio的官方文档中,并不建议直接使用Source和Sink,而是建议使用他们的子类 BufferedSource和BufferedSink, 后者都封装了一个Buffer对象作为数据操作的"buffer", 并定义了一系列的接口, 例如读写一个byte/int/long/string/byteArray, 以及读取一行 等等常用的IO操作.

Segment

前面介绍的类InputStream/OutputStream的Source和Sink及其子类都使用了Buffer作为操作对象, 由此可见Buffer应该是Okio的核心数据结构.所以后面会详细介绍一下Buffer. 不过在讲Buffer之前, 先了解一下另外一个核心的数据结构: segment.

对于有过C语言经验的人来讲,segment与用C语言实现的"链表节点"在概念上完全相同,只不过segment是用 java实现的并且在类内部封装了操作的API.

//segment的C语言解释
struct node{
   "byte" data[SIZE];// no 'byte' type in c
   int pos; 
   int limit;
   int shared; //no boolean in c 
   int owner; 
   struct node *next;
   struct node *prev;
}
//好久没写C了, 好亲切...

接着看些segment的真正实现:

final class Segment {
  /** The size of all segments in bytes. */
  static final int SIZE = 2048;

  final byte[] data;

  /** The next byte of application data byte to read in this segment. */
  int pos;

  /** The first byte of available data ready to be written to. */
  int limit;

  /** True if other segments or byte strings use the same byte array. */
  boolean shared;

  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner;

  /** Next segment in a linked or circularly-linked list. */
  Segment next;

  /** Previous segment in a circularly-linked list. */
  Segment prev;

除了成员变量外,segment类还封装了对segment进行的操作:

  1. pop()删除当前的segment, 并返回后一个(next指向)segment.
  2. push()将segment插入到当前segment的后面.
  3. split(count), 该函数用于将segment拆分成两个segment, 第一个segment占用count个可用 数据, 第二个segment(即当前segment)占用(avail - count)个.
  4. compact(), 压缩函数, 如果当前segment的数据可以存放到前面的segment,则存放过去, 并回收当前的 segment.
  5. writeTo(), 将当前segment的count个byte写入到目标segment中.
  6. 共享, 在segment的设计中, 两个(或多个?)segment可以共享一个同一个segment的数据,可以将其理解为 "主从"关系, 只有一个segment是data的owner, 而其他的segment只是引用了这个数据. 同时, "主从"segment也有"读写"的关系:即只有data的owner segment才可以往data写数据. 非owner segment 不可以往data中写数据.

SegmentPool

Okio实现了一个segment池, 用于segment的获取和回收, 该segment pool通过"单链表"的方式 组织segment, 分为"获取"和"回收"操作:

  1. 获取, 检测单链表是否为空, 是, 取下链表头给申请者, 否则生成一个新segment.
  2. 回收, 回收需要一些条件判断(如下), 满足才可以放到链表头.
    • 该segment的数据被其他"segment"共享, 不能回收.
    • segment池的大小已经容不下该segment, 直接抛弃(会被GC回收).

Buffer

介绍

Buffer是Okio的存储数据的结构, 它的灵活性非常强, 官方文档里介绍了它的三个优点:

  1. 数据在buffer间"转移"速度快. 前面介绍segment时提到了segment有"owner"这个属性, 所以有时候 数据的转移就是简单的修改一下owner的值.
  2. buffer随着数据的增长而增长, 这样可以"节省"空间.
  3. buffer实现了byte池的功能.可以减少GC的反复操作.

Buffer类继承自前面介绍的BufferSource和BufferSink类, 所以它既可以作为"源", 又可以作为"目的" 来使用.

成员变量

buffer只有两个成员变量: head, size. head是一个Segment类型的变量, 由此来看buffe的内部数据 的组织单位Segment, Segment的天然链表构造也使其天然可以动态的Buffer的数据大小.

API分析

  1. write()/read()系列函数 Buffer提供了很强大的write()和read()系列函数簇, 使用这些函数可以很方便的从buffer中读写各种类型的数据, 例如readInt()/writeInt()用于从buffer中读取/写入一个整数, 类似的还有readLong()/writeLong()…等. 这里主要通过两个具体函数代码的分析, 来观察在这个过程中Buffer内部的变化, 主要就是Segment的变化.

    write(byte[] source, int offset, int byteCount), 这个函数将source中的部分数据写入到buffer中. 其代码如下:

    @Override public Buffer write(byte[] source, int offset, int byteCount) {
      if (source == null) throw new IllegalArgumentException("source == null");
      checkOffsetAndCount(source.length, offset, byteCount);
    
      int limit = offset + byteCount;
      while (offset < limit) {
        Segment tail = writableSegment(1);
    
        int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
        System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
    
        offset += toCopy;
        tail.limit += toCopy;
      }
    
      size += byteCount;
      return this;
    }
    

    checkOffsetAndCount()用于检查传入参数的合法性. 不合法就直接抛出异常.如果合法, 计算一下 byte数组的要写入数据的"终点值".然后进入写数据的循环. 每次循环开始就会调用writableSegment()去获取一个可以写的Segment.这个函数会影响Buffer中 Segment链表长度的一个函数. 看一下其代码.

    • writableSegment

      Segment writableSegment(int minimumCapacity) {
         if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
      
         if (head == null) {
           head = SegmentPool.take(); // Acquire a first segment.
           return head.next = head.prev = head;
         }
      
         Segment tail = head.prev;
         if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
           tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
         }
         return tail;
       }
      
      • 首先同样是检查参数合法性, 参数minimumCapacity的意思是获取到的segment剩余的最小可用空间(byte).
      • 接着检查head是否为空, 如果是, 从SegmentPool获取一个Segment, 并将其prev和next都指向自己.
      • 否则head不为空, 获取链表尾部的Segment(head->prev), 检查剩余大小是否符合要求, 同时该segment不能是一个引用 (即该fragment数据其实是引用的其他fragment的数据, 这种类型的fragment不可写, 即owner=false).如果尾部segment 不能满足要求, 则从SegmentPool中获取一个新的Segment插入链表中.

    获取到一个可写Segment之后, 就会调用System.arrayCopy()函数实现字节复制,重复进行上述动作 直到全部数据copy完为止.

    read()函数数据流的"流向"与write()是相反的, 并且当一个segment的数据全部读完后会调用 SegmentPool的recycle()函数进行回收.

  2. 数据在buffer间移动 既然Buffer的一个特点就是数据在Buffer间移动特别快, 那就来看一下代码的具体实现: 下面的函数用于将source头部开始的内容写入到当前Buffer的尾部.

    @Override public void write(Buffer source, long byteCount) {
    
        if (source == null) throw new IllegalArgumentException("source == null");
        if (source == this) throw new IllegalArgumentException("source == this");
        checkOffsetAndCount(source.size, 0, byteCount);
    
        while (byteCount > 0) {
          // Is a prefix of the source's head segment all that we need to move?
          if (byteCount < (source.head.limit - source.head.pos)) {
            Segment tail = head != null ? head.prev : null;
            if (tail != null && tail.owner
                && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
              // Our existing segments are sufficient. Move bytes from source's head to our tail.
              source.head.writeTo(tail, (int) byteCount);
              source.size -= byteCount;
              size += byteCount;
              return;
            } else {
              // We're going to need another segment. Split the source's head
              // segment in two, then move the first of those two to this buffer.
              source.head = source.head.split((int) byteCount);
            }
          }
    
          // Remove the source's head segment and append it to our tail.
          Segment segmentToMove = source.head;
          long movedByteCount = segmentToMove.limit - segmentToMove.pos;
          source.head = segmentToMove.pop();
          if (head == null) {
            head = segmentToMove;
            head.next = head.prev = head;
          } else {
            Segment tail = head.prev;
            tail = tail.push(segmentToMove);
            tail.compact();
          }
          source.size -= movedByteCount;
          size += movedByteCount;
          byteCount -= movedByteCount;
        }
      }
    

    函数最开始仍然是常规的参数检查, 然后就进入一个while()循环当中:

    1. 如果要写入Buffer的大小小于目标Buffer head的剩余可用大小.那么
      • 如果数据可以直接写到当前Buffer的tail中, 写入, 函数退出.
      • 否则, 说明需要一个新的fragment, 将目标Buffer的head按照要写入的byte值一分为二.这样目标Buffer的head Segment 就包含了所有要写入的数据. semeng的split()函数代码如下:

        public Segment split(int byteCount) {
           if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
           Segment prefix = new Segment(this);
           prefix.limit = prefix.pos + byteCount;
           pos += byteCount;
           prev.push(prefix);
           return prefix;
         }
        
    2. 将目标Buffer的head从目标Buffer中弹出, 插入到当前Buffer中,
      • 如果当前buffer的head为null, 直接设置为head.
      • 否则, 插入到tail后面, 并将其设为tail, 并调用tail的compat()函数进行压缩. 按照compat()的算法, 之后跟tail之前的segment合并后的数据小于一个segment的才会压缩. 所以即使前面segment使用率为2%, tail的使用率为99%, 也不会压缩.

        public void compact() {
          if (prev == this) throw new IllegalStateException();
          if (!prev.owner) return; // Cannot compact: prev isn't writable.
          int byteCount = limit - pos;
          int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
          if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
          writeTo(prev, byteCount);
          pop();
          SegmentPool.recycle(this);
        }
        

Created At <2016-07-30 Sat 23:22> by Luis Xu. Email: xuzhengchaojob@gmail.com