/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables.
 * Each put is sharded into a buffer queue based on its destination region server, so every
 * queue holds only the puts that share the same destination, and each queue has a flush
 * worker thread that flushes the buffered puts to that region server.
 * If a queue is full, the HTableMultiplexer drops further Put requests for that
 * particular queue.
 *
 * Each put is retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of
 * failed (dropped) requests, either in total or per region server.
 *
 * This class is thread safe.
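 *
 * <p>A minimal usage sketch (the table name, queue size, and cell values below are
 * illustrative only, not recommended settings):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 10000);
 *   Put put = new Put(Bytes.toBytes("row-1"));
 *   put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
 *   boolean queued = multiplexer.put(TableName.valueOf("example_table"), put);
 *   // queued == false means the per-region-server queue was full and the put was not buffered.
 * </pre>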
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());

  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
      "hbase.tablemultiplexer.flush.period.ms";
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
      "hbase.client.max.retries.in.queue";
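
  // These configuration keys tune the multiplexer. The values shown here are the defaults read
  // further down in this class, not tuning recommendations:
  //   conf.setLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);       // flush interval per queue, in ms
  //   conf.setInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);            // core size of the flush scheduler
  //   conf.setInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); // cap on puts awaiting resubmission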

  /** The map from each region server to its flush worker */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
      new ConcurrentHashMap<>();

  private final Configuration workerConf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int retryNum;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of buffered Put ops for
   *          each region server before dropping requests.
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
      throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }

  /**
   * @param conn The HBase connection.
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of buffered Put ops for
   *          each region server before dropping requests.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
      int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor =
        Executors.newScheduledThreadPool(initThreads,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());

    this.workerConf = HBaseConfiguration.create(conf);
    // We do not do the retry because we need to reassign puts to different queues if regions are
    // moved.
    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
  }

  /**
   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
   * been closed.
   * @throws IOException If there is an error closing the connection.
   */
  @SuppressWarnings("deprecation")
  public synchronized void close() throws IOException {
    if (!getConnection().isClosed()) {
      getConnection().close();
    }
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. Returns false if the
   * queue is already full.
   * @param tableName the destination table
   * @param put the Put to buffer
   * @return true if the request was accepted by its corresponding buffer queue
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.retryNum);
  }


  /**
   * The put requests will be buffered by their corresponding buffer queues.
   * Returns the list of puts which could not be queued.
   * @param tableName the destination table
   * @param puts the Puts to buffer
   * @return the list of puts which could not be queued, or null if all were queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null) {
      return null;
    }

    List<Put> failedPuts = null;
    boolean result;
    for (Put put : puts) {
      result = put(tableName, put, this.retryNum);
      if (!result) {
        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<Put>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  /**
   * Deprecated. Use {@link #put(TableName, List)} instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue, and the put request will
   * be retried up to the given number of times before being dropped.
   * Returns false if the queue is already full.
   * @param tableName the destination table
   * @param put the Put to buffer
   * @param retry the number of attempts before the put is dropped
   * @return true if the request was accepted by its corresponding buffer queue
   */
  public boolean put(final TableName tableName, final Put put, int retry) {
    return _put(tableName, put, retry, false);
  }

  /**
   * Internal "put" which exposes a boolean flag to control whether or not the region location
   * cache should be reloaded when trying to queue the {@link Put}.
   * @param tableName Destination table for the Put
   * @param put The Put to send
   * @param retry Number of attempts to retry the {@code put}
   * @param reloadCache Should the region location cache be reloaded
   * @return true if the request was accepted in the queue, otherwise false
   */
  boolean _put(final TableName tableName, final Put put, int retry, boolean reloadCache) {
    if (retry <= 0) {
      return false;
    }

    try {
      HTable.validatePut(put, maxKeyValueSize);
      // Allow mocking to get at the connection, but don't expose the connection to users.
      ClusterConnection conn = (ClusterConnection) getConnection();
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), reloadCache);
      if (loc != null) {
        // Add the put to its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a PutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);

        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }

  /**
   * Deprecated. Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * Deprecated. Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }

  /**
   * @return the current HTableMultiplexerStatus
   */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

  @VisibleForTesting
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
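      // Double-checked locking: re-check under the map lock so that exactly one FlushWorker is
      // created and scheduled per region server, while the common path stays lock-free.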
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(workerConf, this.conn, addr, this,
              perRegionServerBufferQueueSize, pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  @VisibleForTesting
  ClusterConnection getConnection() {
    return this.conn;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
   * It reports the number of buffered requests and the number of failed (dropped) requests,
   * either in total or per region server.
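   *
   * <p>A small sketch of how the status might be polled (illustrative only):
   * <pre>
   *   HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
   *   long buffered = status.getTotalBufferedCounter();
   *   long dropped = status.getTotalFailedCounter();
   *   long maxLatencyMs = status.getMaxLatency();
   * </pre>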
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<String, Long>();
      this.serverToFailedCounterMap = new HashMap<String, Long>();
      this.serverToAverageLatencyMap = new HashMap<String, Long>();
      this.serverToMaxLatencyMap = new HashMap<String, Long>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency = averageCalcCount != 0
          ? averageCalcSum / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  @VisibleForTesting
  static class PutStatus {
    public final HRegionInfo regionInfo;
    public final Put put;
    public final int retryCount;

    public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.retryCount = retryCount;
    }
  }

  /**
   * Helper to compute a running average over an interval until reset.
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<Long, Integer>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  @VisibleForTesting
  static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
        ExecutorService pool, ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
    }

    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    public long getTotalBufferedCount() {
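      // "Buffered" counts both the puts still waiting in the queue and the batch currently
      // being flushed (tracked by currentProcessingCount).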
      return queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.retryCount - 1;

      if (retryCount <= 0) {
        // Retry count exhausted; do not retry any more. The caller counts this put as failed.
        return false;
      }

      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many puts are already queued for resubmission; give up on this one.
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The current put failed, so get the table name for it.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

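      // Re-queue the put asynchronously after the backoff delay, forcing a region location cache
      // reload in case the region has moved to a different server.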
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.getMultiplexer()._put(tableName, failedPut, retryCount, true);
          } finally {
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    @VisibleForTesting
    long getNextDelay(int retryCount) {
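      // Back off using the standard HBase pause schedule, seeded with the flush period; the fewer
      // retries that remain, the larger the "tries" index and hence the longer the delay.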
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
          multiplexer.retryNum - retryCount - 1);
    }

    @VisibleForTesting
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @VisibleForTesting
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @VisibleForTesting
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @VisibleForTesting
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @VisibleForTesting
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    @Override
    public void run() {
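      // One scheduled pass: drain everything queued for this region server, send it as a single
      // multi-action request, resubmit any failed puts with backoff, and record latency stats.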
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.size() == 0) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a Put succeeds or is resubmitted.
        failedCount = processingList.size();

        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
        MultiAction<Row> actions = new MultiAction<>();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action<Row> action = new Action<Row>(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction<Row>> actionsByServer =
            Collections.singletonMap(server, actions);
        try {
          AsyncRequestFuture arf =
              ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
                null, actionsByServer, null);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmitted.
            LOG.debug("Caught some exceptions when flushing puts to region server "
                + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<PutStatus>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processed " + currentProcessingCount + " put requests for "
              + addr.getHostnamePort() + " and " + failedCount + " failed"
              + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions " + e + " when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions " + e + " when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
}