001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.IOException; 021import java.io.OutputStream; 022import java.nio.BufferOverflowException; 023import java.nio.ByteBuffer; 024import java.nio.ByteOrder; 025import java.nio.channels.Channels; 026import java.nio.channels.WritableByteChannel; 027import org.apache.hadoop.hbase.util.ByteBufferUtils; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * Not thread safe! 033 */ 034@InterfaceAudience.Public 035public class ByteBufferOutputStream extends OutputStream implements ByteBufferWriter { 036 037 // Borrowed from openJDK: 038 // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221 039 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 040 041 protected ByteBuffer curBuf = null; 042 043 ByteBufferOutputStream() { 044 045 } 046 047 public ByteBufferOutputStream(int capacity) { 048 this(capacity, false); 049 } 050 051 public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) { 052 this(allocate(capacity, useDirectByteBuffer)); 053 } 054 055 /** 056 * @param bb ByteBuffer to use. If too small, will be discarded and a new one allocated in its 057 * place; i.e. the passed in BB may NOT BE RETURNED!! Minimally it will be altered. SIDE 058 * EFFECT!! If you want to get the newly allocated ByteBuffer, you'll need to pick it up 059 * when done with this instance by calling {@link #getByteBuffer()}. All this 060 * encapsulation violation is so we can recycle buffers rather than allocate each time; 061 * it can get expensive especially if the buffers are big doing allocations each time or 062 * having them undergo resizing because initial allocation was small. 063 * @see #getByteBuffer() 064 */ 065 public ByteBufferOutputStream(final ByteBuffer bb) { 066 assert bb.order() == ByteOrder.BIG_ENDIAN; 067 this.curBuf = bb; 068 this.curBuf.clear(); 069 } 070 071 public int size() { 072 return curBuf.position(); 073 } 074 075 private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) { 076 if (capacity > MAX_ARRAY_SIZE) { // avoid OutOfMemoryError 077 throw new BufferOverflowException(); 078 } 079 return useDirectByteBuffer 080 ? ByteBuffer.allocateDirect(capacity) 081 : ByteBuffer.allocate(capacity); 082 } 083 084 /** 085 * This flips the underlying BB so be sure to use it _last_! n 086 */ 087 public ByteBuffer getByteBuffer() { 088 curBuf.flip(); 089 return curBuf; 090 } 091 092 protected void checkSizeAndGrow(int extra) { 093 long capacityNeeded = curBuf.position() + (long) extra; 094 if (capacityNeeded > curBuf.limit()) { 095 // guarantee it's possible to fit 096 if (capacityNeeded > MAX_ARRAY_SIZE) { 097 throw new BufferOverflowException(); 098 } 099 // double until hit the cap 100 long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE); 101 // but make sure there is enough if twice the existing capacity is still too small 102 nextCapacity = Math.max(nextCapacity, capacityNeeded); 103 ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect()); 104 curBuf.flip(); 105 ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf); 106 curBuf = newBuf; 107 } 108 } 109 110 // OutputStream 111 @Override 112 public void write(int b) throws IOException { 113 checkSizeAndGrow(Bytes.SIZEOF_BYTE); 114 curBuf.put((byte) b); 115 } 116 117 /** 118 * Writes the complete contents of this byte buffer output stream to the specified output stream 119 * argument. 120 * @param out the output stream to which to write the data. 121 * @exception IOException if an I/O error occurs. 122 */ 123 public void writeTo(OutputStream out) throws IOException { 124 WritableByteChannel channel = Channels.newChannel(out); 125 ByteBuffer bb = curBuf.duplicate(); 126 bb.flip(); 127 channel.write(bb); 128 } 129 130 @Override 131 public void write(byte[] b) throws IOException { 132 write(b, 0, b.length); 133 } 134 135 @Override 136 public void write(byte[] b, int off, int len) throws IOException { 137 checkSizeAndGrow(len); 138 ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len); 139 } 140 141 @Override 142 public void write(ByteBuffer b, int off, int len) throws IOException { 143 checkSizeAndGrow(len); 144 ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len); 145 } 146 147 /** 148 * Writes an <code>int</code> to the underlying output stream as four bytes, high byte first. 149 * @param i the <code>int</code> to write 150 * @throws IOException if an I/O error occurs. 151 */ 152 @Override 153 public void writeInt(int i) throws IOException { 154 checkSizeAndGrow(Bytes.SIZEOF_INT); 155 ByteBufferUtils.putInt(this.curBuf, i); 156 } 157 158 @Override 159 public void flush() throws IOException { 160 // noop 161 } 162 163 @Override 164 public void close() throws IOException { 165 // noop again. heh 166 } 167 168 public byte[] toByteArray(int offset, int length) { 169 ByteBuffer bb = curBuf.duplicate(); 170 bb.flip(); 171 172 byte[] chunk = new byte[length]; 173 174 bb.position(offset); 175 bb.get(chunk, 0, length); 176 return chunk; 177 } 178}