1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @InterfaceAudience.Public
71 @InterfaceStability.Evolving
72 public class HTableMultiplexer {
73 private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
74
75 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
76 "hbase.tablemultiplexer.flush.period.ms";
77 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
78 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
79 "hbase.client.max.retries.in.queue";
80
81
82 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
83 new ConcurrentHashMap<>();
84
85 private final Configuration workerConf;
86 private final ClusterConnection conn;
87 private final ExecutorService pool;
88 private final int retryNum;
89 private final int perRegionServerBufferQueueSize;
90 private final int maxKeyValueSize;
91 private final ScheduledExecutorService executor;
92 private final long flushPeriod;
93
94
95
96
97
98
99 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
100 throws IOException {
101 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
102 }
103
104
105
106
107
108
109
110 public HTableMultiplexer(Connection conn, Configuration conf,
111 int perRegionServerBufferQueueSize) {
112 this.conn = (ClusterConnection) conn;
113 this.pool = HTable.getDefaultExecutor(conf);
114 this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
115 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
116 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
117 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
118 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
119 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
120 this.executor =
121 Executors.newScheduledThreadPool(initThreads,
122 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
123
124 this.workerConf = HBaseConfiguration.create(conf);
125
126
127 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
128 }
129
130
131
132
133
134
135 @SuppressWarnings("deprecation")
136 public synchronized void close() throws IOException {
137 if (!getConnection().isClosed()) {
138 getConnection().close();
139 }
140 }
141
142
143
144
145
146
147
148
149
150 public boolean put(TableName tableName, final Put put) {
151 return put(tableName, put, this.retryNum);
152 }
153
154
155
156
157
158
159
160
161
162 public List<Put> put(TableName tableName, final List<Put> puts) {
163 if (puts == null)
164 return null;
165
166 List <Put> failedPuts = null;
167 boolean result;
168 for (Put put : puts) {
169 result = put(tableName, put, this.retryNum);
170 if (result == false) {
171
172
173 if (failedPuts == null) {
174 failedPuts = new ArrayList<Put>();
175 }
176
177 failedPuts.add(put);
178 }
179 }
180 return failedPuts;
181 }
182
183
184
185
186 @Deprecated
187 public List<Put> put(byte[] tableName, final List<Put> puts) {
188 return put(TableName.valueOf(tableName), puts);
189 }
190
191
192
193
194
195
196
197
198 public boolean put(final TableName tableName, final Put put, int retry) {
199 return _put(tableName, put, retry, false);
200 }
201
202
203
204
205
206
207
208
209
210
211 boolean _put(final TableName tableName, final Put put, int retry, boolean reloadCache) {
212 if (retry <= 0) {
213 return false;
214 }
215
216 try {
217 HTable.validatePut(put, maxKeyValueSize);
218
219 ClusterConnection conn = (ClusterConnection) getConnection();
220 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
221 if (loc != null) {
222
223 LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
224
225
226 PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
227
228 return queue.offer(s);
229 }
230 } catch (IOException e) {
231 LOG.debug("Cannot process the put " + put, e);
232 }
233 return false;
234 }
235
236
237
238
239 @Deprecated
240 public boolean put(final byte[] tableName, final Put put, int retry) {
241 return put(TableName.valueOf(tableName), put, retry);
242 }
243
244
245
246
247 @Deprecated
248 public boolean put(final byte[] tableName, Put put) {
249 return put(TableName.valueOf(tableName), put);
250 }
251
252
253
254
255 public HTableMultiplexerStatus getHTableMultiplexerStatus() {
256 return new HTableMultiplexerStatus(serverToFlushWorkerMap);
257 }
258
259 @VisibleForTesting
260 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
261 FlushWorker worker = serverToFlushWorkerMap.get(addr);
262 if (worker == null) {
263 synchronized (this.serverToFlushWorkerMap) {
264 worker = serverToFlushWorkerMap.get(addr);
265 if (worker == null) {
266
267 worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
268 pool, executor);
269 this.serverToFlushWorkerMap.put(addr, worker);
270 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
271 }
272 }
273 }
274 return worker.getQueue();
275 }
276
277 @VisibleForTesting
278 ClusterConnection getConnection() {
279 return this.conn;
280 }
281
282
283
284
285
286
287 @InterfaceAudience.Public
288 @InterfaceStability.Evolving
289 public static class HTableMultiplexerStatus {
290 private long totalFailedPutCounter;
291 private long totalBufferedPutCounter;
292 private long maxLatency;
293 private long overallAverageLatency;
294 private Map<String, Long> serverToFailedCounterMap;
295 private Map<String, Long> serverToBufferedCounterMap;
296 private Map<String, Long> serverToAverageLatencyMap;
297 private Map<String, Long> serverToMaxLatencyMap;
298
299 public HTableMultiplexerStatus(
300 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
301 this.totalBufferedPutCounter = 0;
302 this.totalFailedPutCounter = 0;
303 this.maxLatency = 0;
304 this.overallAverageLatency = 0;
305 this.serverToBufferedCounterMap = new HashMap<String, Long>();
306 this.serverToFailedCounterMap = new HashMap<String, Long>();
307 this.serverToAverageLatencyMap = new HashMap<String, Long>();
308 this.serverToMaxLatencyMap = new HashMap<String, Long>();
309 this.initialize(serverToFlushWorkerMap);
310 }
311
312 private void initialize(
313 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
314 if (serverToFlushWorkerMap == null) {
315 return;
316 }
317
318 long averageCalcSum = 0;
319 int averageCalcCount = 0;
320 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
321 .entrySet()) {
322 HRegionLocation addr = entry.getKey();
323 FlushWorker worker = entry.getValue();
324
325 long bufferedCounter = worker.getTotalBufferedCount();
326 long failedCounter = worker.getTotalFailedCount();
327 long serverMaxLatency = worker.getMaxLatency();
328 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
329
330 SimpleEntry<Long, Integer> averageComponents = averageCounter
331 .getComponents();
332 long serverAvgLatency = averageCounter.getAndReset();
333
334 this.totalBufferedPutCounter += bufferedCounter;
335 this.totalFailedPutCounter += failedCounter;
336 if (serverMaxLatency > this.maxLatency) {
337 this.maxLatency = serverMaxLatency;
338 }
339 averageCalcSum += averageComponents.getKey();
340 averageCalcCount += averageComponents.getValue();
341
342 this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
343 bufferedCounter);
344 this.serverToFailedCounterMap
345 .put(addr.getHostnamePort(),
346 failedCounter);
347 this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
348 serverAvgLatency);
349 this.serverToMaxLatencyMap
350 .put(addr.getHostnamePort(),
351 serverMaxLatency);
352 }
353 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
354 / averageCalcCount : 0;
355 }
356
357 public long getTotalBufferedCounter() {
358 return this.totalBufferedPutCounter;
359 }
360
361 public long getTotalFailedCounter() {
362 return this.totalFailedPutCounter;
363 }
364
365 public long getMaxLatency() {
366 return this.maxLatency;
367 }
368
369 public long getOverallAverageLatency() {
370 return this.overallAverageLatency;
371 }
372
373 public Map<String, Long> getBufferedCounterForEachRegionServer() {
374 return this.serverToBufferedCounterMap;
375 }
376
377 public Map<String, Long> getFailedCounterForEachRegionServer() {
378 return this.serverToFailedCounterMap;
379 }
380
381 public Map<String, Long> getMaxLatencyForEachRegionServer() {
382 return this.serverToMaxLatencyMap;
383 }
384
385 public Map<String, Long> getAverageLatencyForEachRegionServer() {
386 return this.serverToAverageLatencyMap;
387 }
388 }
389
390 @VisibleForTesting
391 static class PutStatus {
392 public final HRegionInfo regionInfo;
393 public final Put put;
394 public final int retryCount;
395
396 public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
397 this.regionInfo = regionInfo;
398 this.put = put;
399 this.retryCount = retryCount;
400 }
401 }
402
403
404
405
406 private static class AtomicAverageCounter {
407 private long sum;
408 private int count;
409
410 public AtomicAverageCounter() {
411 this.sum = 0L;
412 this.count = 0;
413 }
414
415 public synchronized long getAndReset() {
416 long result = this.get();
417 this.reset();
418 return result;
419 }
420
421 public synchronized long get() {
422 if (this.count == 0) {
423 return 0;
424 }
425 return this.sum / this.count;
426 }
427
428 public synchronized SimpleEntry<Long, Integer> getComponents() {
429 return new SimpleEntry<Long, Integer>(sum, count);
430 }
431
432 public synchronized void reset() {
433 this.sum = 0l;
434 this.count = 0;
435 }
436
437 public synchronized void add(long value) {
438 this.sum += value;
439 this.count++;
440 }
441 }
442
443 @VisibleForTesting
444 static class FlushWorker implements Runnable {
445 private final HRegionLocation addr;
446 private final LinkedBlockingQueue<PutStatus> queue;
447 private final HTableMultiplexer multiplexer;
448 private final AtomicLong totalFailedPutCount = new AtomicLong(0);
449 private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
450 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
451 private final AtomicLong maxLatency = new AtomicLong(0);
452
453 private final AsyncProcess ap;
454 private final List<PutStatus> processingList = new ArrayList<>();
455 private final ScheduledExecutorService executor;
456 private final int maxRetryInQueue;
457 private final AtomicInteger retryInQueue = new AtomicInteger(0);
458
459 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
460 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
461 ExecutorService pool, ScheduledExecutorService executor) {
462 this.addr = addr;
463 this.multiplexer = htableMultiplexer;
464 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
465 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
466 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
467 this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
468 this.executor = executor;
469 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
470 }
471
472 protected LinkedBlockingQueue<PutStatus> getQueue() {
473 return this.queue;
474 }
475
476 public long getTotalFailedCount() {
477 return totalFailedPutCount.get();
478 }
479
480 public long getTotalBufferedCount() {
481 return queue.size() + currentProcessingCount.get();
482 }
483
484 public AtomicAverageCounter getAverageLatencyCounter() {
485 return this.averageLatency;
486 }
487
488 public long getMaxLatency() {
489 return this.maxLatency.getAndSet(0);
490 }
491
492 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
493
494 final int retryCount = ps.retryCount - 1;
495
496 if (retryCount <= 0) {
497
498 return false;
499 }
500
501 int cnt = getRetryInQueue().incrementAndGet();
502 if (cnt > getMaxRetryInQueue()) {
503
504 getRetryInQueue().decrementAndGet();
505 return false;
506 }
507
508 final Put failedPut = ps.put;
509
510 final TableName tableName = ps.regionInfo.getTable();
511
512 long delayMs = getNextDelay(retryCount);
513 if (LOG.isDebugEnabled()) {
514 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
515 }
516
517 getExecutor().schedule(new Runnable() {
518 @Override
519 public void run() {
520 boolean succ = false;
521 try {
522 succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
523 } finally {
524 FlushWorker.this.getRetryInQueue().decrementAndGet();
525 if (!succ) {
526 FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
527 }
528 }
529 }
530 }, delayMs, TimeUnit.MILLISECONDS);
531 return true;
532 }
533
534 @VisibleForTesting
535 long getNextDelay(int retryCount) {
536 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
537 multiplexer.retryNum - retryCount - 1);
538 }
539
540 @VisibleForTesting
541 AtomicInteger getRetryInQueue() {
542 return this.retryInQueue;
543 }
544
545 @VisibleForTesting
546 int getMaxRetryInQueue() {
547 return this.maxRetryInQueue;
548 }
549
550 @VisibleForTesting
551 AtomicLong getTotalFailedPutCount() {
552 return this.totalFailedPutCount;
553 }
554
555 @VisibleForTesting
556 HTableMultiplexer getMultiplexer() {
557 return this.multiplexer;
558 }
559
560 @VisibleForTesting
561 ScheduledExecutorService getExecutor() {
562 return this.executor;
563 }
564
565 @Override
566 public void run() {
567 int failedCount = 0;
568 try {
569 long start = EnvironmentEdgeManager.currentTime();
570
571
572 processingList.clear();
573 queue.drainTo(processingList);
574 if (processingList.size() == 0) {
575
576 return;
577 }
578
579 currentProcessingCount.set(processingList.size());
580
581 failedCount = processingList.size();
582
583 List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
584 MultiAction<Row> actions = new MultiAction<>();
585 for (int i = 0; i < processingList.size(); i++) {
586 PutStatus putStatus = processingList.get(i);
587 Action<Row> action = new Action<Row>(putStatus.put, i);
588 actions.add(putStatus.regionInfo.getRegionName(), action);
589 retainedActions.add(action);
590 }
591
592
593 List<PutStatus> failed = null;
594 Object[] results = new Object[actions.size()];
595 ServerName server = addr.getServerName();
596 Map<ServerName, MultiAction<Row>> actionsByServer =
597 Collections.singletonMap(server, actions);
598 try {
599 AsyncRequestFuture arf =
600 ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
601 null, actionsByServer, null);
602 arf.waitUntilDone();
603 if (arf.hasError()) {
604
605 LOG.debug("Caught some exceptions when flushing puts to region server "
606 + addr.getHostnamePort(), arf.getErrors());
607 }
608 } finally {
609 for (int i = 0; i < results.length; i++) {
610 if (results[i] instanceof Result) {
611 failedCount--;
612 } else {
613 if (failed == null) {
614 failed = new ArrayList<PutStatus>();
615 }
616 failed.add(processingList.get(i));
617 }
618 }
619 }
620
621 if (failed != null) {
622
623 for (PutStatus putStatus : failed) {
624 if (resubmitFailedPut(putStatus, this.addr)) {
625 failedCount--;
626 }
627 }
628 }
629
630 long elapsed = EnvironmentEdgeManager.currentTime() - start;
631
632 averageLatency.add(elapsed);
633 if (elapsed > maxLatency.get()) {
634 maxLatency.set(elapsed);
635 }
636
637
638 if (LOG.isDebugEnabled()) {
639 LOG.debug("Processed " + currentProcessingCount + " put requests for "
640 + addr.getHostnamePort() + " and " + failedCount + " failed"
641 + ", latency for this send: " + elapsed);
642 }
643
644
645 currentProcessingCount.set(0);
646 } catch (RuntimeException e) {
647
648
649 LOG.debug(
650 "Caught some exceptions " + e + " when flushing puts to region server "
651 + addr.getHostnamePort(), e);
652 } catch (Exception e) {
653 if (e instanceof InterruptedException) {
654 Thread.currentThread().interrupt();
655 }
656
657 LOG.debug(
658 "Caught some exceptions " + e + " when flushing puts to region server "
659 + addr.getHostnamePort(), e);
660 } finally {
661
662 this.totalFailedPutCount.addAndGet(failedCount);
663 }
664 }
665 }
666 }