001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.IOException; 021import java.io.OutputStream; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.hbase.nio.ByteBuff; 027import org.apache.hadoop.hbase.nio.SingleByteBuff; 028import org.apache.hadoop.hbase.util.ByteBufferUtils; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when 035 * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap. 036 * Make sure to call {@link #releaseResources()} method once the Stream usage is over and 037 * data is transferred to the wanted destination. 038 * Not thread safe! 039 */ 040@InterfaceAudience.Private 041public class ByteBufferListOutputStream extends ByteBufferOutputStream { 042 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class); 043 044 private final ByteBuffAllocator allocator; 045 // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If 046 // it is not available will make a new one our own and keep writing to that. We keep track of all 047 // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure 048 // to return back all of them to pool 049 protected List<SingleByteBuff> allBufs = new ArrayList<>(); 050 051 private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already 052 053 public ByteBufferListOutputStream(ByteBuffAllocator allocator) { 054 this.allocator = allocator; 055 allocateNewBuffer(); 056 } 057 058 private void allocateNewBuffer() { 059 if (this.curBuf != null) { 060 this.curBuf.flip();// On the current buf set limit = pos and pos = 0. 061 } 062 // Get an initial ByteBuffer from the allocator. 063 SingleByteBuff sbb = allocator.allocateOneBuffer(); 064 this.curBuf = sbb.nioByteBuffers()[0]; 065 this.allBufs.add(sbb); 066 } 067 068 @Override 069 public int size() { 070 int s = 0; 071 for (int i = 0; i < this.allBufs.size() - 1; i++) { 072 s += this.allBufs.get(i).remaining(); 073 } 074 // On the last BB, it might not be flipped yet if getByteBuffers is not yet called 075 if (this.lastBufFlipped) { 076 s += this.curBuf.remaining(); 077 } else { 078 s += this.curBuf.position(); 079 } 080 return s; 081 } 082 083 @Override 084 public ByteBuffer getByteBuffer() { 085 throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer"); 086 } 087 088 @Override 089 protected void checkSizeAndGrow(int extra) { 090 long capacityNeeded = curBuf.position() + (long) extra; 091 if (capacityNeeded > curBuf.limit()) { 092 allocateNewBuffer(); 093 } 094 } 095 096 @Override 097 public void writeTo(OutputStream out) throws IOException { 098 // No usage of this API in code. Just making it as an Unsupported operation as of now 099 throw new UnsupportedOperationException(); 100 } 101 102 /** 103 * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only 104 * when all the data is fully used. And it must be called at the end of usage else we will leak 105 * ByteBuffers from pool. 106 */ 107 public void releaseResources() { 108 try { 109 close(); 110 } catch (IOException e) { 111 LOG.debug(e.toString(), e); 112 } 113 // Return back all the BBs to pool 114 for (ByteBuff buf : this.allBufs) { 115 buf.release(); 116 } 117 this.allBufs = null; 118 this.curBuf = null; 119 } 120 121 @Override 122 public byte[] toByteArray(int offset, int length) { 123 // No usage of this API in code. Just making it as an Unsupported operation as of now 124 throw new UnsupportedOperationException(); 125 } 126 127 /** 128 * We can be assured that the buffers returned by this method are all flipped 129 * @return list of bytebuffers 130 */ 131 public List<ByteBuffer> getByteBuffers() { 132 if (!this.lastBufFlipped) { 133 this.lastBufFlipped = true; 134 // All the other BBs are already flipped while moving to the new BB. 135 curBuf.flip(); 136 } 137 List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size()); 138 for (SingleByteBuff bb : this.allBufs) { 139 bbs.add(bb.nioByteBuffers()[0]); 140 } 141 return bbs; 142 } 143 144 @Override 145 public void write(byte[] b, int off, int len) throws IOException { 146 int toWrite = 0; 147 while (len > 0) { 148 toWrite = Math.min(len, this.curBuf.remaining()); 149 ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite); 150 off += toWrite; 151 len -= toWrite; 152 if (len > 0) { 153 allocateNewBuffer();// The curBuf is over. Let us move to the next one 154 } 155 } 156 } 157 158 @Override 159 public void write(ByteBuffer b, int off, int len) throws IOException { 160 int toWrite = 0; 161 while (len > 0) { 162 toWrite = Math.min(len, this.curBuf.remaining()); 163 ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite); 164 off += toWrite; 165 len -= toWrite; 166 if (len > 0) { 167 allocateNewBuffer();// The curBuf is over. Let us move to the next one 168 } 169 } 170 } 171}