View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io;
21  
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.Channels;
26  import java.nio.channels.WritableByteChannel;
27  
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.hbase.util.Bytes;
31  
32  /**
33   * Not thread safe!
34   */
35  @InterfaceAudience.Public
36  @InterfaceStability.Evolving
37  public class ByteBufferOutputStream extends OutputStream {
38  
39    protected ByteBuffer buf;
40  
41    public ByteBufferOutputStream(int capacity) {
42      this(capacity, false);
43    }
44  
45    public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
46      this(allocate(capacity, useDirectByteBuffer));
47    }
48  
49    /**
50     * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
51     * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!!
52     * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when
53     * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
54     * is so we can recycle buffers rather than allocate each time; it can get expensive especially
55     * if the buffers are big doing allocations each time or having them undergo resizing because
56     * initial allocation was small.
57     * @see #getByteBuffer()
58     */
59    public ByteBufferOutputStream(final ByteBuffer bb) {
60      this.buf = bb;
61      this.buf.clear();
62    }
63  
64    public int size() {
65      return buf.position();
66    }
67  
68    private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
69      return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
70    }
71  
72    /**
73     * This flips the underlying BB so be sure to use it _last_!
74     * @return ByteBuffer
75     */
76    public ByteBuffer getByteBuffer() {
77      buf.flip();
78      return buf;
79    }
80  
81    private void checkSizeAndGrow(int extra) {
82      if ( (buf.position() + extra) > buf.limit()) {
83        // size calculation is complex, because we could overflow negative,
84        // and/or not allocate enough space. this fixes that.
85        int newSize = (int)Math.min((((long)buf.capacity()) * 2),
86            (long)(Integer.MAX_VALUE));
87        newSize = Math.max(newSize, buf.position() + extra);
88        ByteBuffer newBuf = allocate(newSize, buf.isDirect());
89        buf.flip();
90        newBuf.put(buf);
91        buf = newBuf;
92      }
93    }
94  
95    // OutputStream
96    @Override
97    public void write(int b) throws IOException {
98      checkSizeAndGrow(Bytes.SIZEOF_BYTE);
99  
100     buf.put((byte)b);
101   }
102 
103  /**
104   * Writes the complete contents of this byte buffer output stream to
105   * the specified output stream argument.
106   *
107   * @param      out   the output stream to which to write the data.
108   * @exception  IOException  if an I/O error occurs.
109   */
110   public synchronized void writeTo(OutputStream out) throws IOException {
111     WritableByteChannel channel = Channels.newChannel(out);
112     ByteBuffer bb = buf.duplicate();
113     bb.flip();
114     channel.write(bb);
115   }
116 
117   @Override
118   public void write(byte[] b) throws IOException {
119     checkSizeAndGrow(b.length);
120 
121     buf.put(b);
122   }
123 
124   @Override
125   public void write(byte[] b, int off, int len) throws IOException {
126     checkSizeAndGrow(len);
127 
128     buf.put(b, off, len);
129   }
130 
131   @Override
132   public void flush() throws IOException {
133     // noop
134   }
135 
136   @Override
137   public void close() throws IOException {
138     // noop again. heh
139   }
140 
141   public byte[] toByteArray(int offset, int length) {
142     ByteBuffer bb = buf.duplicate();
143     bb.flip();
144 
145     byte[] chunk = new byte[length];
146 
147     bb.position(offset);
148     bb.get(chunk, 0, length);
149     return chunk;
150   }
151 }