001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.io; 021 022import java.io.IOException; 023import java.io.OutputStream; 024import java.nio.BufferOverflowException; 025import java.nio.ByteBuffer; 026import java.nio.ByteOrder; 027import java.nio.channels.Channels; 028import java.nio.channels.WritableByteChannel; 029 030import org.apache.hadoop.hbase.util.ByteBufferUtils; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.yetus.audience.InterfaceAudience; 033 034/** 035 * Not thread safe! 036 */ 037@InterfaceAudience.Public 038public class ByteBufferOutputStream extends OutputStream 039 implements ByteBufferWriter { 040 041 // Borrowed from openJDK: 042 // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221 043 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 044 045 protected ByteBuffer curBuf = null; 046 047 ByteBufferOutputStream() { 048 049 } 050 051 public ByteBufferOutputStream(int capacity) { 052 this(capacity, false); 053 } 054 055 public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) { 056 this(allocate(capacity, useDirectByteBuffer)); 057 } 058 059 /** 060 * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its 061 * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE EFFECT!! 062 * If you want to get the newly allocated ByteBuffer, you'll need to pick it up when 063 * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation 064 * is so we can recycle buffers rather than allocate each time; it can get expensive especially 065 * if the buffers are big doing allocations each time or having them undergo resizing because 066 * initial allocation was small. 067 * @see #getByteBuffer() 068 */ 069 public ByteBufferOutputStream(final ByteBuffer bb) { 070 assert bb.order() == ByteOrder.BIG_ENDIAN; 071 this.curBuf = bb; 072 this.curBuf.clear(); 073 } 074 075 public int size() { 076 return curBuf.position(); 077 } 078 079 private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) { 080 if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError 081 throw new BufferOverflowException(); 082 } 083 return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity); 084 } 085 086 /** 087 * This flips the underlying BB so be sure to use it _last_! 088 * @return ByteBuffer 089 */ 090 public ByteBuffer getByteBuffer() { 091 curBuf.flip(); 092 return curBuf; 093 } 094 095 protected void checkSizeAndGrow(int extra) { 096 long capacityNeeded = curBuf.position() + (long) extra; 097 if (capacityNeeded > curBuf.limit()) { 098 // guarantee it's possible to fit 099 if (capacityNeeded > MAX_ARRAY_SIZE) { 100 throw new BufferOverflowException(); 101 } 102 // double until hit the cap 103 long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE); 104 // but make sure there is enough if twice the existing capacity is still too small 105 nextCapacity = Math.max(nextCapacity, capacityNeeded); 106 ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect()); 107 curBuf.flip(); 108 ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf); 109 curBuf = newBuf; 110 } 111 } 112 113 // OutputStream 114 @Override 115 public void write(int b) throws IOException { 116 checkSizeAndGrow(Bytes.SIZEOF_BYTE); 117 curBuf.put((byte)b); 118 } 119 120 /** 121 * Writes the complete contents of this byte buffer output stream to 122 * the specified output stream argument. 123 * 124 * @param out the output stream to which to write the data. 125 * @exception IOException if an I/O error occurs. 126 */ 127 public void writeTo(OutputStream out) throws IOException { 128 WritableByteChannel channel = Channels.newChannel(out); 129 ByteBuffer bb = curBuf.duplicate(); 130 bb.flip(); 131 channel.write(bb); 132 } 133 134 @Override 135 public void write(byte[] b) throws IOException { 136 write(b, 0, b.length); 137 } 138 139 @Override 140 public void write(byte[] b, int off, int len) throws IOException { 141 checkSizeAndGrow(len); 142 ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len); 143 } 144 145 @Override 146 public void write(ByteBuffer b, int off, int len) throws IOException { 147 checkSizeAndGrow(len); 148 ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len); 149 } 150 151 /** 152 * Writes an <code>int</code> to the underlying output stream as four 153 * bytes, high byte first. 154 * @param i the <code>int</code> to write 155 * @throws IOException if an I/O error occurs. 156 */ 157 @Override 158 public void writeInt(int i) throws IOException { 159 checkSizeAndGrow(Bytes.SIZEOF_INT); 160 ByteBufferUtils.putInt(this.curBuf, i); 161 } 162 163 @Override 164 public void flush() throws IOException { 165 // noop 166 } 167 168 @Override 169 public void close() throws IOException { 170 // noop again. heh 171 } 172 173 public byte[] toByteArray(int offset, int length) { 174 ByteBuffer bb = curBuf.duplicate(); 175 bb.flip(); 176 177 byte[] chunk = new byte[length]; 178 179 bb.position(offset); 180 bb.get(chunk, 0, length); 181 return chunk; 182 } 183}