View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Random;
33  import java.util.Set;
34  import java.util.TreeSet;
35  import java.util.concurrent.atomic.AtomicLong;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.ScheduledThreadPoolExecutor;
41  import java.util.regex.Matcher;
42  import java.util.regex.Pattern;
43  
44  import org.apache.commons.lang.time.StopWatch;
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.AuthUtil;
49  import org.apache.hadoop.hbase.ChoreService;
50  import org.apache.hadoop.hbase.DoNotRetryIOException;
51  import org.apache.hadoop.hbase.HBaseConfiguration;
52  import org.apache.hadoop.hbase.HColumnDescriptor;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.NamespaceDescriptor;
58  import org.apache.hadoop.hbase.ScheduledChore;
59  import org.apache.hadoop.hbase.ServerName;
60  import org.apache.hadoop.hbase.TableName;
61  import org.apache.hadoop.hbase.TableNotEnabledException;
62  import org.apache.hadoop.hbase.TableNotFoundException;
63  import org.apache.hadoop.hbase.client.Admin;
64  import org.apache.hadoop.hbase.client.Connection;
65  import org.apache.hadoop.hbase.client.ConnectionFactory;
66  import org.apache.hadoop.hbase.client.Get;
67  import org.apache.hadoop.hbase.client.Put;
68  import org.apache.hadoop.hbase.client.RegionLocator;
69  import org.apache.hadoop.hbase.client.ResultScanner;
70  import org.apache.hadoop.hbase.client.Scan;
71  import org.apache.hadoop.hbase.client.Table;
72  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
73  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
74  import org.apache.hadoop.hbase.util.Bytes;
75  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
76  import org.apache.hadoop.hbase.util.ReflectionUtils;
77  import org.apache.hadoop.hbase.util.RegionSplitter;
78  import org.apache.hadoop.util.GenericOptionsParser;
79  import org.apache.hadoop.util.Tool;
80  import org.apache.hadoop.util.ToolRunner;
81  
/**
 * HBase Canary Tool that can be used to do
 * "canary monitoring" of a running HBase cluster.
 *
 * There are two modes:
 * 1. region mode - For each region, tries to get one row per column family
 * and outputs some information about failure or latency.
 *
 * 2. regionserver mode - For each regionserver, tries to get one row from one table
 * selected randomly and outputs some information about failure or latency.
 */
