001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.hbase.util.ByteBufferUtils;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when
033 * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap.
034 * Make sure to call {@link #releaseResources()} method once the Stream usage is over and
035 * data is transferred to the wanted destination.
036 * Not thread safe!
037 */
038@InterfaceAudience.Private
039public class ByteBufferListOutputStream extends ByteBufferOutputStream {
040  private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
041
042  private ByteBufferPool pool;
043  // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
044  // it is not available will make a new one our own and keep writing to that. We keep track of all
045  // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
046  // to return back all of them to pool
047  protected List<ByteBuffer> allBufs = new ArrayList<>();
048  protected List<ByteBuffer> bufsFromPool = new ArrayList<>();
049
050  private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
051
052  public ByteBufferListOutputStream(ByteBufferPool pool) {
053    this.pool = pool;
054    allocateNewBuffer();
055  }
056
057  private void allocateNewBuffer() {
058    if (this.curBuf != null) {
059      this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
060    }
061    // Get an initial BB to work with from the pool
062    this.curBuf = this.pool.getBuffer();
063    if (this.curBuf == null) {
064      // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off
065      // heap BB on demand. It is difficult to account for all such and so proper sizing of Max
066      // direct heap size. See HBASE-15525 also for more details.
067      // Make BB with same size of pool's buffer size.
068      this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
069    } else {
070      this.bufsFromPool.add(this.curBuf);
071    }
072    this.allBufs.add(this.curBuf);
073  }
074
075  @Override
076  public int size() {
077    int s = 0;
078    for (int i = 0; i < this.allBufs.size() - 1; i++) {
079      s += this.allBufs.get(i).remaining();
080    }
081    // On the last BB, it might not be flipped yet if getByteBuffers is not yet called
082    if (this.lastBufFlipped) {
083      s += this.curBuf.remaining();
084    } else {
085      s += this.curBuf.position();
086    }
087    return s;
088  }
089
090  @Override
091  public ByteBuffer getByteBuffer() {
092    throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
093  }
094
095  @Override
096  protected void checkSizeAndGrow(int extra) {
097    long capacityNeeded = curBuf.position() + (long) extra;
098    if (capacityNeeded > curBuf.limit()) {
099      allocateNewBuffer();
100    }
101  }
102
103  @Override
104  public void writeTo(OutputStream out) throws IOException {
105    // No usage of this API in code. Just making it as an Unsupported operation as of now
106    throw new UnsupportedOperationException();
107  }
108
109  /**
110   * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only
111   * when all the data is fully used. And it must be called at the end of usage else we will leak
112   * ByteBuffers from pool.
113   */
114  public void releaseResources() {
115    try {
116      close();
117    } catch (IOException e) {
118      LOG.debug(e.toString(), e);
119    }
120    // Return back all the BBs to pool
121    if (this.bufsFromPool != null) {
122      for (int i = 0; i < this.bufsFromPool.size(); i++) {
123        this.pool.putbackBuffer(this.bufsFromPool.get(i));
124      }
125      this.bufsFromPool = null;
126    }
127    this.allBufs = null;
128    this.curBuf = null;
129  }
130
131  @Override
132  public byte[] toByteArray(int offset, int length) {
133    // No usage of this API in code. Just making it as an Unsupported operation as of now
134    throw new UnsupportedOperationException();
135  }
136
137  /**
138   * We can be assured that the buffers returned by this method are all flipped
139   * @return list of bytebuffers
140   */
141  public List<ByteBuffer> getByteBuffers() {
142    if (!this.lastBufFlipped) {
143      this.lastBufFlipped = true;
144      // All the other BBs are already flipped while moving to the new BB.
145      curBuf.flip();
146    }
147    return this.allBufs;
148  }
149
150  @Override
151  public void write(byte[] b, int off, int len) throws IOException {
152    int toWrite = 0;
153    while (len > 0) {
154      toWrite = Math.min(len, this.curBuf.remaining());
155      ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
156      off += toWrite;
157      len -= toWrite;
158      if (len > 0) {
159        allocateNewBuffer();// The curBuf is over. Let us move to the next one
160      }
161    }
162  }
163
164  @Override
165  public void write(ByteBuffer b, int off, int len) throws IOException {
166    int toWrite = 0;
167    while (len > 0) {
168      toWrite = Math.min(len, this.curBuf.remaining());
169      ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
170      off += toWrite;
171      len -= toWrite;
172      if (len > 0) {
173        allocateNewBuffer();// The curBuf is over. Let us move to the next one
174      }
175    }
176  }
177}