1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  
25  import java.io.IOException;
26  import java.util.AbstractMap.SimpleEntry;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.ScheduledExecutorService;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicLong;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.hbase.HBaseConfiguration;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HRegionInfo;
47  import org.apache.hadoop.hbase.HRegionLocation;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
53  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54
55  /**
56   * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables.
57   * Each put is sharded into a buffer queue based on its destination region server, so each
58   * region server buffer queue only holds puts that share the same destination. Each queue has
59   * a flush worker thread that flushes the buffered put requests to the region server.
60   * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that
61   * particular queue.
62   *
63   * All puts are retried a configurable number of times before being dropped.
64   * The HTableMultiplexer can also report the number of buffered requests and the number of
65   * failed (dropped) requests, in total or on a per region server basis.
66   *
67   * This class is thread safe.
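     *
     * <p>A minimal usage sketch; the configuration, table name, row, column, and queue size below
     * are illustrative and not part of this class:
     * <pre>{@code
     * Configuration conf = HBaseConfiguration.create();
     * HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 10000);
     * Put put = new Put(Bytes.toBytes("row1"));
     * put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
     * boolean queued = multiplexer.put(TableName.valueOf("example_table"), put);
     * if (!queued) {
     *   // The buffer queue for the destination region server was full; the put was not accepted.
     * }
     * }</pre>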
68   */
69  @InterfaceAudience.Public
70  @InterfaceStability.Evolving
71  public class HTableMultiplexer {
72    private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
73
74    public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
75        "hbase.tablemultiplexer.flush.period.ms";
76    public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
77    public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
78        "hbase.client.max.retries.in.queue";
79
80    /** The map from each region server to its flush worker */
81    private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
82        new ConcurrentHashMap<>();
83
84    private final Configuration workerConf;
85    private final ClusterConnection conn;
86    private final ExecutorService pool;
87    private final int maxAttempts;
88    private final int perRegionServerBufferQueueSize;
89    private final int maxKeyValueSize;
90    private final ScheduledExecutorService executor;
91    private final long flushPeriod;
92
93    /**
94     * @param conf the HBase configuration
95     * @param perRegionServerBufferQueueSize determines the maximum number of buffered Put ops for
96     *          each region server before dropping the request.
97     */
98    public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
99        throws IOException {
100     this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
101   }
102
103   /**
104    * @param conn the HBase connection
105    * @param conf the HBase configuration
106    * @param perRegionServerBufferQueueSize determines the maximum number of buffered Put ops for
107    *          each region server before dropping the request.
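       *
       * <p>A configuration sketch; the values shown are illustrative (the defaults are a 100 ms
       * flush period and 10 initial flush threads):
       * <pre>{@code
       * Configuration conf = HBaseConfiguration.create();
       * conf.setLong(HTableMultiplexer.TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 50);
       * conf.setInt(HTableMultiplexer.TABLE_MULTIPLEXER_INIT_THREADS, 20);
       * Connection connection = ConnectionFactory.createConnection(conf);
       * HTableMultiplexer multiplexer = new HTableMultiplexer(connection, conf, 5000);
       * }</pre>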
108    */
109   public HTableMultiplexer(Connection conn, Configuration conf,
110       int perRegionServerBufferQueueSize) {
111     this.conn = (ClusterConnection) conn;
112     this.pool = HTable.getDefaultExecutor(conf);
113     // how many times we could try in total, one more than retry number
114     this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
115         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
116     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
117     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
118     this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
119     int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
120     this.executor =
121         Executors.newScheduledThreadPool(initThreads,
122           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
123
124     this.workerConf = HBaseConfiguration.create(conf);
125     // We do not do the retry because we need to reassign puts to different queues if regions are
126     // moved.
127     this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
128   }
129
130   /**
131    * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
132    * been closed.
133    * @throws IOException If there is an error closing the connection.
134    */
135   @SuppressWarnings("deprecation")
136   public synchronized void close() throws IOException {
137     if (!getConnection().isClosed()) {
138       getConnection().close();
139     }
140   }
141
142   /**
143    * The put request will be buffered by its corresponding buffer queue. Return false if the queue
144    * is already full.
145    * @param tableName the destination table
146    * @param put the Put to buffer
147    * @return true if the request was accepted by its corresponding buffer queue, false otherwise.
148    */
149   public boolean put(TableName tableName, final Put put) {
150     return put(tableName, put, this.maxAttempts);
151   }
152
153   /**
154    * The put requests will be buffered by their corresponding buffer queues.
155    * Return the list of puts which could not be queued.
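       *
       * <p>A typical call pattern; {@code multiplexer}, {@code tableName}, and {@code puts} are
       * assumed to already exist (illustrative sketch):
       * <pre>{@code
       * List<Put> rejected = multiplexer.put(tableName, puts);
       * if (rejected != null) {
       *   // These puts were not buffered because their queues were full; handle them here.
       * }
       * }</pre>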
156    * @param tableName the destination table
157    * @param puts the Puts to buffer
158    * @return the list of puts which could not be queued, or null if every put was accepted
159    */
160   public List<Put> put(TableName tableName, final List<Put> puts) {
161     if (puts == null)
162       return null;
163
164     List <Put> failedPuts = null;
165     boolean result;
166     for (Put put : puts) {
167       result = put(tableName, put, this.maxAttempts);
168       if (!result) {
169
170         // Create the failed puts list if necessary
171         if (failedPuts == null) {
172           failedPuts = new ArrayList<Put>();
173         }
174         // Add the put to the failed puts list
175         failedPuts.add(put);
176       }
177     }
178     return failedPuts;
179   }
180
181   /**
182    * @deprecated Use {@link #put(TableName, List) } instead.
183    */
184   @Deprecated
185   public List<Put> put(byte[] tableName, final List<Put> puts) {
186     return put(TableName.valueOf(tableName), puts);
187   }
188
189   /**
190    * The put request will be buffered by its corresponding buffer queue, and will be retried up
191    * to the given number of attempts before being dropped.
192    * Return false if the queue is already full.
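       *
       * <p>For example, to allow at most three attempts for a single put ({@code multiplexer},
       * {@code tableName}, and {@code put} are illustrative):
       * <pre>{@code
       * boolean accepted = multiplexer.put(tableName, put, 3);
       * }</pre>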
193    * @return true if the request can be accepted by its corresponding buffer queue.
194    */
195   public boolean put(final TableName tableName, final Put put, int maxAttempts) {
196     if (maxAttempts <= 0) {
197       return false;
198     }
199
200     try {
201       HTable.validatePut(put, maxKeyValueSize);
202       // Allow mocking to get at the connection, but don't expose the connection to users.
203       ClusterConnection conn = (ClusterConnection) getConnection();
204       // AsyncProcess in the FlushWorker should take care of refreshing the location cache
205       // as necessary. We shouldn't have to do that here.
206       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
207       if (loc != null) {
208         // Add the put pair into its corresponding queue.
209         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
210
211         // Generate a PutStatus object and offer it into the queue
212         PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts);
213
214         return queue.offer(s);
215       }
216     } catch (IOException e) {
217       LOG.debug("Cannot process the put " + put, e);
218     }
219     return false;
220   }
221
222   /**
223    * @deprecated Use {@link #put(TableName, Put) } instead.
224    */
225   @Deprecated
226   public boolean put(final byte[] tableName, final Put put, int retry) {
227     return put(TableName.valueOf(tableName), put, retry);
228   }
229
230   /**
231    * @deprecated Use {@link #put(TableName, Put)} instead.
232    */
233   @Deprecated
234   public boolean put(final byte[] tableName, Put put) {
235     return put(TableName.valueOf(tableName), put);
236   }
237
238   /**
239    * @return the current HTableMultiplexerStatus
240    */
241   public HTableMultiplexerStatus getHTableMultiplexerStatus() {
242     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
243   }
244
245   @VisibleForTesting
246   LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
247     FlushWorker worker = serverToFlushWorkerMap.get(addr);
248     if (worker == null) {
249       synchronized (this.serverToFlushWorkerMap) {
250         worker = serverToFlushWorkerMap.get(addr);
251         if (worker == null) {
252           // Create the flush worker
253           worker = new FlushWorker(workerConf, this.conn, addr, this,
254               perRegionServerBufferQueueSize, pool, executor);
255           this.serverToFlushWorkerMap.put(addr, worker);
256           executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
257         }
258       }
259     }
260     return worker.getQueue();
261   }
262
263   @VisibleForTesting
264   ClusterConnection getConnection() {
265     return this.conn;
266   }
267
268   /**
269    * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
270    * It reports the number of buffered requests and the number of failed (dropped) requests,
271    * in total or on a per region server basis.
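       *
       * <p>A reading sketch ({@code multiplexer} is assumed to be an existing HTableMultiplexer):
       * <pre>{@code
       * HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
       * long buffered = status.getTotalBufferedCounter();
       * long dropped = status.getTotalFailedCounter();
       * Map<String, Long> failedPerServer = status.getFailedCounterForEachRegionServer();
       * }</pre>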
272    */
273   @InterfaceAudience.Public
274   @InterfaceStability.Evolving
275   public static class HTableMultiplexerStatus {
276     private long totalFailedPutCounter;
277     private long totalBufferedPutCounter;
278     private long maxLatency;
279     private long overallAverageLatency;
280     private Map<String, Long> serverToFailedCounterMap;
281     private Map<String, Long> serverToBufferedCounterMap;
282     private Map<String, Long> serverToAverageLatencyMap;
283     private Map<String, Long> serverToMaxLatencyMap;
284
285     public HTableMultiplexerStatus(
286         Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
287       this.totalBufferedPutCounter = 0;
288       this.totalFailedPutCounter = 0;
289       this.maxLatency = 0;
290       this.overallAverageLatency = 0;
291       this.serverToBufferedCounterMap = new HashMap<String, Long>();
292       this.serverToFailedCounterMap = new HashMap<String, Long>();
293       this.serverToAverageLatencyMap = new HashMap<String, Long>();
294       this.serverToMaxLatencyMap = new HashMap<String, Long>();
295       this.initialize(serverToFlushWorkerMap);
296     }
297
298     private void initialize(
299         Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
300       if (serverToFlushWorkerMap == null) {
301         return;
302       }
303
304       long averageCalcSum = 0;
305       int averageCalcCount = 0;
306       for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
307           .entrySet()) {
308         HRegionLocation addr = entry.getKey();
309         FlushWorker worker = entry.getValue();
310
311         long bufferedCounter = worker.getTotalBufferedCount();
312         long failedCounter = worker.getTotalFailedCount();
313         long serverMaxLatency = worker.getMaxLatency();
314         AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
315         // Get sum and count pieces separately to compute overall average
316         SimpleEntry<Long, Integer> averageComponents = averageCounter
317             .getComponents();
318         long serverAvgLatency = averageCounter.getAndReset();
319
320         this.totalBufferedPutCounter += bufferedCounter;
321         this.totalFailedPutCounter += failedCounter;
322         if (serverMaxLatency > this.maxLatency) {
323           this.maxLatency = serverMaxLatency;
324         }
325         averageCalcSum += averageComponents.getKey();
326         averageCalcCount += averageComponents.getValue();
327
328         this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
329             bufferedCounter);
330         this.serverToFailedCounterMap
331             .put(addr.getHostnamePort(),
332             failedCounter);
333         this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
334             serverAvgLatency);
335         this.serverToMaxLatencyMap
336             .put(addr.getHostnamePort(),
337             serverMaxLatency);
338       }
339       this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
340           / averageCalcCount : 0;
341     }
342
343     public long getTotalBufferedCounter() {
344       return this.totalBufferedPutCounter;
345     }
346
347     public long getTotalFailedCounter() {
348       return this.totalFailedPutCounter;
349     }
350
351     public long getMaxLatency() {
352       return this.maxLatency;
353     }
354
355     public long getOverallAverageLatency() {
356       return this.overallAverageLatency;
357     }
358
359     public Map<String, Long> getBufferedCounterForEachRegionServer() {
360       return this.serverToBufferedCounterMap;
361     }
362
363     public Map<String, Long> getFailedCounterForEachRegionServer() {
364       return this.serverToFailedCounterMap;
365     }
366
367     public Map<String, Long> getMaxLatencyForEachRegionServer() {
368       return this.serverToMaxLatencyMap;
369     }
370
371     public Map<String, Long> getAverageLatencyForEachRegionServer() {
372       return this.serverToAverageLatencyMap;
373     }
374   }
375
376   @VisibleForTesting
377   static class PutStatus {
378     final HRegionInfo regionInfo;
379     final Put put;
380     final int maxAttemptCount;
381
382     public PutStatus(HRegionInfo regionInfo, Put put, int maxAttemptCount) {
383       this.regionInfo = regionInfo;
384       this.put = put;
385       this.maxAttemptCount = maxAttemptCount;
386     }
387   }
388
389   /**
390    * Helper to count the average over an interval until reset.
391    */
392   private static class AtomicAverageCounter {
393     private long sum;
394     private int count;
395
396     public AtomicAverageCounter() {
397       this.sum = 0L;
398       this.count = 0;
399     }
400
401     public synchronized long getAndReset() {
402       long result = this.get();
403       this.reset();
404       return result;
405     }
406
407     public synchronized long get() {
408       if (this.count == 0) {
409         return 0;
410       }
411       return this.sum / this.count;
412     }
413
414     public synchronized SimpleEntry<Long, Integer> getComponents() {
415       return new SimpleEntry<Long, Integer>(sum, count);
416     }
417
418     public synchronized void reset() {
419       this.sum = 0L;
420       this.count = 0;
421     }
422
423     public synchronized void add(long value) {
424       this.sum += value;
425       this.count++;
426     }
427   }
428
429   @VisibleForTesting
430   static class FlushWorker implements Runnable {
431     private final HRegionLocation addr;
432     private final LinkedBlockingQueue<PutStatus> queue;
433     private final HTableMultiplexer multiplexer;
434     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
435     private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
436     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
437     private final AtomicLong maxLatency = new AtomicLong(0);
438
439     private final AsyncProcess ap;
440     private final List<PutStatus> processingList = new ArrayList<>();
441     private final ScheduledExecutorService executor;
442     private final int maxRetryInQueue;
443     private final AtomicInteger retryInQueue = new AtomicInteger(0);
444     private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
445
446     public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
447         HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
448         ExecutorService pool, ScheduledExecutorService executor) {
449       this.addr = addr;
450       this.multiplexer = htableMultiplexer;
451       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
452       RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
453       RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
454       this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
455           conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
456               HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
457       this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
458       this.executor = executor;
459       this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
460     }
461
462     protected LinkedBlockingQueue<PutStatus> getQueue() {
463       return this.queue;
464     }
465
466     public long getTotalFailedCount() {
467       return totalFailedPutCount.get();
468     }
469
470     public long getTotalBufferedCount() {
471       return queue.size() + currentProcessingCount.get();
472     }
473
474     public AtomicAverageCounter getAverageLatencyCounter() {
475       return this.averageLatency;
476     }
477
478     public long getMaxLatency() {
479       return this.maxLatency.getAndSet(0);
480     }
481
482     boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
483       // Decrease the retry count
484       final int retryCount = ps.maxAttemptCount - 1;
485
486       if (retryCount <= 0) {
487         // No attempts left; do not retry. The caller will count this put as failed.
488         return false;
489       }
490
491       int cnt = getRetryInQueue().incrementAndGet();
492       if (cnt > getMaxRetryInQueue()) {
493         // Too many puts are already queued for resubmission; give up on this one.
494         getRetryInQueue().decrementAndGet();
495         return false;
496       }
497
498       final Put failedPut = ps.put;
499       // The currentPut is failed. So get the table name for the currentPut.
500       final TableName tableName = ps.regionInfo.getTable();
501
502       long delayMs = getNextDelay(retryCount);
503       if (LOG.isDebugEnabled()) {
504         LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
505       }
506
507       // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
508       // the region location cache when the Put originally failed with some exception. If we
509       // kept retrying the same Put against the same location here, AsyncProcess would not be
510       // doing what we expect it to.
511       getExecutor().schedule(new Runnable() {
512         @Override
513         public void run() {
514           boolean succ = false;
515           try {
516             succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
517           } finally {
518             FlushWorker.this.getRetryInQueue().decrementAndGet();
519             if (!succ) {
520               FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
521             }
522           }
523         }
524       }, delayMs, TimeUnit.MILLISECONDS);
525       return true;
526     }
527
528     @VisibleForTesting
529     long getNextDelay(int retryCount) {
530       return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
531           multiplexer.maxAttempts - retryCount - 1);
532     }
533
534     @VisibleForTesting
535     AtomicInteger getRetryInQueue() {
536       return this.retryInQueue;
537     }
538
539     @VisibleForTesting
540     int getMaxRetryInQueue() {
541       return this.maxRetryInQueue;
542     }
543
544     @VisibleForTesting
545     AtomicLong getTotalFailedPutCount() {
546       return this.totalFailedPutCount;
547     }
548
549     @VisibleForTesting
550     HTableMultiplexer getMultiplexer() {
551       return this.multiplexer;
552     }
553
554     @VisibleForTesting
555     ScheduledExecutorService getExecutor() {
556       return this.executor;
557     }
558
559     @Override
560     public void run() {
561       int failedCount = 0;
562       try {
563         long start = EnvironmentEdgeManager.currentTime();
564
565         // drain all the queued puts into the tmp list
566         processingList.clear();
567         queue.drainTo(processingList);
568         if (processingList.size() == 0) {
569           // Nothing to flush
570           return;
571         }
572
573         currentProcessingCount.set(processingList.size());
574         // failedCount is decreased whenever a Put succeeds or is resubmitted.
575         failedCount = processingList.size();
576
577         List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
578         MultiAction<Row> actions = new MultiAction<>();
579         for (int i = 0; i < processingList.size(); i++) {
580           PutStatus putStatus = processingList.get(i);
581           Action<Row> action = new Action<Row>(putStatus.put, i);
582           actions.add(putStatus.regionInfo.getRegionName(), action);
583           retainedActions.add(action);
584         }
585
586         // Process this multi-put request
587         List<PutStatus> failed = null;
588         Object[] results = new Object[actions.size()];
589         ServerName server = addr.getServerName();
590         Map<ServerName, MultiAction<Row>> actionsByServer =
591             Collections.singletonMap(server, actions);
592         try {
593           AsyncRequestFuture arf =
594               ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
595                 null, actionsByServer, null);
596           arf.waitUntilDone();
597           if (arf.hasError()) {
598             // We just log and ignore the exception here since failed Puts will be resubmitted.
599             LOG.debug("Caught some exceptions when flushing puts to region server "
600                 + addr.getHostnamePort(), arf.getErrors());
601           }
602         } finally {
603           for (int i = 0; i < results.length; i++) {
604             if (results[i] instanceof Result) {
605               failedCount--;
606             } else {
607               if (failed == null) {
608                 failed = new ArrayList<PutStatus>();
609               }
610               failed.add(processingList.get(i));
611             }
612           }
613         }
614
615         if (failed != null) {
616           // Resubmit failed puts
617           for (PutStatus putStatus : failed) {
618             if (resubmitFailedPut(putStatus, this.addr)) {
619               failedCount--;
620             }
621           }
622         }
623
624         long elapsed = EnvironmentEdgeManager.currentTime() - start;
625         // Update latency counters
626         averageLatency.add(elapsed);
627         if (elapsed > maxLatency.get()) {
628           maxLatency.set(elapsed);
629         }
630
631         // Log some basic info
632         if (LOG.isDebugEnabled()) {
633           LOG.debug("Processed " + currentProcessingCount + " put requests for "
634               + addr.getHostnamePort() + " and " + failedCount + " failed"
635               + ", latency for this send: " + elapsed);
636         }
637
638         // Reset the current processing put count
639         currentProcessingCount.set(0);
640       } catch (RuntimeException e) {
641         // To make findbugs happy
642         // Log all the exceptions and move on
643         LOG.debug(
644           "Caught some exceptions " + e + " when flushing puts to region server "
645               + addr.getHostnamePort(), e);
646       } catch (Exception e) {
647         if (e instanceof InterruptedException) {
648           Thread.currentThread().interrupt();
649         }
650         // Log all the exceptions and move on
651         LOG.debug(
652           "Caught some exceptions " + e + " when flushing puts to region server "
653               + addr.getHostnamePort(), e);
654       } finally {
655         // Update the totalFailedCount
656         this.totalFailedPutCount.addAndGet(failedCount);
657       }
658     }
659   }
660 }