001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.io;
021
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.BufferOverflowException;
025import java.nio.ByteBuffer;
026import java.nio.ByteOrder;
027import java.nio.channels.Channels;
028import java.nio.channels.WritableByteChannel;
029
030import org.apache.hadoop.hbase.util.ByteBufferUtils;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * Not thread safe!
036 */
037@InterfaceAudience.Public
038public class ByteBufferOutputStream extends OutputStream
039    implements ByteBufferWriter {
040  
041  // Borrowed from openJDK:
042  // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
043  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
044
045  protected ByteBuffer curBuf = null;
046
047  ByteBufferOutputStream() {
048
049  }
050
051  public ByteBufferOutputStream(int capacity) {
052    this(capacity, false);
053  }
054
055  public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
056    this(allocate(capacity, useDirectByteBuffer));
057  }
058
059  /**
060   * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
061   * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!!
062   * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when
063   * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
064   * is so we can recycle buffers rather than allocate each time; it can get expensive especially
065   * if the buffers are big doing allocations each time or having them undergo resizing because
066   * initial allocation was small.
067   * @see #getByteBuffer()
068   */
069  public ByteBufferOutputStream(final ByteBuffer bb) {
070    assert bb.order() == ByteOrder.BIG_ENDIAN;
071    this.curBuf = bb;
072    this.curBuf.clear();
073  }
074
075  public int size() {
076    return curBuf.position();
077  }
078
079  private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
080    if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError
081      throw new BufferOverflowException();
082    }
083    return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
084  }
085
086  /**
087   * This flips the underlying BB so be sure to use it _last_!
088   * @return ByteBuffer
089   */
090  public ByteBuffer getByteBuffer() {
091    curBuf.flip();
092    return curBuf;
093  }
094
095  protected void checkSizeAndGrow(int extra) {
096    long capacityNeeded = curBuf.position() + (long) extra;
097    if (capacityNeeded > curBuf.limit()) {
098      // guarantee it's possible to fit
099      if (capacityNeeded > MAX_ARRAY_SIZE) {
100        throw new BufferOverflowException();
101      }
102      // double until hit the cap
103      long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
104      // but make sure there is enough if twice the existing capacity is still too small
105      nextCapacity = Math.max(nextCapacity, capacityNeeded);
106      ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
107      curBuf.flip();
108      ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
109      curBuf = newBuf;
110    }
111  }
112
113  // OutputStream
114  @Override
115  public void write(int b) throws IOException {
116    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
117    curBuf.put((byte)b);
118  }
119
120 /**
121  * Writes the complete contents of this byte buffer output stream to
122  * the specified output stream argument.
123  *
124  * @param      out   the output stream to which to write the data.
125  * @exception  IOException  if an I/O error occurs.
126  */
127  public void writeTo(OutputStream out) throws IOException {
128    WritableByteChannel channel = Channels.newChannel(out);
129    ByteBuffer bb = curBuf.duplicate();
130    bb.flip();
131    channel.write(bb);
132  }
133
134  @Override
135  public void write(byte[] b) throws IOException {
136    write(b, 0, b.length);
137  }
138
139  @Override
140  public void write(byte[] b, int off, int len) throws IOException {
141    checkSizeAndGrow(len);
142    ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
143  }
144
145  @Override
146  public void write(ByteBuffer b, int off, int len) throws IOException {
147    checkSizeAndGrow(len);
148    ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
149  }
150
151  /**
152   * Writes an <code>int</code> to the underlying output stream as four
153   * bytes, high byte first.
154   * @param i the <code>int</code> to write
155   * @throws IOException if an I/O error occurs.
156   */
157  @Override
158  public void writeInt(int i) throws IOException {
159    checkSizeAndGrow(Bytes.SIZEOF_INT);
160    ByteBufferUtils.putInt(this.curBuf, i);
161  }
162
163  @Override
164  public void flush() throws IOException {
165    // noop
166  }
167
168  @Override
169  public void close() throws IOException {
170    // noop again. heh
171  }
172
173  public byte[] toByteArray(int offset, int length) {
174    ByteBuffer bb = curBuf.duplicate();
175    bb.flip();
176
177    byte[] chunk = new byte[length];
178
179    bb.position(offset);
180    bb.get(chunk, 0, length);
181    return chunk;
182  }
183}