93  public final class Canary implements Tool {
94    // Sink interface used by the canary to outputs information
95    public interface Sink {
96      public long getReadFailureCount();
97      public long incReadFailureCount();
98      public void publishReadFailure(HRegionInfo region, Exception e);
99      public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
100     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
101     public long getWriteFailureCount();
102     public void publishWriteFailure(HRegionInfo region, Exception e);
103     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
104     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
105   }
106   // new extended sink for output regionserver mode info
107   // do not change the Sink interface directly due to maintaining the API
108   public interface ExtendedSink extends Sink {
109     public void publishReadFailure(String table, String server);
110     public void publishReadTiming(String table, String server, long msTime);
111   }
112 
113   // Simple implementation of canary sink that allows to plot on
114   // file or standard output timings or failures.
115   public static class StdOutSink implements Sink {
116     private AtomicLong readFailureCount = new AtomicLong(0),
117         writeFailureCount = new AtomicLong(0);
118 
119     @Override
120     public long getReadFailureCount() {
121       return readFailureCount.get();
122     }
123 
124     @Override
125     public long incReadFailureCount() {
126       return readFailureCount.incrementAndGet();
127     }
128 
129     @Override
130     public void publishReadFailure(HRegionInfo region, Exception e) {
131       readFailureCount.incrementAndGet();
132       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
133     }
134 
135     @Override
136     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
137       readFailureCount.incrementAndGet();
138       LOG.error(String.format("read from region %s column family %s failed",
139                 region.getRegionNameAsString(), column.getNameAsString()), e);
140     }
141 
142     @Override
143     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
144       LOG.info(String.format("read from region %s column family %s in %dms",
145                region.getRegionNameAsString(), column.getNameAsString(), msTime));
146     }
147 
148     @Override
149     public long getWriteFailureCount() {
150       return writeFailureCount.get();
151     }
152 
153     @Override
154     public void publishWriteFailure(HRegionInfo region, Exception e) {
155       writeFailureCount.incrementAndGet();
156       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
157     }
158 
159     @Override
160     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
161       writeFailureCount.incrementAndGet();
162       LOG.error(String.format("write to region %s column family %s failed",
163         region.getRegionNameAsString(), column.getNameAsString()), e);
164     }
165 
166     @Override
167     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
168       LOG.info(String.format("write to region %s column family %s in %dms",
169         region.getRegionNameAsString(), column.getNameAsString(), msTime));
170     }
171   }
172   // a ExtendedSink implementation
173   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
174 
175     @Override
176     public void publishReadFailure(String table, String server) {
177       incReadFailureCount();
178       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
179     }
180 
181     @Override
182     public void publishReadTiming(String table, String server, long msTime) {
183       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
184           table, server, msTime));
185     }
186   }
187 
188   /**
189    * For each column family of the region tries to get one row and outputs the latency, or the
190    * failure.
191    */
192   static class RegionTask implements Callable<Void> {
193     public enum TaskType{
194       READ, WRITE
195     }
196     private Connection connection;
197     private HRegionInfo region;
198     private Sink sink;
199     private TaskType taskType;
200 
201     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
202       this.connection = connection;
203       this.region = region;
204       this.sink = sink;
205       this.taskType = taskType;
206     }
207 
208     @Override
209     public Void call() {
210       switch (taskType) {
211       case READ:
212         return read();
213       case WRITE:
214         return write();
215       default:
216         return read();
217       }
218     }
219 
220     public Void read() {
221       Table table = null;
222       HTableDescriptor tableDesc = null;
223       try {
224         table = connection.getTable(region.getTable());
225         tableDesc = table.getTableDescriptor();
226       } catch (IOException e) {
227         LOG.debug("sniffRegion failed", e);
228         sink.publishReadFailure(region, e);
229         if (table != null) {
230           try {
231             table.close();
232           } catch (IOException ioe) {
233             LOG.error("Close table failed", e);
234           }
235         }
236         return null;
237       }
238 
239       byte[] startKey = null;
240       Get get = null;
241       Scan scan = null;
242       ResultScanner rs = null;
243       StopWatch stopWatch = new StopWatch();
244       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
245         stopWatch.reset();
246         startKey = region.getStartKey();
247         // Can't do a get on empty start row so do a Scan of first element if any instead.
248         if (startKey.length > 0) {
249           get = new Get(startKey);
250           get.setCacheBlocks(false);
251           get.setFilter(new FirstKeyOnlyFilter());
252           get.addFamily(column.getName());
253         } else {
254           scan = new Scan();
255           scan.setCaching(1);
256           scan.setCacheBlocks(false);
257           scan.setFilter(new FirstKeyOnlyFilter());
258           scan.addFamily(column.getName());
259           scan.setMaxResultSize(1L);
260         }
261 
262         try {
263           if (startKey.length > 0) {
264             stopWatch.start();
265             table.get(get);
266             stopWatch.stop();
267             sink.publishReadTiming(region, column, stopWatch.getTime());
268           } else {
269             stopWatch.start();
270             rs = table.getScanner(scan);
271             stopWatch.stop();
272             sink.publishReadTiming(region, column, stopWatch.getTime());
273           }
274         } catch (Exception e) {
275           sink.publishReadFailure(region, column, e);
276         } finally {
277           if (rs != null) {
278             rs.close();
279           }
280           scan = null;
281           get = null;
282           startKey = null;
283         }
284       }
285       try {
286         table.close();
287       } catch (IOException e) {
288         LOG.error("Close table failed", e);
289       }
290       return null;
291     }
292 
293     /**
294      * Check writes for the canary table
295      * @return
296      */
297     private Void write() {
298       Table table = null;
299       HTableDescriptor tableDesc = null;
300       try {
301         table = connection.getTable(region.getTable());
302         tableDesc = table.getTableDescriptor();
303         byte[] rowToCheck = region.getStartKey();
304         if (rowToCheck.length == 0) {
305           rowToCheck = new byte[]{0x0};
306         }
307         int writeValueSize =
308             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
309         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
310           Put put = new Put(rowToCheck);
311           byte[] value = new byte[writeValueSize];
312           Bytes.random(value);
313           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
314           try {
315             long startTime = System.currentTimeMillis();
316             table.put(put);
317             long time = System.currentTimeMillis() - startTime;
318             sink.publishWriteTiming(region, column, time);
319           } catch (Exception e) {
320             sink.publishWriteFailure(region, column, e);
321           }
322         }
323         table.close();
324       } catch (IOException e) {
325         sink.publishWriteFailure(region, e);
326       }
327       return null;
328     }
329   }
330 
331   /**
332    * Get one row from a region on the regionserver and outputs the latency, or the failure.
333    */
334   static class RegionServerTask implements Callable<Void> {
335     private Connection connection;
336     private String serverName;
337     private HRegionInfo region;
338     private ExtendedSink sink;
339     private AtomicLong successes;
340 
341     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
342         ExtendedSink sink, AtomicLong successes) {
343       this.connection = connection;
344       this.serverName = serverName;
345       this.region = region;
346       this.sink = sink;
347       this.successes = successes;
348     }
349 
350     @Override
351     public Void call() {
352       TableName tableName = null;
353       Table table = null;
354       Get get = null;
355       byte[] startKey = null;
356       Scan scan = null;
357       StopWatch stopWatch = new StopWatch();
358       // monitor one region on every region server
359       stopWatch.reset();
360       try {
361         tableName = region.getTable();
362         table = connection.getTable(tableName);
363         startKey = region.getStartKey();
364         // Can't do a get on empty start row so do a Scan of first element if any instead.
365         if (startKey.length > 0) {
366           get = new Get(startKey);
367           get.setCacheBlocks(false);
368           get.setFilter(new FirstKeyOnlyFilter());
369           stopWatch.start();
370           table.get(get);
371           stopWatch.stop();
372         } else {
373           scan = new Scan();
374           scan.setCacheBlocks(false);
375           scan.setFilter(new FirstKeyOnlyFilter());
376           scan.setCaching(1);
377           scan.setMaxResultSize(1L);
378           stopWatch.start();
379           ResultScanner s = table.getScanner(scan);
380           s.close();
381           stopWatch.stop();
382         }
383         successes.incrementAndGet();
384         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
385       } catch (TableNotFoundException tnfe) {
386         LOG.error("Table may be deleted", tnfe);
387         // This is ignored because it doesn't imply that the regionserver is dead
388       } catch (TableNotEnabledException tnee) {
389         // This is considered a success since we got a response.
390         successes.incrementAndGet();
391         LOG.debug("The targeted table was disabled.  Assuming success.");
392       } catch (DoNotRetryIOException dnrioe) {
393         sink.publishReadFailure(tableName.getNameAsString(), serverName);
394         LOG.error(dnrioe);
395       } catch (IOException e) {
396         sink.publishReadFailure(tableName.getNameAsString(), serverName);
397         LOG.error(e);
398       } finally {
399         if (table != null) {
400           try {
401             table.close();
402           } catch (IOException e) {/* DO NOTHING */
403             LOG.error("Close table failed", e);
404           }
405         }
406         scan = null;
407         get = null;
408         startKey = null;
409       }
410       return null;
411     }
412   }
413 
414   private static final int USAGE_EXIT_CODE = 1;
415   private static final int INIT_ERROR_EXIT_CODE = 2;
416   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
417   private static final int ERROR_EXIT_CODE = 4;
418   private static final int FAILURE_EXIT_CODE = 5;
419 
420   private static final long DEFAULT_INTERVAL = 60000;
421 
422   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
423   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
424 
425   private static final Log LOG = LogFactory.getLog(Canary.class);
426 
427   public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
428     NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
429 
430   private static final String CANARY_TABLE_FAMILY_NAME = "Test";
431 
432   private Configuration conf = null;
433   private long interval = 0;
434   private Sink sink = null;
435 
436   private boolean useRegExp;
437   private long timeout = DEFAULT_TIMEOUT;
438   private boolean failOnError = true;
439   private boolean regionServerMode = false;
440   private boolean regionServerAllRegions = false;
441   private boolean writeSniffing = false;
442   private boolean treatFailureAsError = false;
443   private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
444 
445   private ExecutorService executor; // threads to retrieve data from regionservers
446 
/**
 * Creates a canary that runs probes on a single-threaded {@link ScheduledThreadPoolExecutor}
 * and reports to a {@link RegionServerStdOutSink}, which implements {@link ExtendedSink}.
 */
