001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.hbase.nio.ByteBuff;
027import org.apache.hadoop.hbase.nio.SingleByteBuff;
028import org.apache.hadoop.hbase.util.ByteBufferUtils;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when
035 * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap.
036 * Make sure to call {@link #releaseResources()} method once the Stream usage is over and
037 * data is transferred to the wanted destination.
038 * Not thread safe!
039 */
040@InterfaceAudience.Private
041public class ByteBufferListOutputStream extends ByteBufferOutputStream {
042  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
043
044  private final ByteBuffAllocator allocator;
045  // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
046  // it is not available will make a new one our own and keep writing to that. We keep track of all
047  // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
048  // to return back all of them to pool
049  protected List<SingleByteBuff> allBufs = new ArrayList<>();
050
051  private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
052
053  public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
054    this.allocator = allocator;
055    allocateNewBuffer();
056  }
057
058  private void allocateNewBuffer() {
059    if (this.curBuf != null) {
060      this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
061    }
062    // Get an initial ByteBuffer from the allocator.
063    SingleByteBuff sbb = allocator.allocateOneBuffer();
064    this.curBuf = sbb.nioByteBuffers()[0];
065    this.allBufs.add(sbb);
066  }
067
068  @Override
069  public int size() {
070    int s = 0;
071    for (int i = 0; i < this.allBufs.size() - 1; i++) {
072      s += this.allBufs.get(i).remaining();
073    }
074    // On the last BB, it might not be flipped yet if getByteBuffers is not yet called
075    if (this.lastBufFlipped) {
076      s += this.curBuf.remaining();
077    } else {
078      s += this.curBuf.position();
079    }
080    return s;
081  }
082
083  @Override
084  public ByteBuffer getByteBuffer() {
085    throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
086  }
087
088  @Override
089  protected void checkSizeAndGrow(int extra) {
090    long capacityNeeded = curBuf.position() + (long) extra;
091    if (capacityNeeded > curBuf.limit()) {
092      allocateNewBuffer();
093    }
094  }
095
096  @Override
097  public void writeTo(OutputStream out) throws IOException {
098    // No usage of this API in code. Just making it as an Unsupported operation as of now
099    throw new UnsupportedOperationException();
100  }
101
102  /**
103   * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only
104   * when all the data is fully used. And it must be called at the end of usage else we will leak
105   * ByteBuffers from pool.
106   */
107  public void releaseResources() {
108    try {
109      close();
110    } catch (IOException e) {
111      LOG.debug(e.toString(), e);
112    }
113    // Return back all the BBs to pool
114    for (ByteBuff buf : this.allBufs) {
115      buf.release();
116    }
117    this.allBufs = null;
118    this.curBuf = null;
119  }
120
121  @Override
122  public byte[] toByteArray(int offset, int length) {
123    // No usage of this API in code. Just making it as an Unsupported operation as of now
124    throw new UnsupportedOperationException();
125  }
126
127  /**
128   * We can be assured that the buffers returned by this method are all flipped
129   * @return list of bytebuffers
130   */
131  public List<ByteBuffer> getByteBuffers() {
132    if (!this.lastBufFlipped) {
133      this.lastBufFlipped = true;
134      // All the other BBs are already flipped while moving to the new BB.
135      curBuf.flip();
136    }
137    List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
138    for (SingleByteBuff bb : this.allBufs) {
139      bbs.add(bb.nioByteBuffers()[0]);
140    }
141    return bbs;
142  }
143
144  @Override
145  public void write(byte[] b, int off, int len) throws IOException {
146    int toWrite = 0;
147    while (len > 0) {
148      toWrite = Math.min(len, this.curBuf.remaining());
149      ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
150      off += toWrite;
151      len -= toWrite;
152      if (len > 0) {
153        allocateNewBuffer();// The curBuf is over. Let us move to the next one
154      }
155    }
156  }
157
158  @Override
159  public void write(ByteBuffer b, int off, int len) throws IOException {
160    int toWrite = 0;
161    while (len > 0) {
162      toWrite = Math.min(len, this.curBuf.remaining());
163      ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
164      off += toWrite;
165      len -= toWrite;
166      if (len > 0) {
167        allocateNewBuffer();// The curBuf is over. Let us move to the next one
168      }
169    }
170  }
171}