1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.util.StringUtils;
29
30
31
32
33
34
35 @InterfaceAudience.Private
36 public final class ByteBufferArray {
37 private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
38
39 static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
40 private ByteBuffer buffers[];
41 private Lock locks[];
42 private int bufferSize;
43 private int bufferCount;
44
45
46
47
48
49
50
51
52 public ByteBufferArray(long capacity, boolean directByteBuffer) {
53 this.bufferSize = DEFAULT_BUFFER_SIZE;
54 if (this.bufferSize > (capacity / 16))
55 this.bufferSize = (int) roundUp(capacity / 16, 32768);
56 this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
57 LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
58 + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
59 + bufferCount + ", direct=" + directByteBuffer);
60 buffers = new ByteBuffer[bufferCount + 1];
61 locks = new Lock[bufferCount + 1];
62 for (int i = 0; i <= bufferCount; i++) {
63 locks[i] = new ReentrantLock();
64 if (i < bufferCount) {
65 buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
66 : ByteBuffer.allocate(bufferSize);
67 } else {
68 buffers[i] = ByteBuffer.allocate(0);
69 }
70
71 }
72 }
73
74 private long roundUp(long n, long to) {
75 return ((n + to - 1) / to) * to;
76 }
77
78
79
80
81
82
83
84
85 public int getMultiple(long start, int len, byte[] dstArray) {
86 return getMultiple(start, len, dstArray, 0);
87 }
88
89
90
91
92
93
94
95
96
97
98 public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
99 multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
100 return len;
101 }
102
103 private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
104 @Override
105 public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
106 bb.get(array, arrayIdx, len);
107 }
108 };
109
110
111
112
113
114
115
116 public void putMultiple(long start, int len, byte[] srcArray) {
117 putMultiple(start, len, srcArray, 0);
118 }
119
120
121
122
123
124
125
126
127
128 public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
129 multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
130 }
131
132 private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
133 @Override
134 public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
135 bb.put(array, arrayIdx, len);
136 }
137 };
138
139 private interface Visitor {
140
141
142
143
144
145
146
147
148
149 void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163 void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
164 assert len >= 0;
165 long end = start + len;
166 int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
167 int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
168 assert array.length >= len + arrayOffset;
169 assert startBuffer >= 0 && startBuffer < bufferCount;
170 assert endBuffer >= 0 && endBuffer < bufferCount
171 || (endBuffer == bufferCount && endOffset == 0);
172 if (startBuffer >= locks.length || startBuffer < 0) {
173 String msg = "Failed multiple, start=" + start + ",startBuffer="
174 + startBuffer + ",bufferSize=" + bufferSize;
175 LOG.error(msg);
176 throw new RuntimeException(msg);
177 }
178 int srcIndex = 0, cnt = -1;
179 for (int i = startBuffer; i <= endBuffer; ++i) {
180 Lock lock = locks[i];
181 lock.lock();
182 try {
183 ByteBuffer bb = buffers[i];
184 if (i == startBuffer) {
185 cnt = bufferSize - startOffset;
186 if (cnt > len) cnt = len;
187 bb.limit(startOffset + cnt).position(startOffset);
188 } else if (i == endBuffer) {
189 cnt = endOffset;
190 bb.limit(cnt).position(0);
191 } else {
192 cnt = bufferSize;
193 bb.limit(cnt).position(0);
194 }
195 visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
196 srcIndex += cnt;
197 } finally {
198 lock.unlock();
199 }
200 }
201 assert srcIndex == len;
202 }
203 }