View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io;
21  
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.BufferOverflowException;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.Channels;
27  import java.nio.channels.WritableByteChannel;
28  
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.util.Bytes;
32  
33  /**
34   * Not thread safe!
35   */
36  @InterfaceAudience.Public
37  @InterfaceStability.Evolving
38  public class ByteBufferOutputStream extends OutputStream {
39    
40    // Borrowed from openJDK:
41    // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
42    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
43  
44    protected ByteBuffer buf;
45  
46    public ByteBufferOutputStream(int capacity) {
47      this(capacity, false);
48    }
49  
50    public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
51      this(allocate(capacity, useDirectByteBuffer));
52    }
53  
54    /**
55     * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
56     * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!!
57     * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when
58     * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
59     * is so we can recycle buffers rather than allocate each time; it can get expensive especially
60     * if the buffers are big doing allocations each time or having them undergo resizing because
61     * initial allocation was small.
62     * @see #getByteBuffer()
63     */
64    public ByteBufferOutputStream(final ByteBuffer bb) {
65      this.buf = bb;
66      this.buf.clear();
67    }
68  
69    public int size() {
70      return buf.position();
71    }
72  
73    private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
74      if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError
75        throw new BufferOverflowException();
76      }
77      return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
78    }
79  
80    /**
81     * This flips the underlying BB so be sure to use it _last_!
82     * @return ByteBuffer
83     */
84    public ByteBuffer getByteBuffer() {
85      buf.flip();
86      return buf;
87    }
88  
89    private void checkSizeAndGrow(int extra) {
90      long capacityNeeded = buf.position() + (long) extra;
91      if (capacityNeeded > buf.limit()) {
92        // guarantee it's possible to fit
93        if (capacityNeeded > MAX_ARRAY_SIZE) {
94          throw new BufferOverflowException();
95        }
96        // double until hit the cap
97        long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
98        // but make sure there is enough if twice the existing capacity is still too small
99        nextCapacity = Math.max(nextCapacity, capacityNeeded);
100       ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
101       buf.flip();
102       newBuf.put(buf);
103       buf = newBuf;
104     }
105   }
106 
107   // OutputStream
108   @Override
109   public void write(int b) throws IOException {
110     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
111 
112     buf.put((byte)b);
113   }
114 
115  /**
116   * Writes the complete contents of this byte buffer output stream to
117   * the specified output stream argument.
118   *
119   * @param      out   the output stream to which to write the data.
120   * @exception  IOException  if an I/O error occurs.
121   */
122   public synchronized void writeTo(OutputStream out) throws IOException {
123     WritableByteChannel channel = Channels.newChannel(out);
124     ByteBuffer bb = buf.duplicate();
125     bb.flip();
126     channel.write(bb);
127   }
128 
129   @Override
130   public void write(byte[] b) throws IOException {
131     checkSizeAndGrow(b.length);
132 
133     buf.put(b);
134   }
135 
136   @Override
137   public void write(byte[] b, int off, int len) throws IOException {
138     checkSizeAndGrow(len);
139 
140     buf.put(b, off, len);
141   }
142 
143   @Override
144   public void flush() throws IOException {
145     // noop
146   }
147 
148   @Override
149   public void close() throws IOException {
150     // noop again. heh
151   }
152 
153   public byte[] toByteArray(int offset, int length) {
154     ByteBuffer bb = buf.duplicate();
155     bb.flip();
156 
157     byte[] chunk = new byte[length];
158 
159     bb.position(offset);
160     bb.get(chunk, 0, length);
161     return chunk;
162   }
163 }