public Canary() {
  this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
}

/**
 * Creates a canary with an explicit executor and sink.
 *
 * @param executor threads used to run the region / regionserver probes
 * @param sink receiver of timings and failures; in regionserver mode it is cast to
 *     {@link ExtendedSink}
 */
public Canary(ExecutorService executor, Sink sink) {
  this.sink = sink;
  this.executor = executor;
}
455 
456   @Override
457   public Configuration getConf() {
458     return conf;
459   }
460 
461   @Override
462   public void setConf(Configuration conf) {
463     this.conf = conf;
464   }
465 
466   private int parseArgs(String[] args) {
467     int index = -1;
468     // Process command line args
469     for (int i = 0; i < args.length; i++) {
470       String cmd = args[i];
471 
472       if (cmd.startsWith("-")) {
473         if (index >= 0) {
474           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
475           System.err.println("Invalid command line options");
476           printUsageAndExit();
477         }
478 
479         if (cmd.equals("-help")) {
480           // user asked for help, print the help and quit.
481           printUsageAndExit();
482         } else if (cmd.equals("-daemon") && interval == 0) {
483           // user asked for daemon mode, set a default interval between checks
484           interval = DEFAULT_INTERVAL;
485         } else if (cmd.equals("-interval")) {
486           // user has specified an interval for canary breaths (-interval N)
487           i++;
488 
489           if (i == args.length) {
490             System.err.println("-interval needs a numeric value argument.");
491             printUsageAndExit();
492           }
493 
494           try {
495             interval = Long.parseLong(args[i]) * 1000;
496           } catch (NumberFormatException e) {
497             System.err.println("-interval needs a numeric value argument.");
498             printUsageAndExit();
499           }
500         } else if(cmd.equals("-regionserver")) {
501           this.regionServerMode = true;
502         } else if(cmd.equals("-allRegions")) {
503           this.regionServerAllRegions = true;
504         } else if(cmd.equals("-writeSniffing")) {
505           this.writeSniffing = true;
506         } else if(cmd.equals("-treatFailureAsError")) {
507           this.treatFailureAsError = true;
508         } else if (cmd.equals("-e")) {
509           this.useRegExp = true;
510         } else if (cmd.equals("-t")) {
511           i++;
512 
513           if (i == args.length) {
514             System.err.println("-t needs a numeric value argument.");
515             printUsageAndExit();
516           }
517 
518           try {
519             this.timeout = Long.parseLong(args[i]);
520           } catch (NumberFormatException e) {
521             System.err.println("-t needs a numeric value argument.");
522             printUsageAndExit();
523           }
524         } else if (cmd.equals("-writeTable")) {
525           i++;
526 
527           if (i == args.length) {
528             System.err.println("-writeTable needs a string value argument.");
529             printUsageAndExit();
530           }
531           this.writeTableName = TableName.valueOf(args[i]);
532         } else if (cmd.equals("-f")) {
533           i++;
534 
535           if (i == args.length) {
536             System.err
537                 .println("-f needs a boolean value argument (true|false).");
538             printUsageAndExit();
539           }
540 
541           this.failOnError = Boolean.parseBoolean(args[i]);
542         } else {
543           // no options match
544           System.err.println(cmd + " options is invalid.");
545           printUsageAndExit();
546         }
547       } else if (index < 0) {
548         // keep track of first table name specified by the user
549         index = i;
550       }
551     }
552     if (this.regionServerAllRegions && !this.regionServerMode) {
553       System.err.println("-allRegions can only be specified in regionserver mode.");
554       printUsageAndExit();
555     }
556     return index;
557   }
558 
559   @Override
560   public int run(String[] args) throws Exception {
561     int index = parseArgs(args);
562     ChoreService choreService = null;
563 
564     // Launches chore for refreshing kerberos credentials if security is enabled.
565     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
566     // for more details.
567     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
568     if (authChore != null) {
569       choreService = new ChoreService("CANARY_TOOL");
570       choreService.scheduleChore(authChore);
571     }
572 
573     // Start to prepare the stuffs
574     Monitor monitor = null;
575     Thread monitorThread = null;
576     long startTime = 0;
577     long currentTimeLength = 0;
578     // Get a connection to use in below.
579     // try-with-resources jdk7 construct. See
580     // http://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
581     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
582       do {
583         // Do monitor !!
584         try {
585           monitor = this.newMonitor(connection, index, args);
586           monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
587           startTime = System.currentTimeMillis();
588           monitorThread.start();
589           while (!monitor.isDone()) {
590             // wait for 1 sec
591             Thread.sleep(1000);
592             // exit if any error occurs
593             if (this.failOnError && monitor.hasError()) {
594               monitorThread.interrupt();
595               if (monitor.initialized) {
596                 return monitor.errorCode;
597               } else {
598                 return INIT_ERROR_EXIT_CODE;
599               }
600             }
601             currentTimeLength = System.currentTimeMillis() - startTime;
602             if (currentTimeLength > this.timeout) {
603               LOG.error("The monitor is running too long (" + currentTimeLength
604                   + ") after timeout limit:" + this.timeout
605                   + " will be killed itself !!");
606               if (monitor.initialized) {
607                 return TIMEOUT_ERROR_EXIT_CODE;
608               } else {
609                 return INIT_ERROR_EXIT_CODE;
610               }
611             }
612           }
613 
614           if (this.failOnError && monitor.finalCheckForErrors()) {
615             monitorThread.interrupt();
616             return monitor.errorCode;
617           }
618         } finally {
619           if (monitor != null) monitor.close();
620         }
621 
622         Thread.sleep(interval);
623       } while (interval > 0);
624     } // try-with-resources close
625 
626     if (choreService != null) {
627       choreService.shutdown();
628     }
629     return monitor.errorCode;
630   }
631 
632   private void printUsageAndExit() {
633     System.err.printf(
634       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
635         getClass().getName());
636     System.err.println(" where [opts] are:");
637     System.err.println("   -help          Show this help and exit.");
638     System.err.println("   -regionserver  replace the table argument to regionserver,");
639     System.err.println("      which means to enable regionserver mode");
640     System.err.println("   -allRegions    Tries all regions on a regionserver,");
641     System.err.println("      only works in regionserver mode.");
642     System.err.println("   -daemon        Continuous check at defined intervals.");
643     System.err.println("   -interval <N>  Interval between checks (sec)");
644     System.err.println("   -e             Use region/regionserver as regular expression");
645     System.err.println("      which means the region/regionserver is regular expression pattern");
646     System.err.println("   -f <B>         stop whole program if first error occurs," +
647         " default is true");
648     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
649     System.err.println("   -writeSniffing enable the write sniffing in canary");
650     System.err.println("   -treatFailureAsError treats read / write failure as error");
651     System.err.println("   -writeTable    The table used for write sniffing."
652         + " Default is hbase:canary");
653     System.err
654         .println("   -D<configProperty>=<value> assigning or override the configuration params");
655     System.exit(USAGE_EXIT_CODE);
656   }
657 
658   /**
659    * A Factory method for {@link Monitor}.
660    * Can be overridden by user.
661    * @param index a start index for monitor target
662    * @param args args passed from user
663    * @return a Monitor instance
664    */
665   public Monitor newMonitor(final Connection connection, int index, String[] args) {
666     Monitor monitor = null;
667     String[] monitorTargets = null;
668 
669     if(index >= 0) {
670       int length = args.length - index;
671       monitorTargets = new String[length];
672       System.arraycopy(args, index, monitorTargets, 0, length);
673     }
674 
675     if (this.regionServerMode) {
676       monitor =
677           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
678               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
679               this.treatFailureAsError);
680     } else {
681       monitor =
682           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
683               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
684     }
685     return monitor;
686   }
687 
688   // a Monitor super-class can be extended by users
689   public static abstract class Monitor implements Runnable, Closeable {
690 
691     protected Connection connection;
692     protected Admin admin;
693     protected String[] targets;
694     protected boolean useRegExp;
695     protected boolean treatFailureAsError;
696     protected boolean initialized = false;
697 
698     protected boolean done = false;
699     protected int errorCode = 0;
700     protected Sink sink;
701     protected ExecutorService executor;
702 
703     public boolean isDone() {
704       return done;
705     }
706 
707     public boolean hasError() {
708       return errorCode != 0;
709     }
710 
711     public boolean finalCheckForErrors() {
712       if (errorCode != 0) {
713         return true;
714       }
715       if (treatFailureAsError &&
716           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
717         errorCode = FAILURE_EXIT_CODE;
718         return true;
719       }
720       return false;
721     }
722 
723     @Override
724     public void close() throws IOException {
725       if (this.admin != null) this.admin.close();
726     }
727 
728     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
729         ExecutorService executor, boolean treatFailureAsError) {
730       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
731 
732       this.connection = connection;
733       this.targets = monitorTargets;
734       this.useRegExp = useRegExp;
735       this.treatFailureAsError = treatFailureAsError;
736       this.sink = sink;
737       this.executor = executor;
738     }
739 
740     public abstract void run();
741 
742     protected boolean initAdmin() {
743       if (null == this.admin) {
744         try {
745           this.admin = this.connection.getAdmin();
746         } catch (Exception e) {
747           LOG.error("Initial HBaseAdmin failed...", e);
748           this.errorCode = INIT_ERROR_EXIT_CODE;
749         }
750       } else if (admin.isAborted()) {
751         LOG.error("HBaseAdmin aborted");
752         this.errorCode = INIT_ERROR_EXIT_CODE;
753       }
754       return !this.hasError();
755     }
756   }
757 
758   // a monitor for region mode
759   private static class RegionMonitor extends Monitor {
760     // 10 minutes
761     private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
762     // 1 days
763     private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
764 
765     private long lastCheckTime = -1;
766     private boolean writeSniffing;
767     private TableName writeTableName;
768     private int writeDataTTL;
769     private float regionsLowerLimit;
770     private float regionsUpperLimit;
771     private int checkPeriod;
772 
773     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
774         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
775         boolean treatFailureAsError) {
776       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
777       Configuration conf = connection.getConfiguration();
778       this.writeSniffing = writeSniffing;
779       this.writeTableName = writeTableName;
780       this.writeDataTTL =
781           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
782       this.regionsLowerLimit =
783           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
784       this.regionsUpperLimit =
785           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
786       this.checkPeriod =
787           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
788             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
789     }
790 
791     @Override
792     public void run() {
793       if (this.initAdmin()) {
794         try {
795           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
796           if (this.targets != null && this.targets.length > 0) {
797             String[] tables = generateMonitorTables(this.targets);
798             this.initialized = true;
799             for (String table : tables) {
800               taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
801             }
802           } else {
803             taskFutures.addAll(sniff(TaskType.READ));
804           }
805 
806           if (writeSniffing) {
807             if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
808               try {
809                 checkWriteTableDistribution();
810               } catch (IOException e) {
811                 LOG.error("Check canary table distribution failed!", e);
812               }
813               lastCheckTime = EnvironmentEdgeManager.currentTime();
814             }
815             // sniff canary table with write operation
816             taskFutures.addAll(Canary.sniff(admin, sink,
817               admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
818           }
819 
820           for (Future<Void> future : taskFutures) {
821             try {
822               future.get();
823             } catch (ExecutionException e) {
824               LOG.error("Sniff region failed!", e);
825             }
826           }
827         } catch (Exception e) {
828           LOG.error("Run regionMonitor failed", e);
829           this.errorCode = ERROR_EXIT_CODE;
830         }
831       }
832       this.done = true;
833     }
834 
835     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
836       String[] returnTables = null;
837 
838       if (this.useRegExp) {
839         Pattern pattern = null;
840         HTableDescriptor[] tds = null;
841         Set<String> tmpTables = new TreeSet<String>();
842         try {
843           for (String monitorTarget : monitorTargets) {
844             pattern = Pattern.compile(monitorTarget);
845             tds = this.admin.listTables(pattern);
846             if (tds != null) {
847               for (HTableDescriptor td : tds) {
848                 tmpTables.add(td.getNameAsString());
849               }
850             }
851           }
852         } catch (IOException e) {
853           LOG.error("Communicate with admin failed", e);
854           throw e;
855         }
856 
857         if (tmpTables.size() > 0) {
858           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
859         } else {
860           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
861           LOG.error(msg);
862           this.errorCode = INIT_ERROR_EXIT_CODE;
863           throw new TableNotFoundException(msg);
864         }
865       } else {
866         returnTables = monitorTargets;
867       }
868 
869       return returnTables;
870     }
871 
872     /*
873      * canary entry point to monitor all the tables.
874      */
875     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
876       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
877       for (HTableDescriptor table : admin.listTables()) {
878         if (admin.isTableEnabled(table.getTableName())
879             && (!table.getTableName().equals(writeTableName))) {
880           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
881         }
882       }
883       return taskFutures;
884     }
885 
886     private void checkWriteTableDistribution() throws IOException {
887       if (!admin.tableExists(writeTableName)) {
888         int numberOfServers = admin.getClusterStatus().getServers().size();
889         if (numberOfServers == 0) {
890           throw new IllegalStateException("No live regionservers");
891         }
892         createWriteTable(numberOfServers);
893       }
894 
895       if (!admin.isTableEnabled(writeTableName)) {
896         admin.enableTable(writeTableName);
897       }
898 
899       int numberOfServers = admin.getClusterStatus().getServers().size();
900       List<HRegionLocation> locations;
901       RegionLocator locator = connection.getRegionLocator(writeTableName);
902       try {
903         locations = locator.getAllRegionLocations();
904       } finally {
905         locator.close();
906       }
907       int numberOfRegions = locations.size();
908       if (numberOfRegions < numberOfServers * regionsLowerLimit
909           || numberOfRegions > numberOfServers * regionsUpperLimit) {
910         admin.disableTable(writeTableName);
911         admin.deleteTable(writeTableName);
912         createWriteTable(numberOfServers);
913       }
914       HashSet<ServerName> serverSet = new HashSet<ServerName>();
915       for (HRegionLocation location: locations) {
916         serverSet.add(location.getServerName());
917       }
918       int numberOfCoveredServers = serverSet.size();
919       if (numberOfCoveredServers < numberOfServers) {
920         admin.balancer();
921       }
922     }
923 
924     private void createWriteTable(int numberOfServers) throws IOException {
925       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
926       LOG.info("Number of live regionservers: " + numberOfServers + ", "
927           + "pre-splitting the canary table into " + numberOfRegions + " regions "
928           + "(current  lower limi of regions per server is " + regionsLowerLimit
929           + " and you can change it by config: "
930           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
931       HTableDescriptor desc = new HTableDescriptor(writeTableName);
932       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
933       family.setMaxVersions(1);
934       family.setTimeToLive(writeDataTTL);
935 
936       desc.addFamily(family);
937       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
938       admin.createTable(desc, splits);
939     }
940   }
941 
942   /**
943    * Canary entry point for specified table.
944    * @throws Exception
945    */
946   public static void sniff(final Admin admin, TableName tableName)
947       throws Exception {
948     sniff(admin, tableName, TaskType.READ);
949   }
950 
951   /**
952    * Canary entry point for specified table with task type(read/write)
953    * @throws Exception
954    */
955   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
956       throws Exception {
957     List<Future<Void>> taskFutures =
958         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
959           new ScheduledThreadPoolExecutor(1), taskType);
960     for (Future<Void> future : taskFutures) {
961       future.get();
962     }
963   }
964 
965   /**
966    * Canary entry point for specified table.
967    * @throws Exception
968    */
969   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
970       ExecutorService executor, TaskType taskType) throws Exception {
971     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
972       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
973         executor, taskType);
974     } else {
975       LOG.warn(String.format("Table %s is not enabled", tableName));
976     }
977     return new LinkedList<Future<Void>>();
978   }
979 
980   /*
981    * Loops over regions that owns this table, and output some information abouts the state.
982    */
983   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
984       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
985     Table table = null;
986     try {
987       table = admin.getConnection().getTable(tableDesc.getTableName());
988     } catch (TableNotFoundException e) {
989       return new ArrayList<Future<Void>>();
990     }
991     List<RegionTask> tasks = new ArrayList<RegionTask>();
992     try {
993       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
994         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
995       }
996     } finally {
997       table.close();
998     }
999     return executor.invokeAll(tasks);
1000   }
1001 
1002   /*
1003    * For each column family of the region tries to get one row and outputs the latency, or the
1004    * failure.
1005    */
1006   private static void sniffRegion(
1007       final Admin admin,
1008       final Sink sink,
1009       HRegionInfo region,
1010       Table table) throws Exception {
1011     HTableDescriptor tableDesc = table.getTableDescriptor();
1012     byte[] startKey = null;
1013     Get get = null;
1014     Scan scan = null;
1015     ResultScanner rs = null;
1016     StopWatch stopWatch = new StopWatch();
1017     for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
1018       stopWatch.reset();
1019       startKey = region.getStartKey();
1020       // Can't do a get on empty start row so do a Scan of first element if any instead.
1021       if (startKey.length > 0) {
1022         get = new Get(startKey);
1023         get.setCacheBlocks(false);
1024         get.setFilter(new FirstKeyOnlyFilter());
1025         get.addFamily(column.getName());
1026       } else {
1027         scan = new Scan();
1028         scan.setRaw(true);
1029         scan.setCaching(1);
1030         scan.setCacheBlocks(false);
1031         scan.setFilter(new FirstKeyOnlyFilter());
1032         scan.addFamily(column.getName());
1033         scan.setMaxResultSize(1L);
1034       }
1035 
1036       try {
1037         if (startKey.length > 0) {
1038           stopWatch.start();
1039           table.get(get);
1040           stopWatch.stop();
1041           sink.publishReadTiming(region, column, stopWatch.getTime());
1042         } else {
1043           stopWatch.start();
1044           rs = table.getScanner(scan);
1045           stopWatch.stop();
1046           sink.publishReadTiming(region, column, stopWatch.getTime());
1047         }
1048       } catch (Exception e) {
1049         sink.publishReadFailure(region, column, e);
1050       } finally {
1051         if (rs != null) {
1052           rs.close();
1053         }
1054         scan = null;
1055         get = null;
1056         startKey = null;
1057       }
1058     }
1059   }
1060   // a monitor for regionserver mode
1061   private static class RegionServerMonitor extends Monitor {
1062 
1063     private boolean allRegions;
1064 
1065     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1066         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1067         boolean treatFailureAsError) {
1068       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1069       this.allRegions = allRegions;
1070     }
1071 
1072     private ExtendedSink getSink() {
1073       return (ExtendedSink) this.sink;
1074     }
1075 
1076     @Override
1077     public void run() {
1078       if (this.initAdmin() && this.checkNoTableNames()) {
1079         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1080         this.initialized = true;
1081         this.monitorRegionServers(rsAndRMap);
1082       }
1083       this.done = true;
1084     }
1085 
1086     private boolean checkNoTableNames() {
1087       List<String> foundTableNames = new ArrayList<String>();
1088       TableName[] tableNames = null;
1089 
1090       try {
1091         tableNames = this.admin.listTableNames();
1092       } catch (IOException e) {
1093         LOG.error("Get listTableNames failed", e);
1094         this.errorCode = INIT_ERROR_EXIT_CODE;
1095         return false;
1096       }
1097 
1098       if (this.targets == null || this.targets.length == 0) return true;
1099 
1100       for (String target : this.targets) {
1101         for (TableName tableName : tableNames) {
1102           if (target.equals(tableName.getNameAsString())) {
1103             foundTableNames.add(target);
1104           }
1105         }
1106       }
1107 
1108       if (foundTableNames.size() > 0) {
1109         System.err.println("Cannot pass a tablename when using the -regionserver " +
1110             "option, tablenames:" + foundTableNames.toString());
1111         this.errorCode = USAGE_EXIT_CODE;
1112       }
1113       return foundTableNames.size() == 0;
1114     }
1115 
1116     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1117       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1118       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1119       Random rand = new Random();
1120       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1121         String serverName = entry.getKey();
1122         AtomicLong successes = new AtomicLong(0);
1123         successMap.put(serverName, successes);
1124         if (entry.getValue().isEmpty()) {
1125           LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
1126         } else if (this.allRegions) {
1127           for (HRegionInfo region : entry.getValue()) {
1128             tasks.add(new RegionServerTask(this.connection,
1129                 serverName,
1130                 region,
1131                 getSink(),
1132                 successes));
1133           }
1134         } else {
1135           // random select a region if flag not set
1136           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1137           tasks.add(new RegionServerTask(this.connection,
1138               serverName,
1139               region,
1140               getSink(),
1141               successes));
1142         }
1143       }
1144       try {
1145         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1146           try {
1147             future.get();
1148           } catch (ExecutionException e) {
1149             LOG.error("Sniff regionserver failed!", e);
1150             this.errorCode = ERROR_EXIT_CODE;
1151           }
1152         }
1153         if (this.allRegions) {
1154           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1155             String serverName = entry.getKey();
1156             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1157                     + entry.getValue().size() + " on regionserver:" + serverName);
1158           }
1159         }
1160       } catch (InterruptedException e) {
1161         this.errorCode = ERROR_EXIT_CODE;
1162         LOG.error("Sniff regionserver interrupted!", e);
1163       }
1164     }
1165 
1166     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1167       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1168       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1169       return regionServerAndRegionsMap;
1170     }
1171 
1172     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1173       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1174       Table table = null;
1175       RegionLocator regionLocator = null;
1176       try {
1177         HTableDescriptor[] tableDescs = this.admin.listTables();
1178         List<HRegionInfo> regions = null;
1179         for (HTableDescriptor tableDesc : tableDescs) {
1180           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1181           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1182 
1183           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1184             ServerName rs = location.getServerName();
1185             String rsName = rs.getHostname();
1186             HRegionInfo r = location.getRegionInfo();
1187 
1188             if (rsAndRMap.containsKey(rsName)) {
1189               regions = rsAndRMap.get(rsName);
1190             } else {
1191               regions = new ArrayList<HRegionInfo>();
1192               rsAndRMap.put(rsName, regions);
1193             }
1194             regions.add(r);
1195           }
1196           table.close();
1197         }
1198 
1199         //get any live regionservers not serving any regions
1200         for (ServerName rs : this.admin.getClusterStatus().getServers()) {
1201           String rsName = rs.getHostname();
1202           if (!rsAndRMap.containsKey(rsName)) {
1203             rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
1204           }
1205         }
1206       } catch (IOException e) {
1207         String msg = "Get HTables info failed";
1208         LOG.error(msg, e);
1209         this.errorCode = INIT_ERROR_EXIT_CODE;
1210       } finally {
1211         if (table != null) {
1212           try {
1213             table.close();
1214           } catch (IOException e) {
1215             LOG.warn("Close table failed", e);
1216           }
1217         }
1218       }
1219 
1220       return rsAndRMap;
1221     }
1222 
1223     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1224         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1225 
1226       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1227 
1228       if (this.targets != null && this.targets.length > 0) {
1229         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1230         Pattern pattern = null;
1231         Matcher matcher = null;
1232         boolean regExpFound = false;
1233         for (String rsName : this.targets) {
1234           if (this.useRegExp) {
1235             regExpFound = false;
1236             pattern = Pattern.compile(rsName);
1237             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1238               matcher = pattern.matcher(entry.getKey());
1239               if (matcher.matches()) {
1240                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1241                 regExpFound = true;
1242               }
1243             }
1244             if (!regExpFound) {
1245               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1246             }
1247           } else {
1248             if (fullRsAndRMap.containsKey(rsName)) {
1249               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1250             } else {
1251               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1252             }
1253           }
1254         }
1255       } else {
1256         filteredRsAndRMap = fullRsAndRMap;
1257       }
1258       return filteredRsAndRMap;
1259     }
1260   }
1261 
1262   public static void main(String[] args) throws Exception {
1263     final Configuration conf = HBaseConfiguration.create();
1264 
1265     // loading the generic options to conf
1266     new GenericOptionsParser(conf, args);
1267 
1268     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1269     LOG.info("Number of exection threads " + numThreads);
1270 
1271     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1272 
1273     Class<? extends Sink> sinkClass =
1274         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1275     Sink sink = ReflectionUtils.newInstance(sinkClass);
1276 
1277     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1278     executor.shutdown();
1279     System.exit(exitCode);
1280   }
1281 }