001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import java.io.IOException; 021import java.io.OutputStream; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.List; 025import org.apache.hadoop.hbase.nio.ByteBuff; 026import org.apache.hadoop.hbase.nio.SingleByteBuff; 027import org.apache.hadoop.hbase.util.ByteBufferUtils; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when 034 * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap. 035 * Make sure to call {@link #releaseResources()} method once the Stream usage is over and data is 036 * transferred to the wanted destination. Not thread safe! 037 */ 038@InterfaceAudience.Private 039public class ByteBufferListOutputStream extends ByteBufferOutputStream { 040 private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class); 041 042 private final ByteBuffAllocator allocator; 043 // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If 044 // it is not available will make a new one our own and keep writing to that. We keep track of all 045 // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure 046 // to return back all of them to pool 047 protected List<SingleByteBuff> allBufs = new ArrayList<>(); 048 049 private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already 050 051 public ByteBufferListOutputStream(ByteBuffAllocator allocator) { 052 this.allocator = allocator; 053 allocateNewBuffer(); 054 } 055 056 private void allocateNewBuffer() { 057 if (this.curBuf != null) { 058 this.curBuf.flip();// On the current buf set limit = pos and pos = 0. 059 } 060 // Get an initial ByteBuffer from the allocator. 061 SingleByteBuff sbb = allocator.allocateOneBuffer(); 062 this.curBuf = sbb.nioByteBuffers()[0]; 063 this.allBufs.add(sbb); 064 } 065 066 @Override 067 public int size() { 068 int s = 0; 069 for (int i = 0; i < this.allBufs.size() - 1; i++) { 070 s += this.allBufs.get(i).remaining(); 071 } 072 // On the last BB, it might not be flipped yet if getByteBuffers is not yet called 073 if (this.lastBufFlipped) { 074 s += this.curBuf.remaining(); 075 } else { 076 s += this.curBuf.position(); 077 } 078 return s; 079 } 080 081 @Override 082 public ByteBuffer getByteBuffer() { 083 throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer"); 084 } 085 086 @Override 087 protected void checkSizeAndGrow(int extra) { 088 long capacityNeeded = curBuf.position() + (long) extra; 089 if (capacityNeeded > curBuf.limit()) { 090 allocateNewBuffer(); 091 } 092 } 093 094 @Override 095 public void writeTo(OutputStream out) throws IOException { 096 // No usage of this API in code. Just making it as an Unsupported operation as of now 097 throw new UnsupportedOperationException(); 098 } 099 100 /** 101 * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only 102 * when all the data is fully used. And it must be called at the end of usage else we will leak 103 * ByteBuffers from pool. 104 */ 105 public void releaseResources() { 106 try { 107 close(); 108 } catch (IOException e) { 109 LOG.debug(e.toString(), e); 110 } 111 // Return back all the BBs to pool 112 for (ByteBuff buf : this.allBufs) { 113 buf.release(); 114 } 115 this.allBufs = null; 116 this.curBuf = null; 117 } 118 119 @Override 120 public byte[] toByteArray(int offset, int length) { 121 // No usage of this API in code. Just making it as an Unsupported operation as of now 122 throw new UnsupportedOperationException(); 123 } 124 125 /** 126 * We can be assured that the buffers returned by this method are all flipped 127 * @return list of bytebuffers 128 */ 129 public List<ByteBuffer> getByteBuffers() { 130 if (!this.lastBufFlipped) { 131 this.lastBufFlipped = true; 132 // All the other BBs are already flipped while moving to the new BB. 133 curBuf.flip(); 134 } 135 List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size()); 136 for (SingleByteBuff bb : this.allBufs) { 137 bbs.add(bb.nioByteBuffers()[0]); 138 } 139 return bbs; 140 } 141 142 @Override 143 public void write(byte[] b, int off, int len) throws IOException { 144 int toWrite = 0; 145 while (len > 0) { 146 toWrite = Math.min(len, this.curBuf.remaining()); 147 ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite); 148 off += toWrite; 149 len -= toWrite; 150 if (len > 0) { 151 allocateNewBuffer();// The curBuf is over. Let us move to the next one 152 } 153 } 154 } 155 156 @Override 157 public void write(ByteBuffer b, int off, int len) throws IOException { 158 int toWrite = 0; 159 while (len > 0) { 160 toWrite = Math.min(len, this.curBuf.remaining()); 161 ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite); 162 off += toWrite; 163 len -= toWrite; 164 if (len > 0) { 165 allocateNewBuffer();// The curBuf is over. Let us move to the next one 166 } 167 } 168 } 169}