1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import java.io.IOException;
22 import java.util.Arrays;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
40 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.Threads;
43 import org.apache.hadoop.metrics.util.MBeanUtil;
44 import org.apache.thrift.TException;
45
46
47
48
49
50
51
52
53
54
55 public class IncrementCoalescer implements IncrementCoalescerMBean {
56
57
58
59
60
61 static class FullyQualifiedRow {
62 private byte[] table;
63 private byte[] rowKey;
64 private byte[] family;
65 private byte[] qualifier;
66
67 public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) {
68 super();
69 this.table = table;
70 this.rowKey = rowKey;
71 this.family = fam;
72 this.qualifier = qual;
73 }
74
75 public byte[] getTable() {
76 return table;
77 }
78
79 public void setTable(byte[] table) {
80 this.table = table;
81 }
82
83 public byte[] getRowKey() {
84 return rowKey;
85 }
86
87 public void setRowKey(byte[] rowKey) {
88 this.rowKey = rowKey;
89 }
90
91 public byte[] getFamily() {
92 return family;
93 }
94
95 public void setFamily(byte[] fam) {
96 this.family = fam;
97 }
98
99 public byte[] getQualifier() {
100 return qualifier;
101 }
102
103 public void setQualifier(byte[] qual) {
104 this.qualifier = qual;
105 }
106
107 @Override
108 public int hashCode() {
109 final int prime = 31;
110 int result = 1;
111 result = prime * result + Arrays.hashCode(family);
112 result = prime * result + Arrays.hashCode(qualifier);
113 result = prime * result + Arrays.hashCode(rowKey);
114 result = prime * result + Arrays.hashCode(table);
115 return result;
116 }
117
118 @Override
119 public boolean equals(Object obj) {
120 if (this == obj) return true;
121 if (obj == null) return false;
122 if (getClass() != obj.getClass()) return false;
123 FullyQualifiedRow other = (FullyQualifiedRow) obj;
124 if (!Arrays.equals(family, other.family)) return false;
125 if (!Arrays.equals(qualifier, other.qualifier)) return false;
126 if (!Arrays.equals(rowKey, other.rowKey)) return false;
127 if (!Arrays.equals(table, other.table)) return false;
128 return true;
129 }
130
131 }
132
133 static class DaemonThreadFactory implements ThreadFactory {
134 static final AtomicInteger poolNumber = new AtomicInteger(1);
135 final ThreadGroup group;
136 final AtomicInteger threadNumber = new AtomicInteger(1);
137 final String namePrefix;
138
139 DaemonThreadFactory() {
140 SecurityManager s = System.getSecurityManager();
141 group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
142 namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
143 }
144
145 public Thread newThread(Runnable r) {
146 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
147 if (!t.isDaemon()) t.setDaemon(true);
148 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
149 return t;
150 }
151 }
152
153 private final AtomicLong failedIncrements = new AtomicLong();
154 private final AtomicLong successfulCoalescings = new AtomicLong();
155 private final AtomicLong totalIncrements = new AtomicLong();
156 private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
157 new ConcurrentHashMap<FullyQualifiedRow, Long>(100000, 0.75f, 1500);
158 private final ThreadPoolExecutor pool;
159 private final HBaseHandler handler;
160
161 private int maxQueueSize = 500000;
162 private static final int CORE_POOL_SIZE = 1;
163
164 private static final Log LOG = LogFactory.getLog(FullyQualifiedRow.class);
165
166 @SuppressWarnings("deprecation")
167 public IncrementCoalescer(HBaseHandler hand) {
168 this.handler = hand;
169 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
170 pool =
171 new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
172 Threads.newDaemonThreadFactory("IncrementCoalescer"));
173
174 MBeanUtil.registerMBean("thrift", "Thrift", this);
175 }
176
177 public boolean queueIncrement(TIncrement inc) throws TException {
178 if (!canQueue()) {
179 failedIncrements.incrementAndGet();
180 return false;
181 }
182 return internalQueueTincrement(inc);
183 }
184
185 public boolean queueIncrements(List<TIncrement> incs) throws TException {
186 if (!canQueue()) {
187 failedIncrements.incrementAndGet();
188 return false;
189 }
190
191 for (TIncrement tinc : incs) {
192 internalQueueTincrement(tinc);
193 }
194 return true;
195
196 }
197
198 private boolean internalQueueTincrement(TIncrement inc) throws TException {
199 byte[][] famAndQf = KeyValue.parseColumn(inc.getColumn());
200 if (famAndQf.length != 2) return false;
201
202 return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
203 inc.getAmmount());
204 }
205
206 private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
207 byte[] qual, long ammount) throws TException {
208 int countersMapSize = countersMap.size();
209
210
211
212 dynamicallySetCoreSize(countersMapSize);
213
214 totalIncrements.incrementAndGet();
215
216 FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
217
218 long currentAmount = ammount;
219
220 while (true) {
221 Long value = countersMap.remove(key);
222 if (value == null) {
223
224 value = Long.valueOf(currentAmount);
225 } else {
226 value += currentAmount;
227 successfulCoalescings.incrementAndGet();
228 }
229
230 Long oldValue = countersMap.putIfAbsent(key, value);
231 if (oldValue == null) {
232
233 break;
234 }
235
236
237 currentAmount = value;
238 }
239
240
241
242
243 if (pool.getQueue().size() <= 1000) {
244
245 Callable<Integer> callable = createIncCallable();
246 pool.submit(callable);
247 }
248
249 return true;
250 }
251
252 public boolean canQueue() {
253 return countersMap.size() < maxQueueSize;
254 }
255
256 private Callable<Integer> createIncCallable() {
257 return new Callable<Integer>() {
258 @Override
259 public Integer call() throws Exception {
260 int failures = 0;
261 Set<FullyQualifiedRow> keys = countersMap.keySet();
262 for (FullyQualifiedRow row : keys) {
263 Long counter = countersMap.remove(row);
264 if (counter == null) {
265 continue;
266 }
267 Table table = null;
268 try {
269 table = handler.getTable(row.getTable());
270 if (failures > 2) {
271 throw new IOException("Auto-Fail rest of ICVs");
272 }
273 table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(),
274 counter);
275 } catch (IOException e) {
276
277 failures++;
278 LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
279 + Bytes.toStringBinary(row.getRowKey()) + ", "
280 + Bytes.toStringBinary(row.getFamily()) + ", "
281 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
282 } finally{
283 if(table != null){
284 table.close();
285 }
286 }
287 }
288 return failures;
289 }
290 };
291 }
292
293
294
295
296
297
298 private void dynamicallySetCoreSize(int countersMapSize) {
299
300
301 if (countersMapSize % 10 != 0) {
302 return;
303 }
304 double currentRatio = (double) countersMapSize / (double) maxQueueSize;
305 int newValue = 1;
306 if (currentRatio < 0.1) {
307
308 } else if (currentRatio < 0.3) {
309 newValue = 2;
310 } else if (currentRatio < 0.5) {
311 newValue = 4;
312 } else if (currentRatio < 0.7) {
313 newValue = 8;
314 } else if (currentRatio < 0.9) {
315 newValue = 14;
316 } else {
317 newValue = 22;
318 }
319 if (pool.getCorePoolSize() != newValue) {
320 pool.setCorePoolSize(newValue);
321 }
322 }
323
324
325 public int getQueueSize() {
326 return pool.getQueue().size();
327 }
328 public int getMaxQueueSize() {
329 return this.maxQueueSize;
330 }
331 public void setMaxQueueSize(int newSize) {
332 this.maxQueueSize = newSize;
333 }
334
335 public long getPoolCompletedTaskCount() {
336 return pool.getCompletedTaskCount();
337 }
338 public long getPoolTaskCount() {
339 return pool.getTaskCount();
340 }
341 public int getPoolLargestPoolSize() {
342 return pool.getLargestPoolSize();
343 }
344 public int getCorePoolSize() {
345 return pool.getCorePoolSize();
346 }
347 public void setCorePoolSize(int newCoreSize) {
348 pool.setCorePoolSize(newCoreSize);
349 }
350 public int getMaxPoolSize() {
351 return pool.getMaximumPoolSize();
352 }
353 public void setMaxPoolSize(int newMaxSize) {
354 pool.setMaximumPoolSize(newMaxSize);
355 }
356 public long getFailedIncrements() {
357 return failedIncrements.get();
358 }
359
360 public long getSuccessfulCoalescings() {
361 return successfulCoalescings.get();
362 }
363
364 public long getTotalIncrements() {
365 return totalIncrements.get();
366 }
367
368 public long getCountersMapSize() {
369 return countersMap.size();
370 }
371
372 }