1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.GatheringByteChannel;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28
29
30
31
32 @InterfaceAudience.Private
33 class BufferChain {
34 private final ByteBuffer[] buffers;
35 private int remaining = 0;
36 private int bufferOffset = 0;
37
38 BufferChain(ByteBuffer ... buffers) {
39
40 List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
41 for (ByteBuffer b : buffers) {
42 if (b == null) continue;
43 bbs.add(b);
44 this.remaining += b.remaining();
45 }
46 this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
47 }
48
49
50
51
52
53
54 byte [] getBytes() {
55 if (!hasRemaining()) throw new IllegalAccessError();
56 byte [] bytes = new byte [this.remaining];
57 int offset = 0;
58 for (ByteBuffer bb: this.buffers) {
59 System.arraycopy(bb.array(), bb.arrayOffset(), bytes, offset, bb.limit());
60 offset += bb.capacity();
61 }
62 return bytes;
63 }
64
65 boolean hasRemaining() {
66 return remaining > 0;
67 }
68
69
70
71
72
73
74
75
76 long write(GatheringByteChannel channel, int chunkSize) throws IOException {
77 int chunkRemaining = chunkSize;
78 ByteBuffer lastBuffer = null;
79 int bufCount = 0;
80 int restoreLimit = -1;
81
82 while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
83 lastBuffer = buffers[bufferOffset + bufCount];
84 if (!lastBuffer.hasRemaining()) {
85 bufferOffset++;
86 continue;
87 }
88 bufCount++;
89 if (lastBuffer.remaining() > chunkRemaining) {
90 restoreLimit = lastBuffer.limit();
91 lastBuffer.limit(lastBuffer.position() + chunkRemaining);
92 chunkRemaining = 0;
93 break;
94 } else {
95 chunkRemaining -= lastBuffer.remaining();
96 }
97 }
98 assert lastBuffer != null;
99 if (chunkRemaining == chunkSize) {
100 assert !hasRemaining();
101
102 return 0;
103 }
104 try {
105 long ret = channel.write(buffers, bufferOffset, bufCount);
106 if (ret > 0) {
107 remaining -= ret;
108 }
109 return ret;
110 } finally {
111 if (restoreLimit >= 0) {
112 lastBuffer.limit(restoreLimit);
113 }
114 }
115 }
116 }