/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables.
 * Each put is sharded into a buffer queue based on its destination region server, so each
 * region server buffer queue only holds puts that share the same destination, and each queue
 * has a flush worker thread that flushes the buffered put requests to the region server.
 * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that
 * particular queue.
 *
 * All puts will be retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of failed
 * (dropped) requests, either in total or on a per region server basis.
 *
 * This class is thread safe.
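 * <p>
 * A minimal usage sketch (illustrative only; the table name {@code "t1"}, column family
 * {@code "f"}, row contents, and queue size below are placeholders, not defaults):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // Optional: tune how often each flush worker drains its queue (milliseconds).
 * conf.setLong("hbase.tablemultiplexer.flush.frequency.ms", 100);
 * // Buffer at most 10000 puts per region server before new requests are dropped.
 * HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 10000);
 * Put put = new Put(Bytes.toBytes("row1"));
 * put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
 * boolean queued = multiplexer.put(TableName.valueOf("t1"), put);
 * if (!queued) {
 *   // The buffer queue for the destination region server was full; the put was not accepted.
 * }
 * }</pre>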
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
  private static int poolID = 0;

  static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";

  private Map<TableName, HTable> tableNameToHTableMap;

  /** The map from each region server to its corresponding buffer queue */
  private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>>
    serverToBufferQueueMap;

  /** The map from each region server to its flush worker */
  private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;

  private Configuration conf;
  private int retryNum;
  private int perRegionServerBufferQueueSize;

  /**
   * @param conf The HBaseConfiguration
   * @param perRegionServerBufferQueueSize determines the maximum number of Put ops buffered
   *          for each region server before requests start being dropped.
   */
  public HTableMultiplexer(Configuration conf,
      int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
    this.conf = conf;
    this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation,
      LinkedBlockingQueue<PutStatus>>();
    this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
    this.tableNameToHTableMap = new ConcurrentSkipListMap<TableName, HTable>();
    this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. Return false if the queue
   * is already full.
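   * <p>
   * A hedged sketch of checking the return value, assuming a {@code multiplexer} instance as in
   * the class-level example:
   * <pre>{@code
   * if (!multiplexer.put(tableName, put)) {
   *   // The put was not accepted (queue full, no region location, or an error);
   *   // handle or resubmit it from the caller.
   * }
   * }</pre>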
   * @param tableName the table the put targets
   * @param put the Put to buffer
   * @return true if the request can be accepted by its corresponding buffer queue.
   * @throws IOException
   */
  public boolean put(TableName tableName, final Put put) throws IOException {
    return put(tableName, put, this.retryNum);
  }

  public boolean put(byte[] tableName, final Put put) throws IOException {
    return put(TableName.valueOf(tableName), put);
  }

  /**
   * The put requests will be buffered by their corresponding buffer queues.
   * Return the list of puts which could not be queued.
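   * <p>
   * An illustrative sketch of resubmitting whatever could not be queued; the retry loop and
   * back-off interval are assumptions, not part of this API:
   * <pre>{@code
   * List<Put> failed = multiplexer.put(tableName, puts);
   * while (failed != null && !failed.isEmpty()) {
   *   Thread.sleep(1000); // back off before re-offering the rejected puts
   *   failed = multiplexer.put(tableName, failed);
   * }
   * }</pre>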
   * @param tableName the table the puts target
   * @param puts the list of Put requests to buffer
   * @return the list of puts which could not be queued
   * @throws IOException
   */
  public List<Put> put(TableName tableName, final List<Put> puts)
      throws IOException {
    if (puts == null) {
      return null;
    }

    List<Put> failedPuts = null;
    boolean result;
    for (Put put : puts) {
      result = put(tableName, put, this.retryNum);
      if (!result) {
        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<Put>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  public List<Put> put(byte[] tableName, final List<Put> puts) throws IOException {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue, and it will be retried
   * up to the given number of times before being dropped.
   * Return false if the queue is already full.
   * @param tableName the table the put targets
   * @param put the Put to buffer
   * @param retry the number of times to retry before dropping the request
   * @return true if the request can be accepted by its corresponding buffer queue.
   * @throws IOException
   */
  public boolean put(final TableName tableName, final Put put, int retry)
      throws IOException {
    if (retry <= 0) {
      return false;
    }

    LinkedBlockingQueue<PutStatus> queue;
    HTable htable = getHTable(tableName);
    try {
      htable.validatePut(put);
      HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
      if (loc != null) {
        // Add the put into its corresponding queue.
        queue = addNewRegionServer(loc, htable);
        // Generate a PutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
        return queue.offer(s);
      }
    } catch (Exception e) {
      LOG.debug("Cannot process the put " + put + " because of " + e);
    }
    return false;
  }

  public boolean put(final byte[] tableName, final Put put, int retry)
      throws IOException {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * @return the current HTableMultiplexerStatus
   */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

  private HTable getHTable(TableName tableName) throws IOException {
    HTable htable = this.tableNameToHTableMap.get(tableName);
    if (htable == null) {
      synchronized (this.tableNameToHTableMap) {
        htable = this.tableNameToHTableMap.get(tableName);
        if (htable == null) {
          htable = new HTable(conf, tableName);
          this.tableNameToHTableMap.put(tableName, htable);
        }
      }
    }
    return htable;
  }

  private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(
      HRegionLocation addr, HTable htable) {
    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
    if (queue == null) {
      // Create a queue for the new region server
      queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
      serverToBufferQueueMap.put(addr, queue);

      // Create the flush worker
      HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
          this, queue, htable);
      this.serverToFlushWorkerMap.put(addr, worker);

      // Launch a daemon thread to flush the puts
      // from the queue to its corresponding region server.
      String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-"
          + (poolID++);
      Thread t = new Thread(worker, name);
      t.setDaemon(true);
      t.start();
    }
    return queue;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
   * It reports the number of buffered requests and the number of failed (dropped) requests,
   * either in total or on a per region server basis.
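   * <p>
   * A minimal sketch of reporting the aggregate counters, assuming a {@code multiplexer}
   * instance as in the class-level example (the output style is illustrative):
   * <pre>{@code
   * HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
   * System.out.println("buffered=" + status.getTotalBufferedCounter()
   *     + ", dropped=" + status.getTotalFailedCounter()
   *     + ", maxLatencyMs=" + status.getMaxLatency());
   * }</pre>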
   */
  static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(
        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<String, Long>();
      this.serverToFailedCounterMap = new HashMap<String, Long>();
      this.serverToAverageLatencyMap = new HashMap<String, Long>();
      this.serverToMaxLatencyMap = new HashMap<String, Long>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(
        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
          .entrySet()) {
        HRegionLocation addr = entry.getKey();
        HTableFlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency =
          averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  private static class PutStatus {
    private final HRegionInfo regionInfo;
    private final Put put;
    private final int retryCount;

    public PutStatus(final HRegionInfo regionInfo, final Put put,
        final int retryCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.retryCount = retryCount;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    public Put getPut() {
      return put;
    }

    public int getRetryCount() {
      return retryCount;
    }
  }

  /**
   * Helper to compute the average of values added over an interval, until reset.
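   * <p>
   * Intended usage, as in the flush worker below (the values are elapsed flush latencies in
   * milliseconds; the numbers are illustrative):
   * <pre>{@code
   * AtomicAverageCounter latency = new AtomicAverageCounter();
   * latency.add(12L);
   * latency.add(30L);
   * long avg = latency.getAndReset(); // 21, and the counter starts over
   * }</pre>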
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<Long, Integer>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  private static class HTableFlushWorker implements Runnable {
    private HRegionLocation addr;
    private Configuration conf;
    private LinkedBlockingQueue<PutStatus> queue;
    private HTableMultiplexer htableMultiplexer;
    private AtomicLong totalFailedPutCount;
    private AtomicInteger currentProcessingPutCount;
    private AtomicAverageCounter averageLatency;
    private AtomicLong maxLatency;
    private HTable htable; // used for the multi (batch) requests

    public HTableFlushWorker(Configuration conf, HRegionLocation addr,
        HTableMultiplexer htableMultiplexer,
        LinkedBlockingQueue<PutStatus> queue, HTable htable) {
      this.addr = addr;
      this.conf = conf;
      this.htableMultiplexer = htableMultiplexer;
      this.queue = queue;
      this.totalFailedPutCount = new AtomicLong(0);
      this.currentProcessingPutCount = new AtomicInteger(0);
      this.averageLatency = new AtomicAverageCounter();
      this.maxLatency = new AtomicLong(0);
      this.htable = htable;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    public long getTotalBufferedCount() {
      return queue.size() + currentProcessingPutCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    private boolean resubmitFailedPut(PutStatus failedPutStatus,
        HRegionLocation oldLoc) throws IOException {
      Put failedPut = failedPutStatus.getPut();
      // The put failed; get the table name so it can be resubmitted.
      TableName tableName = failedPutStatus.getRegionInfo().getTable();
      // Decrease the retry count
      int retryCount = failedPutStatus.getRetryCount() - 1;

      if (retryCount <= 0) {
        // No retries left; report failure so the caller can update the failed counter.
        return false;
      } else {
        // Retry one more time
        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
      }
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings
        (value = "REC_CATCH_EXCEPTION", justification = "na")
    public void run() {
      List<PutStatus> processingList = new ArrayList<PutStatus>();
      /*
       * The frequency in milliseconds for the current thread to process the corresponding
       * buffer queue.
       */
      long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);

      // initial delay
      try {
        Thread.sleep(frequency);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while sleeping");
        Thread.currentThread().interrupt();
      }

      long start, elapsed;
      int failedCount = 0;
      while (true) {
        try {
          start = elapsed = EnvironmentEdgeManager.currentTimeMillis();

          // Clear the processingList and reset the failedCount
          processingList.clear();
          failedCount = 0;

          // drain all the queued puts into the tmp list
          queue.drainTo(processingList);
          currentProcessingPutCount.set(processingList.size());

          if (processingList.size() > 0) {
            ArrayList<Put> list = new ArrayList<Put>(processingList.size());
            for (PutStatus putStatus : processingList) {
              list.add(putStatus.getPut());
            }

            // Process this multiput request
            List<Put> failed = null;
            Object[] results = new Object[list.size()];
            try {
              htable.batch(list, results);
            } catch (IOException e) {
              LOG.debug("Caught exception " + e
                  + " when flushing puts to region server " + addr.getHostnamePort());
            } finally {
              // mutate list so that it is empty for complete success, or
              // contains only failed records
              // results are returned in the same order as the requests in list
              // walk the list backwards, so we can remove from list without
              // impacting the indexes of earlier members
              for (int i = results.length - 1; i >= 0; i--) {
                if (results[i] instanceof Result) {
                  // successful Puts are removed from the list here.
                  list.remove(i);
                }
              }
              failed = list;
            }

            if (failed != null) {
              if (failed.size() == processingList.size()) {
                // All the puts for this region server failed. Going to retry them later
                for (PutStatus putStatus : processingList) {
                  if (!resubmitFailedPut(putStatus, this.addr)) {
                    failedCount++;
                  }
                }
              } else {
                Set<Put> failedPutSet = new HashSet<Put>(failed);
                for (PutStatus putStatus : processingList) {
                  if (failedPutSet.contains(putStatus.getPut())
                      && !resubmitFailedPut(putStatus, this.addr)) {
                    failedCount++;
                  }
                }
              }
            }
            // Update the totalFailedCount
            this.totalFailedPutCount.addAndGet(failedCount);

            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
            // Update latency counters
            averageLatency.add(elapsed);
            if (elapsed > maxLatency.get()) {
              maxLatency.set(elapsed);
            }

            // Log some basic info
            if (LOG.isDebugEnabled()) {
              LOG.debug("Processed " + currentProcessingPutCount
                  + " put requests for " + addr.getHostnamePort() + " and "
                  + failedCount + " failed" + ", latency for this send: "
                  + elapsed);
            }

            // Reset the current processing put count
            currentProcessingPutCount.set(0);
          }

          // Sleep for a while
          if (elapsed == start) {
            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
          }
          if (elapsed < frequency) {
            try {
              Thread.sleep(frequency - elapsed);
            } catch (InterruptedException e) {
              LOG.warn("Interrupted while sleeping");
              Thread.currentThread().interrupt();
            }
          }
        } catch (Exception e) {
          // Log all the exceptions and move on
          LOG.debug("Caught exception " + e
              + " when flushing puts to region server "
              + addr.getHostnamePort());
        }
      }
    }
  }
}