
/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables.
 * Each put is sharded into a buffer queue based on its destination region server, so every
 * queue holds only the puts that share the same destination, and each queue has a dedicated
 * flush worker thread that flushes the buffered puts to that region server.
 * If a queue is full, the HTableMultiplexer starts dropping Put requests for that
 * particular queue.
 *
 * Puts are retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of
 * failed (dropped) requests, either in total or per region server.
 *
 * This class is thread safe.
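 *
 * <p>A minimal usage sketch; the configuration, queue size, table, row, and cell below are
 * illustrative only (Bytes refers to org.apache.hadoop.hbase.util.Bytes):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 100000);
 * TableName table = TableName.valueOf("example_table");
 * Put put = new Put(Bytes.toBytes("row-1"));
 * put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 * if (!multiplexer.put(table, put)) {
 *   // The per-region-server queue was full and the put was rejected.
 * }
 * }</pre>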
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());

  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
      "hbase.tablemultiplexer.flush.period.ms";
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
      "hbase.client.max.retries.in.queue";

  /** The map from each region server to its flush worker. */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
      new ConcurrentHashMap<>();

  private final Configuration workerConf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int retryNum;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * @param conf the HBase configuration
   * @param perRegionServerBufferQueueSize the maximum number of Put operations buffered for
   *          each region server before new requests are dropped
   * @throws IOException if the underlying cluster connection cannot be created
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
      throws IOException {
    this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
    this.pool = HTable.getDefaultExecutor(conf);
    this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor =
        Executors.newScheduledThreadPool(initThreads,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());

    this.workerConf = HBaseConfiguration.create(conf);
    // Disable retries at this level; puts must be reassigned to different queues if regions move.
    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
  }
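
  // A configuration sketch for the multiplexer's tunables: the flush period and scheduler thread
  // count are read in the constructor above, and the in-queue retry cap is read by each
  // FlushWorker. All values below are illustrative, not recommendations.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 50);       // flush every 50 ms
  //   conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_INIT_THREADS, 20);           // 20 scheduler threads
  //   conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 5000); // cap on queued retries
  //   HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 50000);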

  /**
   * The put request will be buffered by its corresponding buffer queue.
   * @param tableName the destination table
   * @param put the Put to buffer
   * @return true if the request was accepted by its corresponding buffer queue; false if the
   *         queue is already full
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.retryNum);
  }
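
  // A sketch of one way to handle a full queue (the backoff policy is purely illustrative;
  // callers may instead divert the Put to a synchronous Table#put):
  //
  //   while (!multiplexer.put(table, put)) {
  //     TimeUnit.MILLISECONDS.sleep(10); // caller handles InterruptedException
  //   }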

  /**
   * Each put in the list will be buffered by its corresponding buffer queue.
   * @param tableName the destination table
   * @param puts the Puts to buffer
   * @return the list of puts which could not be queued, or null if every put was queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null) {
      return null;
    }

    List<Put> failedPuts = null;
    for (Put put : puts) {
      if (!put(tableName, put, this.retryNum)) {
        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }
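
  // A sketch of re-driving puts that could not be queued (the batch variable and pause are
  // illustrative):
  //
  //   List<Put> pending = multiplexer.put(table, batchOfPuts); // returns only the rejected puts
  //   while (pending != null) {
  //     TimeUnit.MILLISECONDS.sleep(50);                       // queues were full; wait, then retry
  //     pending = multiplexer.put(table, pending);
  //   }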

  /**
   * Deprecated. Use {@link #put(TableName, List)} instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue, and retried up to the
   * given number of times before the request is dropped.
   * @param tableName the destination table
   * @param put the Put to buffer
   * @param retry the number of retries before the put is dropped
   * @return true if the request was accepted by its corresponding buffer queue; false if the
   *         queue is already full or the retry count is exhausted
   */
  public boolean put(final TableName tableName, final Put put, int retry) {
    if (retry <= 0) {
      return false;
    }

    try {
      HTable.validatePut(put, maxKeyValueSize);
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
      if (loc != null) {
        // Add the put into its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a PutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);

        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }
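
  // A sketch of overriding the default retry count (taken from hbase.client.retries.number)
  // for a single put; the value 3 is arbitrary:
  //
  //   boolean accepted = multiplexer.put(table, put, 3);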

  /**
   * Deprecated. Use {@link #put(TableName, Put, int)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * Deprecated. Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }

  /**
   * @return the current HTableMultiplexerStatus
   */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }
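
  // A monitoring sketch using the status snapshot above (the logger and field choice are
  // illustrative; note that per-server latency counters are reset each time a snapshot is built):
  //
  //   HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
  //   LOG.info("buffered=" + status.getTotalBufferedCounter()
  //       + " dropped=" + status.getTotalFailedCounter()
  //       + " maxLatencyMs=" + status.getMaxLatency());
  //   for (Map.Entry<String, Long> e : status.getBufferedCounterForEachRegionServer().entrySet()) {
  //     LOG.info(e.getKey() + " buffered=" + e.getValue());
  //   }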

  private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker and schedule it at the configured flush period
          worker = new FlushWorker(workerConf, this.conn, addr, this, perRegionServerBufferQueueSize,
                  pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
   * It reports the number of buffered requests and the number of failed (dropped) requests,
   * either in total or per region server.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<String, Long>();
      this.serverToFailedCounterMap = new HashMap<String, Long>();
      this.serverToAverageLatencyMap = new HashMap<String, Long>();
      this.serverToMaxLatencyMap = new HashMap<String, Long>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
          .entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter
            .getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
            bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(),
            failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
            serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(),
            serverMaxLatency);
      }
      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
          / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  private static class PutStatus {
    public final HRegionInfo regionInfo;
    public final Put put;
    public final int retryCount;

    public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.retryCount = retryCount;
    }
  }

  /**
   * Helper to count the average over an interval until reset.
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<Long, Integer>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  private static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
        ExecutorService pool, ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
    }

    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    public long getTotalBufferedCount() {
      return queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.retryCount - 1;

      if (retryCount <= 0) {
        // Retries are exhausted; the caller counts this put as failed.
        return false;
      }

      int cnt = retryInQueue.incrementAndGet();
      if (cnt > maxRetryInQueue) {
        // Too many puts are already queued for resubmission; give this one up.
        retryInQueue.decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The put failed; get its table name so it can be resubmitted through the multiplexer.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
        multiplexer.retryNum - retryCount - 1);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      executor.schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
          } finally {
            FlushWorker.this.retryInQueue.decrementAndGet();
            if (!succ) {
              FlushWorker.this.totalFailedPutCount.incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // Drain all the queued puts into the local processing list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.size() == 0) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a put succeeds or is successfully resubmitted.
        failedCount = processingList.size();

        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
        MultiAction<Row> actions = new MultiAction<>();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action<Row> action = new Action<Row>(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction<Row>> actionsByServer =
            Collections.singletonMap(server, actions);
        try {
          AsyncRequestFuture arf =
              ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
                null, actionsByServer, null);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // Just log and ignore the exception here, since failed puts will be resubmitted.
            LOG.debug("Caught some exceptions when flushing puts to region server "
                + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<PutStatus>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processed " + currentProcessingCount + " put requests for "
              + addr.getHostnamePort() + " and " + failedCount + " failed"
              + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions " + e + " when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions " + e + " when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
}