1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25 import java.io.IOException;
26 import java.util.AbstractMap.SimpleEntry;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicLong;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.hbase.HBaseConfiguration;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.HRegionInfo;
47 import org.apache.hadoop.hbase.HRegionLocation;
48 import org.apache.hadoop.hbase.ServerName;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.classification.InterfaceStability;
52 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
54 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @InterfaceAudience.Public
71 @InterfaceStability.Evolving
72 public class HTableMultiplexer {
73 private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
74
75 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
76 "hbase.tablemultiplexer.flush.period.ms";
77 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
78 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
79 "hbase.client.max.retries.in.queue";
80
81
82 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
83 new ConcurrentHashMap<>();
84
85 private final Configuration workerConf;
86 private final ClusterConnection conn;
87 private final ExecutorService pool;
88 private final int retryNum;
89 private final int perRegionServerBufferQueueSize;
90 private final int maxKeyValueSize;
91 private final ScheduledExecutorService executor;
92 private final long flushPeriod;
93
94
95
96
97
98
99 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
100 throws IOException {
101 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
102 }
103
104
105
106
107
108
109
110 public HTableMultiplexer(Connection conn, Configuration conf,
111 int perRegionServerBufferQueueSize) {
112 this.conn = (ClusterConnection) conn;
113 this.pool = HTable.getDefaultExecutor(conf);
114 this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
115 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
116 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
117 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
118 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
119 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
120 this.executor =
121 Executors.newScheduledThreadPool(initThreads,
122 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
123
124 this.workerConf = HBaseConfiguration.create(conf);
125
126
127 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
128 }
129
130
131
132
133
134
135 @SuppressWarnings("deprecation")
136 public synchronized void close() throws IOException {
137 if (!getConnection().isClosed()) {
138 getConnection().close();
139 }
140 }
141
142
143
144
145
146
147
148
149 public boolean put(TableName tableName, final Put put) {
150 return put(tableName, put, this.retryNum);
151 }
152
153
154
155
156
157
158
159
160 public List<Put> put(TableName tableName, final List<Put> puts) {
161 if (puts == null)
162 return null;
163
164 List <Put> failedPuts = null;
165 boolean result;
166 for (Put put : puts) {
167 result = put(tableName, put, this.retryNum);
168 if (result == false) {
169
170
171 if (failedPuts == null) {
172 failedPuts = new ArrayList<Put>();
173 }
174
175 failedPuts.add(put);
176 }
177 }
178 return failedPuts;
179 }
180
181
182
183
184 @Deprecated
185 public List<Put> put(byte[] tableName, final List<Put> puts) {
186 return put(TableName.valueOf(tableName), puts);
187 }
188
189
190
191
192
193
194
195 public boolean put(final TableName tableName, final Put put, int maxAttempts) {
196 if (maxAttempts <= 0) {
197 return false;
198 }
199
200 try {
201 HTable.validatePut(put, maxKeyValueSize);
202
203 ClusterConnection conn = (ClusterConnection) getConnection();
204
205
206 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
207 if (loc != null) {
208
209 LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
210
211
212 PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts);
213
214 return queue.offer(s);
215 }
216 } catch (IOException e) {
217 LOG.debug("Cannot process the put " + put, e);
218 }
219 return false;
220 }
221
222
223
224
225 @Deprecated
226 public boolean put(final byte[] tableName, final Put put, int retry) {
227 return put(TableName.valueOf(tableName), put, retry);
228 }
229
230
231
232
233 @Deprecated
234 public boolean put(final byte[] tableName, Put put) {
235 return put(TableName.valueOf(tableName), put);
236 }
237
238
239
240
241 public HTableMultiplexerStatus getHTableMultiplexerStatus() {
242 return new HTableMultiplexerStatus(serverToFlushWorkerMap);
243 }
244
245 @VisibleForTesting
246 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
247 FlushWorker worker = serverToFlushWorkerMap.get(addr);
248 if (worker == null) {
249 synchronized (this.serverToFlushWorkerMap) {
250 worker = serverToFlushWorkerMap.get(addr);
251 if (worker == null) {
252
253 worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
254 pool, executor);
255 this.serverToFlushWorkerMap.put(addr, worker);
256 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
257 }
258 }
259 }
260 return worker.getQueue();
261 }
262
263 @VisibleForTesting
264 ClusterConnection getConnection() {
265 return this.conn;
266 }
267
268
269
270
271
272
273 @InterfaceAudience.Public
274 @InterfaceStability.Evolving
275 public static class HTableMultiplexerStatus {
276 private long totalFailedPutCounter;
277 private long totalBufferedPutCounter;
278 private long maxLatency;
279 private long overallAverageLatency;
280 private Map<String, Long> serverToFailedCounterMap;
281 private Map<String, Long> serverToBufferedCounterMap;
282 private Map<String, Long> serverToAverageLatencyMap;
283 private Map<String, Long> serverToMaxLatencyMap;
284
285 public HTableMultiplexerStatus(
286 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
287 this.totalBufferedPutCounter = 0;
288 this.totalFailedPutCounter = 0;
289 this.maxLatency = 0;
290 this.overallAverageLatency = 0;
291 this.serverToBufferedCounterMap = new HashMap<String, Long>();
292 this.serverToFailedCounterMap = new HashMap<String, Long>();
293 this.serverToAverageLatencyMap = new HashMap<String, Long>();
294 this.serverToMaxLatencyMap = new HashMap<String, Long>();
295 this.initialize(serverToFlushWorkerMap);
296 }
297
298 private void initialize(
299 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
300 if (serverToFlushWorkerMap == null) {
301 return;
302 }
303
304 long averageCalcSum = 0;
305 int averageCalcCount = 0;
306 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
307 .entrySet()) {
308 HRegionLocation addr = entry.getKey();
309 FlushWorker worker = entry.getValue();
310
311 long bufferedCounter = worker.getTotalBufferedCount();
312 long failedCounter = worker.getTotalFailedCount();
313 long serverMaxLatency = worker.getMaxLatency();
314 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
315
316 SimpleEntry<Long, Integer> averageComponents = averageCounter
317 .getComponents();
318 long serverAvgLatency = averageCounter.getAndReset();
319
320 this.totalBufferedPutCounter += bufferedCounter;
321 this.totalFailedPutCounter += failedCounter;
322 if (serverMaxLatency > this.maxLatency) {
323 this.maxLatency = serverMaxLatency;
324 }
325 averageCalcSum += averageComponents.getKey();
326 averageCalcCount += averageComponents.getValue();
327
328 this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
329 bufferedCounter);
330 this.serverToFailedCounterMap
331 .put(addr.getHostnamePort(),
332 failedCounter);
333 this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
334 serverAvgLatency);
335 this.serverToMaxLatencyMap
336 .put(addr.getHostnamePort(),
337 serverMaxLatency);
338 }
339 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
340 / averageCalcCount : 0;
341 }
342
343 public long getTotalBufferedCounter() {
344 return this.totalBufferedPutCounter;
345 }
346
347 public long getTotalFailedCounter() {
348 return this.totalFailedPutCounter;
349 }
350
351 public long getMaxLatency() {
352 return this.maxLatency;
353 }
354
355 public long getOverallAverageLatency() {
356 return this.overallAverageLatency;
357 }
358
359 public Map<String, Long> getBufferedCounterForEachRegionServer() {
360 return this.serverToBufferedCounterMap;
361 }
362
363 public Map<String, Long> getFailedCounterForEachRegionServer() {
364 return this.serverToFailedCounterMap;
365 }
366
367 public Map<String, Long> getMaxLatencyForEachRegionServer() {
368 return this.serverToMaxLatencyMap;
369 }
370
371 public Map<String, Long> getAverageLatencyForEachRegionServer() {
372 return this.serverToAverageLatencyMap;
373 }
374 }
375
376 @VisibleForTesting
377 static class PutStatus {
378 public final HRegionInfo regionInfo;
379 public final Put put;
380 public final int retryCount;
381
382 public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
383 this.regionInfo = regionInfo;
384 this.put = put;
385 this.retryCount = retryCount;
386 }
387 }
388
389
390
391
392 private static class AtomicAverageCounter {
393 private long sum;
394 private int count;
395
396 public AtomicAverageCounter() {
397 this.sum = 0L;
398 this.count = 0;
399 }
400
401 public synchronized long getAndReset() {
402 long result = this.get();
403 this.reset();
404 return result;
405 }
406
407 public synchronized long get() {
408 if (this.count == 0) {
409 return 0;
410 }
411 return this.sum / this.count;
412 }
413
414 public synchronized SimpleEntry<Long, Integer> getComponents() {
415 return new SimpleEntry<Long, Integer>(sum, count);
416 }
417
418 public synchronized void reset() {
419 this.sum = 0l;
420 this.count = 0;
421 }
422
423 public synchronized void add(long value) {
424 this.sum += value;
425 this.count++;
426 }
427 }
428
429 @VisibleForTesting
430 static class FlushWorker implements Runnable {
431 private final HRegionLocation addr;
432 private final LinkedBlockingQueue<PutStatus> queue;
433 private final HTableMultiplexer multiplexer;
434 private final AtomicLong totalFailedPutCount = new AtomicLong(0);
435 private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
436 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
437 private final AtomicLong maxLatency = new AtomicLong(0);
438
439 private final AsyncProcess ap;
440 private final List<PutStatus> processingList = new ArrayList<>();
441 private final ScheduledExecutorService executor;
442 private final int maxRetryInQueue;
443 private final AtomicInteger retryInQueue = new AtomicInteger(0);
444
445 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
446 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
447 ExecutorService pool, ScheduledExecutorService executor) {
448 this.addr = addr;
449 this.multiplexer = htableMultiplexer;
450 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
451 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
452 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
453 this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
454 this.executor = executor;
455 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
456 }
457
458 protected LinkedBlockingQueue<PutStatus> getQueue() {
459 return this.queue;
460 }
461
462 public long getTotalFailedCount() {
463 return totalFailedPutCount.get();
464 }
465
466 public long getTotalBufferedCount() {
467 return queue.size() + currentProcessingCount.get();
468 }
469
470 public AtomicAverageCounter getAverageLatencyCounter() {
471 return this.averageLatency;
472 }
473
474 public long getMaxLatency() {
475 return this.maxLatency.getAndSet(0);
476 }
477
478 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
479
480 final int retryCount = ps.retryCount - 1;
481
482 if (retryCount <= 0) {
483
484 return false;
485 }
486
487 int cnt = getRetryInQueue().incrementAndGet();
488 if (cnt > getMaxRetryInQueue()) {
489
490 getRetryInQueue().decrementAndGet();
491 return false;
492 }
493
494 final Put failedPut = ps.put;
495
496 final TableName tableName = ps.regionInfo.getTable();
497
498 long delayMs = getNextDelay(retryCount);
499 if (LOG.isDebugEnabled()) {
500 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
501 }
502
503
504
505
506
507 getExecutor().schedule(new Runnable() {
508 @Override
509 public void run() {
510 boolean succ = false;
511 try {
512 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
513 } finally {
514 FlushWorker.this.getRetryInQueue().decrementAndGet();
515 if (!succ) {
516 FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
517 }
518 }
519 }
520 }, delayMs, TimeUnit.MILLISECONDS);
521 return true;
522 }
523
524 @VisibleForTesting
525 long getNextDelay(int retryCount) {
526 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
527 multiplexer.retryNum - retryCount - 1);
528 }
529
530 @VisibleForTesting
531 AtomicInteger getRetryInQueue() {
532 return this.retryInQueue;
533 }
534
535 @VisibleForTesting
536 int getMaxRetryInQueue() {
537 return this.maxRetryInQueue;
538 }
539
540 @VisibleForTesting
541 AtomicLong getTotalFailedPutCount() {
542 return this.totalFailedPutCount;
543 }
544
545 @VisibleForTesting
546 HTableMultiplexer getMultiplexer() {
547 return this.multiplexer;
548 }
549
550 @VisibleForTesting
551 ScheduledExecutorService getExecutor() {
552 return this.executor;
553 }
554
555 @Override
556 public void run() {
557 int failedCount = 0;
558 try {
559 long start = EnvironmentEdgeManager.currentTime();
560
561
562 processingList.clear();
563 queue.drainTo(processingList);
564 if (processingList.size() == 0) {
565
566 return;
567 }
568
569 currentProcessingCount.set(processingList.size());
570
571 failedCount = processingList.size();
572
573 List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
574 MultiAction<Row> actions = new MultiAction<>();
575 for (int i = 0; i < processingList.size(); i++) {
576 PutStatus putStatus = processingList.get(i);
577 Action<Row> action = new Action<Row>(putStatus.put, i);
578 actions.add(putStatus.regionInfo.getRegionName(), action);
579 retainedActions.add(action);
580 }
581
582
583 List<PutStatus> failed = null;
584 Object[] results = new Object[actions.size()];
585 ServerName server = addr.getServerName();
586 Map<ServerName, MultiAction<Row>> actionsByServer =
587 Collections.singletonMap(server, actions);
588 try {
589 AsyncRequestFuture arf =
590 ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
591 null, actionsByServer, null);
592 arf.waitUntilDone();
593 if (arf.hasError()) {
594
595 LOG.debug("Caught some exceptions when flushing puts to region server "
596 + addr.getHostnamePort(), arf.getErrors());
597 }
598 } finally {
599 for (int i = 0; i < results.length; i++) {
600 if (results[i] instanceof Result) {
601 failedCount--;
602 } else {
603 if (failed == null) {
604 failed = new ArrayList<PutStatus>();
605 }
606 failed.add(processingList.get(i));
607 }
608 }
609 }
610
611 if (failed != null) {
612
613 for (PutStatus putStatus : failed) {
614 if (resubmitFailedPut(putStatus, this.addr)) {
615 failedCount--;
616 }
617 }
618 }
619
620 long elapsed = EnvironmentEdgeManager.currentTime() - start;
621
622 averageLatency.add(elapsed);
623 if (elapsed > maxLatency.get()) {
624 maxLatency.set(elapsed);
625 }
626
627
628 if (LOG.isDebugEnabled()) {
629 LOG.debug("Processed " + currentProcessingCount + " put requests for "
630 + addr.getHostnamePort() + " and " + failedCount + " failed"
631 + ", latency for this send: " + elapsed);
632 }
633
634
635 currentProcessingCount.set(0);
636 } catch (RuntimeException e) {
637
638
639 LOG.debug(
640 "Caught some exceptions " + e + " when flushing puts to region server "
641 + addr.getHostnamePort(), e);
642 } catch (Exception e) {
643 if (e instanceof InterruptedException) {
644 Thread.currentThread().interrupt();
645 }
646
647 LOG.debug(
648 "Caught some exceptions " + e + " when flushing puts to region server "
649 + addr.getHostnamePort(), e);
650 } finally {
651
652 this.totalFailedPutCount.addAndGet(failedCount);
653 }
654 }
655 }
656 }