001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.nio.BufferOverflowException;
023import java.nio.ByteBuffer;
024import java.nio.ByteOrder;
025import java.nio.channels.Channels;
026import java.nio.channels.WritableByteChannel;
027import org.apache.hadoop.hbase.util.ByteBufferUtils;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Not thread safe!
033 */
034@InterfaceAudience.Public
035public class ByteBufferOutputStream extends OutputStream implements ByteBufferWriter {
036
037  // Borrowed from openJDK:
038  // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
039  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
040
041  protected ByteBuffer curBuf = null;
042
043  ByteBufferOutputStream() {
044
045  }
046
047  public ByteBufferOutputStream(int capacity) {
048    this(capacity, false);
049  }
050
051  public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
052    this(allocate(capacity, useDirectByteBuffer));
053  }
054
055  /**
056   * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
057   *           place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE
058   *           EFFECT!! If you want to get the newly allocated ByteBuffer, you'll need to pick it up
059   *           when done with this instance by calling {@link #getByteBuffer()}. All this
060   *           encapsulation violation is so we can recycle buffers rather than allocate each time;
061   *           it can get expensive especially if the buffers are big doing allocations each time or
062   *           having them undergo resizing because initial allocation was small.
063   * @see #getByteBuffer()
064   */
065  public ByteBufferOutputStream(final ByteBuffer bb) {
066    assert bb.order() == ByteOrder.BIG_ENDIAN;
067    this.curBuf = bb;
068    this.curBuf.clear();
069  }
070
071  public int size() {
072    return curBuf.position();
073  }
074
075  private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
076    if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError
077      throw new BufferOverflowException();
078    }
079    return useDirectByteBuffer
080      ? ByteBuffer.allocateDirect(capacity)
081      : ByteBuffer.allocate(capacity);
082  }
083
084  /**
085   * This flips the underlying BB so be sure to use it _last_! n
086   */
087  public ByteBuffer getByteBuffer() {
088    curBuf.flip();
089    return curBuf;
090  }
091
092  protected void checkSizeAndGrow(int extra) {
093    long capacityNeeded = curBuf.position() + (long) extra;
094    if (capacityNeeded > curBuf.limit()) {
095      // guarantee it's possible to fit
096      if (capacityNeeded > MAX_ARRAY_SIZE) {
097        throw new BufferOverflowException();
098      }
099      // double until hit the cap
100      long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
101      // but make sure there is enough if twice the existing capacity is still too small
102      nextCapacity = Math.max(nextCapacity, capacityNeeded);
103      ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
104      curBuf.flip();
105      ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
106      curBuf = newBuf;
107    }
108  }
109
110  // OutputStream
111  @Override
112  public void write(int b) throws IOException {
113    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
114    curBuf.put((byte) b);
115  }
116
117  /**
118   * Writes the complete contents of this byte buffer output stream to the specified output stream
119   * argument.
120   * @param out the output stream to which to write the data.
121   * @exception IOException if an I/O error occurs.
122   */
123  public void writeTo(OutputStream out) throws IOException {
124    WritableByteChannel channel = Channels.newChannel(out);
125    ByteBuffer bb = curBuf.duplicate();
126    bb.flip();
127    channel.write(bb);
128  }
129
130  @Override
131  public void write(byte[] b) throws IOException {
132    write(b, 0, b.length);
133  }
134
135  @Override
136  public void write(byte[] b, int off, int len) throws IOException {
137    checkSizeAndGrow(len);
138    ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
139  }
140
141  @Override
142  public void write(ByteBuffer b, int off, int len) throws IOException {
143    checkSizeAndGrow(len);
144    ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
145  }
146
147  /**
148   * Writes an <code>int</code> to the underlying output stream as four bytes, high byte first.
149   * @param i the <code>int</code> to write
150   * @throws IOException if an I/O error occurs.
151   */
152  @Override
153  public void writeInt(int i) throws IOException {
154    checkSizeAndGrow(Bytes.SIZEOF_INT);
155    ByteBufferUtils.putInt(this.curBuf, i);
156  }
157
158  @Override
159  public void flush() throws IOException {
160    // noop
161  }
162
163  @Override
164  public void close() throws IOException {
165    // noop again. heh
166  }
167
168  public byte[] toByteArray(int offset, int length) {
169    ByteBuffer bb = curBuf.duplicate();
170    bb.flip();
171
172    byte[] chunk = new byte[length];
173
174    bb.position(offset);
175    bb.get(chunk, 0, length);
176    return chunk;
177  }
178}