View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.GatheringByteChannel;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  
28  /**
29   * Chain of ByteBuffers.
30   * Used for writing out an array of byte buffers.  Writes in chunks.
31   */
32  @InterfaceAudience.Private
class BufferChain {
  /** The non-null buffers making up this chain, in the order they are written. */
  private final ByteBuffer[] buffers;
  /** Count of bytes across all buffers that have not yet been consumed. */
  private int remaining = 0;
  /** Index of the first buffer in {@link #buffers} that may still hold unwritten bytes. */
  private int bufferOffset = 0;

  /**
   * Builds a chain out of the passed buffers, skipping any null entries.
   * @param buffers Buffers to chain, in order.  Individual entries may be null.
   */
  BufferChain(ByteBuffer ... buffers) {
    // Some of the incoming buffers can be null
    List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
    for (ByteBuffer b : buffers) {
      if (b == null) {
        continue;
      }
      bbs.add(b);
      this.remaining += b.remaining();
    }
    this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
  }

  /**
   * Expensive.  Makes a new array to hold a copy of what is in contained ByteBuffers.  This
   * call drains this instance; it cannot be used subsequent to the call.
   * <p>
   * Only the readable region of each buffer (from its position up to its limit) is copied, so
   * partially consumed, sliced, direct and read-only buffers are all handled.
   * @return A new byte array with the remaining content of all contained ByteBuffers.
   * @throws IllegalAccessError if there is nothing remaining in this chain.
   */
  byte [] getBytes() {
    if (!hasRemaining()) {
      throw new IllegalAccessError();
    }
    byte [] bytes = new byte [this.remaining];
    int offset = 0;
    for (ByteBuffer bb: this.buffers) {
      // Copy exactly the readable bytes.  The relative bulk get honours position, limit and
      // any backing-array offset, and does not need an accessible backing array.
      int length = bb.remaining();
      bb.get(bytes, offset, length);
      offset += length;
    }
    // Every buffer has been consumed; keep the byte count in step with the buffers.
    this.remaining = 0;
    return bytes;
  }

  /**
   * @return True if there are bytes in this chain that have not been consumed yet.
   */
  boolean hasRemaining() {
    return remaining > 0;
  }

  /**
   * Write out our chain of buffers in chunks.  At most <code>chunkSize</code> bytes are offered
   * to the channel per call; call repeatedly until {@link #hasRemaining()} returns false.
   * @param channel Where to write
   * @param chunkSize Size of chunks to write.
   * @return Amount written, as reported by the channel; 0 if there was no data left to write.
   * @throws IOException if the channel write fails
   */
  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
    int chunkRemaining = chunkSize;
    ByteBuffer lastBuffer = null;
    int bufCount = 0;
    int restoreLimit = -1;

    // Pick the run of buffers, starting at bufferOffset, that makes up the next chunk.
    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
      lastBuffer = buffers[bufferOffset + bufCount];
      if (!lastBuffer.hasRemaining()) {
        // Skip a spent buffer.  Buffers already counted into the chunk all have data, so a
        // spent buffer can only be seen while bufCount is still 0.
        bufferOffset++;
        continue;
      }
      bufCount++;
      if (lastBuffer.remaining() > chunkRemaining) {
        // This buffer overshoots the chunk: clamp its limit for the duration of the write and
        // remember the real limit so that it can be put back afterwards.
        restoreLimit = lastBuffer.limit();
        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
        chunkRemaining = 0;
        break;
      } else {
        chunkRemaining -= lastBuffer.remaining();
      }
    }
    if (chunkRemaining == chunkSize) {
      assert !hasRemaining();
      // no data left to write
      return 0;
    }
    try {
      long ret = channel.write(buffers, bufferOffset, bufCount);
      if (ret > 0) {
        remaining -= ret;
      }
      return ret;
    } finally {
      // restoreLimit is only set together with lastBuffer, so lastBuffer is non-null here.
      if (restoreLimit >= 0) {
        lastBuffer.limit(restoreLimit);
      }
    }
  }
}