001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.nio.channels.GatheringByteChannel; 023 024import org.apache.yetus.audience.InterfaceAudience; 025 026/** 027 * Chain of ByteBuffers. 028 * Used writing out an array of byte buffers. Writes in chunks. 029 */ 030@InterfaceAudience.Private 031class BufferChain { 032 private final ByteBuffer[] buffers; 033 private int remaining = 0; 034 private int bufferOffset = 0; 035 private int size; 036 037 BufferChain(ByteBuffer... buffers) { 038 for (ByteBuffer b : buffers) { 039 this.remaining += b.remaining(); 040 } 041 this.size = remaining; 042 this.buffers = buffers; 043 } 044 045 /** 046 * Expensive. Makes a new buffer to hold a copy of what is in contained ByteBuffers. This 047 * call drains this instance; it cannot be used subsequent to the call. 048 * @return A new byte buffer with the content of all contained ByteBuffers. 049 */ 050 byte [] getBytes() { 051 if (!hasRemaining()) throw new IllegalAccessError(); 052 byte [] bytes = new byte [this.remaining]; 053 int offset = 0; 054 for (ByteBuffer bb: this.buffers) { 055 int length = bb.remaining(); 056 bb.get(bytes, offset, length); 057 offset += length; 058 } 059 return bytes; 060 } 061 062 boolean hasRemaining() { 063 return remaining > 0; 064 } 065 066 /** 067 * Write out our chain of buffers in chunks 068 * @param channel Where to write 069 * @param chunkSize Size of chunks to write. 070 * @return Amount written. 071 * @throws IOException 072 */ 073 long write(GatheringByteChannel channel, int chunkSize) throws IOException { 074 int chunkRemaining = chunkSize; 075 ByteBuffer lastBuffer = null; 076 int bufCount = 0; 077 int restoreLimit = -1; 078 079 while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) { 080 lastBuffer = buffers[bufferOffset + bufCount]; 081 if (!lastBuffer.hasRemaining()) { 082 bufferOffset++; 083 continue; 084 } 085 bufCount++; 086 if (lastBuffer.remaining() > chunkRemaining) { 087 restoreLimit = lastBuffer.limit(); 088 lastBuffer.limit(lastBuffer.position() + chunkRemaining); 089 chunkRemaining = 0; 090 break; 091 } else { 092 chunkRemaining -= lastBuffer.remaining(); 093 } 094 } 095 assert lastBuffer != null; 096 if (chunkRemaining == chunkSize) { 097 assert !hasRemaining(); 098 // no data left to write 099 return 0; 100 } 101 try { 102 long ret = channel.write(buffers, bufferOffset, bufCount); 103 if (ret > 0) { 104 remaining = (int) (remaining - ret); 105 } 106 return ret; 107 } finally { 108 if (restoreLimit >= 0) { 109 lastBuffer.limit(restoreLimit); 110 } 111 } 112 } 113 114 int size() { 115 return size; 116 } 117 118 ByteBuffer[] getBuffers() { 119 return this.buffers; 120 } 121}