001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.List;
025import org.apache.hadoop.hbase.nio.ByteBuff;
026import org.apache.hadoop.hbase.nio.SingleByteBuff;
027import org.apache.hadoop.hbase.util.ByteBufferUtils;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when
034 * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap.
035 * Make sure to call {@link #releaseResources()} method once the Stream usage is over and data is
036 * transferred to the wanted destination. Not thread safe!
037 */
038@InterfaceAudience.Private
039public class ByteBufferListOutputStream extends ByteBufferOutputStream {
040  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
041
042  private final ByteBuffAllocator allocator;
043  // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
044  // it is not available will make a new one our own and keep writing to that. We keep track of all
045  // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
046  // to return back all of them to pool
047  protected List<SingleByteBuff> allBufs = new ArrayList<>();
048
049  private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
050
051  public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
052    this.allocator = allocator;
053    allocateNewBuffer();
054  }
055
056  private void allocateNewBuffer() {
057    if (this.curBuf != null) {
058      this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
059    }
060    // Get an initial ByteBuffer from the allocator.
061    SingleByteBuff sbb = allocator.allocateOneBuffer();
062    this.curBuf = sbb.nioByteBuffers()[0];
063    this.allBufs.add(sbb);
064  }
065
066  @Override
067  public int size() {
068    int s = 0;
069    for (int i = 0; i < this.allBufs.size() - 1; i++) {
070      s += this.allBufs.get(i).remaining();
071    }
072    // On the last BB, it might not be flipped yet if getByteBuffers is not yet called
073    if (this.lastBufFlipped) {
074      s += this.curBuf.remaining();
075    } else {
076      s += this.curBuf.position();
077    }
078    return s;
079  }
080
081  @Override
082  public ByteBuffer getByteBuffer() {
083    throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
084  }
085
086  @Override
087  protected void checkSizeAndGrow(int extra) {
088    long capacityNeeded = curBuf.position() + (long) extra;
089    if (capacityNeeded > curBuf.limit()) {
090      allocateNewBuffer();
091    }
092  }
093
094  @Override
095  public void writeTo(OutputStream out) throws IOException {
096    // No usage of this API in code. Just making it as an Unsupported operation as of now
097    throw new UnsupportedOperationException();
098  }
099
100  /**
101   * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only
102   * when all the data is fully used. And it must be called at the end of usage else we will leak
103   * ByteBuffers from pool.
104   */
105  public void releaseResources() {
106    try {
107      close();
108    } catch (IOException e) {
109      LOG.debug(e.toString(), e);
110    }
111    // Return back all the BBs to pool
112    for (ByteBuff buf : this.allBufs) {
113      buf.release();
114    }
115    this.allBufs = null;
116    this.curBuf = null;
117  }
118
119  @Override
120  public byte[] toByteArray(int offset, int length) {
121    // No usage of this API in code. Just making it as an Unsupported operation as of now
122    throw new UnsupportedOperationException();
123  }
124
125  /**
126   * We can be assured that the buffers returned by this method are all flipped
127   * @return list of bytebuffers
128   */
129  public List<ByteBuffer> getByteBuffers() {
130    if (!this.lastBufFlipped) {
131      this.lastBufFlipped = true;
132      // All the other BBs are already flipped while moving to the new BB.
133      curBuf.flip();
134    }
135    List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
136    for (SingleByteBuff bb : this.allBufs) {
137      bbs.add(bb.nioByteBuffers()[0]);
138    }
139    return bbs;
140  }
141
142  @Override
143  public void write(byte[] b, int off, int len) throws IOException {
144    int toWrite = 0;
145    while (len > 0) {
146      toWrite = Math.min(len, this.curBuf.remaining());
147      ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
148      off += toWrite;
149      len -= toWrite;
150      if (len > 0) {
151        allocateNewBuffer();// The curBuf is over. Let us move to the next one
152      }
153    }
154  }
155
156  @Override
157  public void write(ByteBuffer b, int off, int len) throws IOException {
158    int toWrite = 0;
159    while (len > 0) {
160      toWrite = Math.min(len, this.curBuf.remaining());
161      ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
162      off += toWrite;
163      len -= toWrite;
164      if (len > 0) {
165        allocateNewBuffer();// The curBuf is over. Let us move to the next one
166      }
167    }
168  }
169}