001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.Future;
028import java.util.function.BiConsumer;
029import org.apache.hadoop.hbase.nio.ByteBuff;
030import org.apache.hadoop.util.StringUtils;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential
037 * and could be considered as a large buffer.It supports reading/writing data from this large buffer
038 * with a position and offset
039 */
040@InterfaceAudience.Private
041public class ByteBufferArray {
042  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
043
044  public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
045  private final int bufferSize;
046  private final int bufferCount;
047  final ByteBuffer[] buffers;
048
049  /**
050   * We allocate a number of byte buffers as the capacity.
051   * @param capacity  total size of the byte buffer array
052   * @param allocator the ByteBufferAllocator that will create the buffers
053   * @throws IOException throws IOException if there is an exception thrown by the allocator
054   */
055  public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
056    this(getBufferSize(capacity), getBufferCount(capacity),
057      Runtime.getRuntime().availableProcessors(), capacity, allocator);
058  }
059
060  ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
061    ByteBufferAllocator alloc) throws IOException {
062    this.bufferSize = bufferSize;
063    this.bufferCount = bufferCount;
064    LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
065      StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
066    this.buffers = new ByteBuffer[bufferCount];
067    createBuffers(threadCount, alloc);
068  }
069
070  private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException {
071    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
072    int perThreadCount = bufferCount / threadCount;
073    int reminder = bufferCount % threadCount;
074    try {
075      List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount);
076      // Dispatch the creation task to each thread.
077      for (int i = 0; i < threadCount; i++) {
078        final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0);
079        futures.add(pool.submit(() -> {
080          ByteBuffer[] chunk = new ByteBuffer[chunkSize];
081          for (int k = 0; k < chunkSize; k++) {
082            chunk[k] = alloc.allocate(bufferSize);
083          }
084          return chunk;
085        }));
086      }
087      // Append the buffers created by each thread.
088      int bufferIndex = 0;
089      try {
090        for (Future<ByteBuffer[]> f : futures) {
091          for (ByteBuffer b : f.get()) {
092            this.buffers[bufferIndex++] = b;
093          }
094        }
095        assert bufferIndex == bufferCount;
096      } catch (Exception e) {
097        LOG.error("Buffer creation interrupted", e);
098        throw new IOException(e);
099      }
100    } finally {
101      pool.shutdownNow();
102    }
103  }
104
105  static int getBufferSize(long capacity) {
106    int bufferSize = DEFAULT_BUFFER_SIZE;
107    if (bufferSize > (capacity / 16)) {
108      bufferSize = (int) roundUp(capacity / 16, 32768);
109    }
110    return bufferSize;
111  }
112
113  private static int getBufferCount(long capacity) {
114    int bufferSize = getBufferSize(capacity);
115    return (int) (roundUp(capacity, bufferSize) / bufferSize);
116  }
117
118  private static long roundUp(long n, long to) {
119    return ((n + to - 1) / to) * to;
120  }
121
122  /**
123   * Transfers bytes from this buffers array into the given destination {@link ByteBuff}
124   * @param offset start position in this big logical array.
125   * @param dst    the destination ByteBuff. Notice that its position will be advanced.
126   * @return number of bytes read
127   */
128  public int read(long offset, ByteBuff dst) {
129    return internalTransfer(offset, dst, READER);
130  }
131
132  /**
133   * Transfers bytes from the given source {@link ByteBuff} into this buffer array
134   * @param offset start offset of this big logical array.
135   * @param src    the source ByteBuff. Notice that its position will be advanced.
136   * @return number of bytes write
137   */
138  public int write(long offset, ByteBuff src) {
139    return internalTransfer(offset, src, WRITER);
140  }
141
142  /**
143   * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
144   * source and destination will be advanced.
145   */
146  private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> {
147    int off = src.position(), len = dst.remaining();
148    src.get(dst, off, len);
149    src.position(off + len);
150  };
151
152  /**
153   * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both
154   * source and destination will be advanced.
155   */
156  private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> {
157    int off = dst.position(), len = src.remaining(), srcOff = src.position();
158    dst.put(off, ByteBuff.wrap(src), srcOff, len);
159    src.position(srcOff + len);
160    dst.position(off + len);
161  };
162
163  /**
164   * Transferring all remaining bytes from b to the buffers array starting at offset, or
165   * transferring bytes from the buffers array at offset to b until b is filled. Notice that
166   * position of ByteBuff b will be advanced.
167   * @param offset   where we start in the big logical array.
168   * @param b        the ByteBuff to transfer from or to
169   * @param transfer the transfer interface.
170   * @return the length of bytes we transferred.
171   */
172  private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) {
173    int expectedTransferLen = b.remaining();
174    if (expectedTransferLen == 0) {
175      return 0;
176    }
177    BufferIterator it = new BufferIterator(offset, expectedTransferLen);
178    while (it.hasNext()) {
179      ByteBuffer a = it.next();
180      transfer.accept(a, b);
181      assert !a.hasRemaining();
182    }
183    assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen
184      + ") don't match the actual transfer length(=" + it.getSum() + ")";
185    return expectedTransferLen;
186  }
187
188  /**
189   * Creates a sub-array from a given array of ByteBuffers from the given offset to the length
190   * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call
191   * asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first
192   * one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to
193   * 'length' 5.
194   * @param offset the position in the whole array which is composited by multiple byte buffers.
195   * @param len    the length of bytes
196   * @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a
197   *         zero position.
198   */
199  public ByteBuffer[] asSubByteBuffers(long offset, final int len) {
200    BufferIterator it = new BufferIterator(offset, len);
201    ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
202    for (int i = 0; i < mbb.length; i++) {
203      assert it.hasNext();
204      mbb[i] = it.next();
205    }
206    assert it.getSum() == len;
207    return mbb;
208  }
209
210  /**
211   * Iterator to fetch ByteBuffers from offset with given length in this big logical array.
212   */
213  private class BufferIterator implements Iterator<ByteBuffer> {
214    private final int len;
215    private int startBuffer, startOffset, endBuffer, endOffset;
216    private int curIndex, sum = 0;
217
218    private int index(long pos) {
219      return (int) (pos / bufferSize);
220    }
221
222    private int offset(long pos) {
223      return (int) (pos % bufferSize);
224    }
225
226    public BufferIterator(long offset, int len) {
227      assert len >= 0 && offset >= 0;
228      this.len = len;
229
230      this.startBuffer = index(offset);
231      this.startOffset = offset(offset);
232
233      this.endBuffer = index(offset + len);
234      this.endOffset = offset(offset + len);
235      if (startBuffer < endBuffer && endOffset == 0) {
236        endBuffer--;
237        endOffset = bufferSize;
238      }
239      assert startBuffer >= 0 && startBuffer < bufferCount;
240      assert endBuffer >= 0 && endBuffer < bufferCount;
241
242      // initialize the index to the first buffer index.
243      this.curIndex = startBuffer;
244    }
245
246    @Override
247    public boolean hasNext() {
248      return this.curIndex <= endBuffer;
249    }
250
251    /**
252     * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the
253     * original one.
254     */
255    @Override
256    public ByteBuffer next() {
257      ByteBuffer bb = buffers[curIndex].duplicate();
258      if (curIndex == startBuffer) {
259        bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len));
260      } else if (curIndex == endBuffer) {
261        bb.position(0).limit(endOffset);
262      } else {
263        bb.position(0).limit(bufferSize);
264      }
265      curIndex++;
266      sum += bb.remaining();
267      // Make sure that its pos is zero, it's important because MBB will count from zero for all nio
268      // ByteBuffers.
269      return bb.slice();
270    }
271
272    int getSum() {
273      return sum;
274    }
275
276    int getBufferCount() {
277      return this.endBuffer - this.startBuffer + 1;
278    }
279  }
280}