001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.nio.channels.GatheringByteChannel;
023
024import org.apache.yetus.audience.InterfaceAudience;
025
/**
 * Chain of ByteBuffers.
 * Used for writing out an array of byte buffers.  Writes in chunks.
 */
030@InterfaceAudience.Private
class BufferChain {
  /** Buffers making up the chain, in write order. The caller's array is kept, not copied. */
  private final ByteBuffer[] buffers;
  /** Number of bytes not yet consumed by {@link #write} or {@link #getBytes()}. */
  private int remaining = 0;
  /** Index of the first buffer that may still hold unwritten data. */
  private int bufferOffset = 0;
  /** Total number of bytes that were in the chain when it was constructed. */
  private final int size;

  /**
   * Builds a chain over the given buffers. The initial size is the sum of each buffer's
   * {@code remaining()} at construction time.
   * @param buffers the buffers to chain together, in the order they should be written
   */
  BufferChain(ByteBuffer... buffers) {
    for (ByteBuffer b : buffers) {
      this.remaining += b.remaining();
    }
    this.size = remaining;
    this.buffers = buffers;
  }

  /**
   * Expensive.  Makes a new array to hold a copy of what is in contained ByteBuffers.  This
   * call drains this instance; it cannot be used subsequent to the call.
   * @return A new byte array with the content of all contained ByteBuffers.
   * @throws IllegalAccessError if the chain has nothing remaining
   */
  byte [] getBytes() {
    if (!hasRemaining()) {
      throw new IllegalAccessError();
    }
    byte [] bytes = new byte [this.remaining];
    int offset = 0;
    for (ByteBuffer bb: this.buffers) {
      int length = bb.remaining();
      bb.get(bytes, offset, length);
      offset += length;
    }
    // Every buffer has been consumed; keep the bookkeeping in step so hasRemaining() and
    // write() see the chain as drained.
    this.remaining = 0;
    return bytes;
  }

  /**
   * @return true if there are still bytes in the chain that have not been consumed
   */
  boolean hasRemaining() {
    return remaining > 0;
  }

  /**
   * Write out our chain of buffers in chunks.  A single call hands at most {@code chunkSize}
   * bytes to the channel; call repeatedly until {@link #hasRemaining()} returns false.
   * @param channel Where to write
   * @param chunkSize Maximum number of bytes to offer to the channel in this call.
   * @return Amount written; 0 if there was nothing to write or {@code chunkSize} is not positive.
   * @throws IOException if the underlying channel write fails
   */
  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
    int chunkRemaining = chunkSize;
    // Buffer whose limit was temporarily lowered to respect chunkSize, if any.
    ByteBuffer limitedBuffer = null;
    int restoreLimit = -1;
    int bufCount = 0;

    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
      ByteBuffer current = buffers[bufferOffset + bufCount];
      if (!current.hasRemaining()) {
        if (bufCount == 0) {
          // Leading empty buffer: permanently skip it.
          bufferOffset++;
        } else {
          // Empty buffer after data already selected for this write: keep it inside the
          // window.  Advancing bufferOffset here would slide the window past the first
          // selected buffer and drop/reorder its data.
          bufCount++;
        }
        continue;
      }
      bufCount++;
      if (current.remaining() > chunkRemaining) {
        // Only part of this buffer fits in the chunk; cap its limit for the duration of the write.
        restoreLimit = current.limit();
        current.limit(current.position() + chunkRemaining);
        limitedBuffer = current;
        chunkRemaining = 0;
        break;
      } else {
        chunkRemaining -= current.remaining();
      }
    }
    if (bufCount == 0) {
      // no data left to write (or a non-positive chunkSize was requested)
      return 0;
    }
    try {
      long ret = channel.write(buffers, bufferOffset, bufCount);
      if (ret > 0) {
        remaining = (int) (remaining - ret);
      }
      return ret;
    } finally {
      if (limitedBuffer != null) {
        limitedBuffer.limit(restoreLimit);
      }
    }
  }

  /**
   * @return the total number of bytes the chain held when it was constructed
   */
  int size() {
    return size;
  }

  /**
   * @return the backing array of buffers (not a copy)
   */
  ByteBuffer[] getBuffers() {
    return this.buffers;
  }
}