001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.util;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.function.BiConsumer;
030
031import org.apache.hadoop.hbase.nio.ByteBuff;
032import org.apache.hadoop.util.StringUtils;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
038
039/**
040 * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential
041 * and could be considered as a large buffer.It supports reading/writing data from this large buffer
042 * with a position and offset
043 */
044@InterfaceAudience.Private
045public class ByteBufferArray {
046  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
047
048  public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
049  private final int bufferSize;
050  private final int bufferCount;
051  final ByteBuffer[] buffers;
052
053  /**
054   * We allocate a number of byte buffers as the capacity.
055   * @param capacity total size of the byte buffer array
056   * @param allocator the ByteBufferAllocator that will create the buffers
057   * @throws IOException throws IOException if there is an exception thrown by the allocator
058   */
059  public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
060    this(getBufferSize(capacity), getBufferCount(capacity),
061        Runtime.getRuntime().availableProcessors(), capacity, allocator);
062  }
063
064  @VisibleForTesting
065  ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
066      ByteBufferAllocator alloc) throws IOException {
067    this.bufferSize = bufferSize;
068    this.bufferCount = bufferCount;
069    LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
070      StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
071    this.buffers = new ByteBuffer[bufferCount];
072    createBuffers(threadCount, alloc);
073  }
074
075  private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException {
076    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
077    int perThreadCount = bufferCount / threadCount;
078    int reminder = bufferCount % threadCount;
079    try {
080      List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount);
081      // Dispatch the creation task to each thread.
082      for (int i = 0; i < threadCount; i++) {
083        final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0);
084        futures.add(pool.submit(() -> {
085          ByteBuffer[] chunk = new ByteBuffer[chunkSize];
086          for (int k = 0; k < chunkSize; k++) {
087            chunk[k] = alloc.allocate(bufferSize);
088          }
089          return chunk;
090        }));
091      }
092      // Append the buffers created by each thread.
093      int bufferIndex = 0;
094      try {
095        for (Future<ByteBuffer[]> f : futures) {
096          for (ByteBuffer b : f.get()) {
097            this.buffers[bufferIndex++] = b;
098          }
099        }
100        assert bufferIndex == bufferCount;
101      } catch (Exception e) {
102        LOG.error("Buffer creation interrupted", e);
103        throw new IOException(e);
104      }
105    } finally {
106      pool.shutdownNow();
107    }
108  }
109
110  @VisibleForTesting
111  static int getBufferSize(long capacity) {
112    int bufferSize = DEFAULT_BUFFER_SIZE;
113    if (bufferSize > (capacity / 16)) {
114      bufferSize = (int) roundUp(capacity / 16, 32768);
115    }
116    return bufferSize;
117  }
118
119  private static int getBufferCount(long capacity) {
120    int bufferSize = getBufferSize(capacity);
121    return (int) (roundUp(capacity, bufferSize) / bufferSize);
122  }
123
124  private static long roundUp(long n, long to) {
125    return ((n + to - 1) / to) * to;
126  }
127
128  /**
129   * Transfers bytes from this buffers array into the given destination {@link ByteBuff}
130   * @param offset start position in this big logical array.
131   * @param dst the destination ByteBuff. Notice that its position will be advanced.
132   * @return number of bytes read
133   */
134  public int read(long offset, ByteBuff dst) {
135    return internalTransfer(offset, dst, READER);
136  }
137
138  /**
139   * Transfers bytes from the given source {@link ByteBuff} into this buffer array
140   * @param offset start offset of this big logical array.
141   * @param src the source ByteBuff. Notice that its position will be advanced.
142   * @return number of bytes write
143   */
144  public int write(long offset, ByteBuff src) {
145    return internalTransfer(offset, src, WRITER);
146  }
147
148  /**
149   * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
150   * source and destination will be advanced.
151   */
152  private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> {
153    int off = src.position(), len = dst.remaining();
154    src.get(dst, off, len);
155    src.position(off + len);
156  };
157
158  /**
159   * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}, Position of both
160   * source and destination will be advanced.
161   */
162  private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> {
163    int off = dst.position(), len = src.remaining(), srcOff = src.position();
164    dst.put(off, ByteBuff.wrap(src), srcOff, len);
165    src.position(srcOff + len);
166    dst.position(off + len);
167  };
168
169  /**
170   * Transferring all remaining bytes from b to the buffers array starting at offset, or
171   * transferring bytes from the buffers array at offset to b until b is filled. Notice that
172   * position of ByteBuff b will be advanced.
173   * @param offset where we start in the big logical array.
174   * @param b the ByteBuff to transfer from or to
175   * @param transfer the transfer interface.
176   * @return the length of bytes we transferred.
177   */
178  private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) {
179    int expectedTransferLen = b.remaining();
180    if (expectedTransferLen == 0) {
181      return 0;
182    }
183    BufferIterator it = new BufferIterator(offset, expectedTransferLen);
184    while (it.hasNext()) {
185      ByteBuffer a = it.next();
186      transfer.accept(a, b);
187      assert !a.hasRemaining();
188    }
189    assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen
190        + ") don't match the actual transfer length(=" + it.getSum() + ")";
191    return expectedTransferLen;
192  }
193
194  /**
195   * Creates a sub-array from a given array of ByteBuffers from the given offset to the length
196   * specified. For eg, if there are 4 buffers forming an array each with length 10 and if we call
197   * asSubByteBuffers(5, 10) then we will create an sub-array consisting of two BBs and the first
198   * one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from 'position' 0 to
199   * 'length' 5.
200   * @param offset the position in the whole array which is composited by multiple byte buffers.
201   * @param len the length of bytes
202   * @return the underlying ByteBuffers, each ByteBuffer is a slice from the backend and will have a
203   *         zero position.
204   */
205  public ByteBuffer[] asSubByteBuffers(long offset, final int len) {
206    BufferIterator it = new BufferIterator(offset, len);
207    ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
208    for (int i = 0; i < mbb.length; i++) {
209      assert it.hasNext();
210      mbb[i] = it.next();
211    }
212    assert it.getSum() == len;
213    return mbb;
214  }
215
216  /**
217   * Iterator to fetch ByteBuffers from offset with given length in this big logical array.
218   */
219  private class BufferIterator implements Iterator<ByteBuffer> {
220    private final int len;
221    private int startBuffer, startOffset, endBuffer, endOffset;
222    private int curIndex, sum = 0;
223
224    private int index(long pos) {
225      return (int) (pos / bufferSize);
226    }
227
228    private int offset(long pos) {
229      return (int) (pos % bufferSize);
230    }
231
232    public BufferIterator(long offset, int len) {
233      assert len >= 0 && offset >= 0;
234      this.len = len;
235
236      this.startBuffer = index(offset);
237      this.startOffset = offset(offset);
238
239      this.endBuffer = index(offset + len);
240      this.endOffset = offset(offset + len);
241      if (startBuffer < endBuffer && endOffset == 0) {
242        endBuffer--;
243        endOffset = bufferSize;
244      }
245      assert startBuffer >= 0 && startBuffer < bufferCount;
246      assert endBuffer >= 0 && endBuffer < bufferCount;
247
248      // initialize the index to the first buffer index.
249      this.curIndex = startBuffer;
250    }
251
252    @Override
253    public boolean hasNext() {
254      return this.curIndex <= endBuffer;
255    }
256
257    /**
258     * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the
259     * original one.
260     */
261    @Override
262    public ByteBuffer next() {
263      ByteBuffer bb = buffers[curIndex].duplicate();
264      if (curIndex == startBuffer) {
265        bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len));
266      } else if (curIndex == endBuffer) {
267        bb.position(0).limit(endOffset);
268      } else {
269        bb.position(0).limit(bufferSize);
270      }
271      curIndex++;
272      sum += bb.remaining();
273      // Make sure that its pos is zero, it's important because MBB will count from zero for all nio
274      // ByteBuffers.
275      return bb.slice();
276    }
277
278    int getSum() {
279      return sum;
280    }
281
282    int getBufferCount() {
283      return this.endBuffer - this.startBuffer + 1;
284    }
285  }
286}