View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io;
21  
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.ByteBuffer;
25  import java.nio.ByteOrder;
26  import java.nio.channels.Channels;
27  import java.nio.channels.WritableByteChannel;
28  
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.util.ByteBufferUtils;
32  import org.apache.hadoop.hbase.util.Bytes;
33  
34  /**
35   * Not thread safe!
36   */
37  @InterfaceAudience.Public
38  @InterfaceStability.Evolving
39  public class ByteBufferOutputStream extends OutputStream {
40  
41    protected ByteBuffer buf;
42  
43    public ByteBufferOutputStream(int capacity) {
44      this(capacity, false);
45    }
46  
47    public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
48      this(allocate(capacity, useDirectByteBuffer));
49    }
50  
51    /**
52     * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its
53     * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!!
54     * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when
55     * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
56     * is so we can recycle buffers rather than allocate each time; it can get expensive especially
57     * if the buffers are big doing allocations each time or having them undergo resizing because
58     * initial allocation was small.
59     * @see #getByteBuffer()
60     */
61    public ByteBufferOutputStream(final ByteBuffer bb) {
62      assert bb.order() == ByteOrder.BIG_ENDIAN;
63      this.buf = bb;
64      this.buf.clear();
65    }
66  
67    public int size() {
68      return buf.position();
69    }
70  
71    private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
72      return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
73    }
74  
75    /**
76     * This flips the underlying BB so be sure to use it _last_!
77     * @return ByteBuffer
78     */
79    public ByteBuffer getByteBuffer() {
80      buf.flip();
81      return buf;
82    }
83  
84    private void checkSizeAndGrow(int extra) {
85      if ( (buf.position() + extra) > buf.limit()) {
86        // size calculation is complex, because we could overflow negative,
87        // and/or not allocate enough space. this fixes that.
88        int newSize = (int)Math.min((((long)buf.capacity()) * 2),
89            (long)(Integer.MAX_VALUE));
90        newSize = Math.max(newSize, buf.position() + extra);
91        ByteBuffer newBuf = allocate(newSize, buf.isDirect());
92        buf.flip();
93        ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
94        buf = newBuf;
95      }
96    }
97  
98    // OutputStream
99    @Override
100   public void write(int b) throws IOException {
101     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
102     buf.put((byte)b);
103   }
104 
105  /**
106   * Writes the complete contents of this byte buffer output stream to
107   * the specified output stream argument.
108   *
109   * @param      out   the output stream to which to write the data.
110   * @exception  IOException  if an I/O error occurs.
111   */
112   public synchronized void writeTo(OutputStream out) throws IOException {
113     WritableByteChannel channel = Channels.newChannel(out);
114     ByteBuffer bb = buf.duplicate();
115     bb.flip();
116     channel.write(bb);
117   }
118 
119   @Override
120   public void write(byte[] b) throws IOException {
121     write(b, 0, b.length);
122   }
123 
124   @Override
125   public void write(byte[] b, int off, int len) throws IOException {
126     checkSizeAndGrow(len);
127     ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
128   }
129 
130   public void write(ByteBuffer b, int off, int len) throws IOException {
131     checkSizeAndGrow(len);
132     ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
133   }
134 
135   /**
136    * Writes an <code>int</code> to the underlying output stream as four
137    * bytes, high byte first.
138    * @param i the <code>int</code> to write
139    * @throws IOException if an I/O error occurs.
140    */
141   public void writeInt(int i) throws IOException {
142     checkSizeAndGrow(Bytes.SIZEOF_INT);
143     ByteBufferUtils.putInt(this.buf, i);
144   }
145 
146   @Override
147   public void flush() throws IOException {
148     // noop
149   }
150 
151   @Override
152   public void close() throws IOException {
153     // noop again. heh
154   }
155 
156   public byte[] toByteArray(int offset, int length) {
157     ByteBuffer bb = buf.duplicate();
158     bb.flip();
159 
160     byte[] chunk = new byte[length];
161 
162     bb.position(offset);
163     bb.get(chunk, 0, length);
164     return chunk;
165   }
166 }