001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.IOException; 021import java.io.OutputStream; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.hbase.util.ByteBufferUtils; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when 033 * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap. 034 * Make sure to call {@link #releaseResources()} method once the Stream usage is over and 035 * data is transferred to the wanted destination. 036 * Not thread safe! 037 */ 038@InterfaceAudience.Private 039public class ByteBufferListOutputStream extends ByteBufferOutputStream { 040 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class); 041 042 private ByteBufferPool pool; 043 // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If 044 // it is not available will make a new one our own and keep writing to that. We keep track of all 045 // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure 046 // to return back all of them to pool 047 protected List<ByteBuffer> allBufs = new ArrayList<>(); 048 protected List<ByteBuffer> bufsFromPool = new ArrayList<>(); 049 050 private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already 051 052 public ByteBufferListOutputStream(ByteBufferPool pool) { 053 this.pool = pool; 054 allocateNewBuffer(); 055 } 056 057 private void allocateNewBuffer() { 058 if (this.curBuf != null) { 059 this.curBuf.flip();// On the current buf set limit = pos and pos = 0. 060 } 061 // Get an initial BB to work with from the pool 062 this.curBuf = this.pool.getBuffer(); 063 if (this.curBuf == null) { 064 // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off 065 // heap BB on demand. It is difficult to account for all such and so proper sizing of Max 066 // direct heap size. See HBASE-15525 also for more details. 067 // Make BB with same size of pool's buffer size. 068 this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize()); 069 } else { 070 this.bufsFromPool.add(this.curBuf); 071 } 072 this.allBufs.add(this.curBuf); 073 } 074 075 @Override 076 public int size() { 077 int s = 0; 078 for (int i = 0; i < this.allBufs.size() - 1; i++) { 079 s += this.allBufs.get(i).remaining(); 080 } 081 // On the last BB, it might not be flipped yet if getByteBuffers is not yet called 082 if (this.lastBufFlipped) { 083 s += this.curBuf.remaining(); 084 } else { 085 s += this.curBuf.position(); 086 } 087 return s; 088 } 089 090 @Override 091 public ByteBuffer getByteBuffer() { 092 throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer"); 093 } 094 095 @Override 096 protected void checkSizeAndGrow(int extra) { 097 long capacityNeeded = curBuf.position() + (long) extra; 098 if (capacityNeeded > curBuf.limit()) { 099 allocateNewBuffer(); 100 } 101 } 102 103 @Override 104 public void writeTo(OutputStream out) throws IOException { 105 // No usage of this API in code. Just making it as an Unsupported operation as of now 106 throw new UnsupportedOperationException(); 107 } 108 109 /** 110 * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only 111 * when all the data is fully used. And it must be called at the end of usage else we will leak 112 * ByteBuffers from pool. 113 */ 114 public void releaseResources() { 115 try { 116 close(); 117 } catch (IOException e) { 118 LOG.debug(e.toString(), e); 119 } 120 // Return back all the BBs to pool 121 if (this.bufsFromPool != null) { 122 for (int i = 0; i < this.bufsFromPool.size(); i++) { 123 this.pool.putbackBuffer(this.bufsFromPool.get(i)); 124 } 125 this.bufsFromPool = null; 126 } 127 this.allBufs = null; 128 this.curBuf = null; 129 } 130 131 @Override 132 public byte[] toByteArray(int offset, int length) { 133 // No usage of this API in code. Just making it as an Unsupported operation as of now 134 throw new UnsupportedOperationException(); 135 } 136 137 /** 138 * We can be assured that the buffers returned by this method are all flipped 139 * @return list of bytebuffers 140 */ 141 public List<ByteBuffer> getByteBuffers() { 142 if (!this.lastBufFlipped) { 143 this.lastBufFlipped = true; 144 // All the other BBs are already flipped while moving to the new BB. 145 curBuf.flip(); 146 } 147 return this.allBufs; 148 } 149 150 @Override 151 public void write(byte[] b, int off, int len) throws IOException { 152 int toWrite = 0; 153 while (len > 0) { 154 toWrite = Math.min(len, this.curBuf.remaining()); 155 ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite); 156 off += toWrite; 157 len -= toWrite; 158 if (len > 0) { 159 allocateNewBuffer();// The curBuf is over. Let us move to the next one 160 } 161 } 162 } 163 164 @Override 165 public void write(ByteBuffer b, int off, int len) throws IOException { 166 int toWrite = 0; 167 while (len > 0) { 168 toWrite = Math.min(len, this.curBuf.remaining()); 169 ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite); 170 off += toWrite; 171 len -= toWrite; 172 if (len > 0) { 173 allocateNewBuffer();// The curBuf is over. Let us move to the next one 174 } 175 } 176 } 177}