View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io;
21  
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.BufferOverflowException;
25  import java.nio.ByteBuffer;
26  import java.nio.ByteOrder;
27  import java.nio.channels.Channels;
28  import java.nio.channels.WritableByteChannel;
29  
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.classification.InterfaceStability;
32  import org.apache.hadoop.hbase.util.ByteBufferUtils;
33  import org.apache.hadoop.hbase.util.Bytes;
34  
35  /**
36   * Not thread safe!
37   */
38  @InterfaceAudience.Public
39  @InterfaceStability.Evolving
40  public class ByteBufferOutputStream extends OutputStream
41      implements ByteBufferSupportOutputStream {
42    
43    // Borrowed from openJDK:
44    // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
45    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
46  
47    protected ByteBuffer buf;
48  
49    public ByteBufferOutputStream(int capacity) {
50      this(capacity, false);
51    }
52  
53    public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
54      this(allocate(capacity, useDirectByteBuffer));
55    }
56  
57    /**
58     * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
59     * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!!
60     * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when
61     * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
62     * is so we can recycle buffers rather than allocate each time; it can get expensive especially
63     * if the buffers are big doing allocations each time or having them undergo resizing because
64     * initial allocation was small.
65     * @see #getByteBuffer()
66     */
67    public ByteBufferOutputStream(final ByteBuffer bb) {
68      assert bb.order() == ByteOrder.BIG_ENDIAN;
69      this.buf = bb;
70      this.buf.clear();
71    }
72  
73    public int size() {
74      return buf.position();
75    }
76  
77    private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
78      if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError
79        throw new BufferOverflowException();
80      }
81      return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
82    }
83  
84    /**
85     * This flips the underlying BB so be sure to use it _last_!
86     * @return ByteBuffer
87     */
88    public ByteBuffer getByteBuffer() {
89      buf.flip();
90      return buf;
91    }
92  
93    private void checkSizeAndGrow(int extra) {
94      long capacityNeeded = buf.position() + (long) extra;
95      if (capacityNeeded > buf.limit()) {
96        // guarantee it's possible to fit
97        if (capacityNeeded > MAX_ARRAY_SIZE) {
98          throw new BufferOverflowException();
99        }
100       // double until hit the cap
101       long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
102       // but make sure there is enough if twice the existing capacity is still too small
103       nextCapacity = Math.max(nextCapacity, capacityNeeded);
104       ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
105       buf.flip();
106       ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
107       buf = newBuf;
108     }
109   }
110 
111   // OutputStream
112   @Override
113   public void write(int b) throws IOException {
114     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
115     buf.put((byte)b);
116   }
117 
118  /**
119   * Writes the complete contents of this byte buffer output stream to
120   * the specified output stream argument.
121   *
122   * @param      out   the output stream to which to write the data.
123   * @exception  IOException  if an I/O error occurs.
124   */
125   public synchronized void writeTo(OutputStream out) throws IOException {
126     WritableByteChannel channel = Channels.newChannel(out);
127     ByteBuffer bb = buf.duplicate();
128     bb.flip();
129     channel.write(bb);
130   }
131 
132   @Override
133   public void write(byte[] b) throws IOException {
134     write(b, 0, b.length);
135   }
136 
137   @Override
138   public void write(byte[] b, int off, int len) throws IOException {
139     checkSizeAndGrow(len);
140     ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
141   }
142 
143   public void write(ByteBuffer b, int off, int len) throws IOException {
144     checkSizeAndGrow(len);
145     ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
146   }
147 
148   /**
149    * Writes an <code>int</code> to the underlying output stream as four
150    * bytes, high byte first.
151    * @param i the <code>int</code> to write
152    * @throws IOException if an I/O error occurs.
153    */
154   public void writeInt(int i) throws IOException {
155     checkSizeAndGrow(Bytes.SIZEOF_INT);
156     ByteBufferUtils.putInt(this.buf, i);
157   }
158 
159   @Override
160   public void flush() throws IOException {
161     // noop
162   }
163 
164   @Override
165   public void close() throws IOException {
166     // noop again. heh
167   }
168 
169   public byte[] toByteArray(int offset, int length) {
170     ByteBuffer bb = buf.duplicate();
171     bb.flip();
172 
173     byte[] chunk = new byte[length];
174 
175     bb.position(offset);
176     bb.get(chunk, 0, length);
177     return chunk;
178   }
179 }