View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeSet;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.ScheduledThreadPoolExecutor;
40  import java.util.regex.Matcher;
41  import java.util.regex.Pattern;
42  
43  import org.apache.commons.lang.time.StopWatch;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.AuthUtil;
48  import org.apache.hadoop.hbase.ChoreService;
49  import org.apache.hadoop.hbase.ClusterStatus;
50  import org.apache.hadoop.hbase.DoNotRetryIOException;
51  import org.apache.hadoop.hbase.HBaseConfiguration;
52  import org.apache.hadoop.hbase.HColumnDescriptor;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.NamespaceDescriptor;
59  import org.apache.hadoop.hbase.ScheduledChore;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.TableName;
62  import org.apache.hadoop.hbase.TableNotEnabledException;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.client.Admin;
65  import org.apache.hadoop.hbase.client.Connection;
66  import org.apache.hadoop.hbase.client.ConnectionFactory;
67  import org.apache.hadoop.hbase.client.Get;
68  import org.apache.hadoop.hbase.client.Put;
69  import org.apache.hadoop.hbase.client.RegionLocator;
70  import org.apache.hadoop.hbase.client.ResultScanner;
71  import org.apache.hadoop.hbase.client.Scan;
72  import org.apache.hadoop.hbase.client.Table;
73  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
74  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.Pair;
78  import org.apache.hadoop.hbase.util.ReflectionUtils;
79  import org.apache.hadoop.hbase.util.RegionSplitter;
80  import org.apache.hadoop.util.GenericOptionsParser;
81  import org.apache.hadoop.util.Tool;
82  import org.apache.hadoop.util.ToolRunner;
83  
/**
 * HBase Canary Tool, that can be used to do
 * "canary monitoring" of a running HBase cluster.
 *
 * There are two modes:
 * 1. region mode - For each region tries to get one row per column family
 * and outputs some information about failure or latency.
 *
 * 2. regionserver mode - For each regionserver tries to get one row from one table
 * selected randomly and outputs some information about failure or latency.
 */
95  public final class Canary implements Tool {
96    // Sink interface used by the canary to outputs information
97    public interface Sink {
98      public long getReadFailureCount();
99      public long incReadFailureCount();
100     public void publishReadFailure(HRegionInfo region, Exception e);
101     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
102     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
103     public long getWriteFailureCount();
104     public void publishWriteFailure(HRegionInfo region, Exception e);
105     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
106     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
107   }
108   // new extended sink for output regionserver mode info
109   // do not change the Sink interface directly due to maintaining the API
110   public interface ExtendedSink extends Sink {
111     public void publishReadFailure(String table, String server);
112     public void publishReadTiming(String table, String server, long msTime);
113   }
114 
115   // Simple implementation of canary sink that allows to plot on
116   // file or standard output timings or failures.
117   public static class StdOutSink implements Sink {
118     private AtomicLong readFailureCount = new AtomicLong(0),
119         writeFailureCount = new AtomicLong(0);
120 
121     @Override
122     public long getReadFailureCount() {
123       return readFailureCount.get();
124     }
125 
126     @Override
127     public long incReadFailureCount() {
128       return readFailureCount.incrementAndGet();
129     }
130 
131     @Override
132     public void publishReadFailure(HRegionInfo region, Exception e) {
133       readFailureCount.incrementAndGet();
134       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
135     }
136 
137     @Override
138     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
139       readFailureCount.incrementAndGet();
140       LOG.error(String.format("read from region %s column family %s failed",
141                 region.getRegionNameAsString(), column.getNameAsString()), e);
142     }
143 
144     @Override
145     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
146       LOG.info(String.format("read from region %s column family %s in %dms",
147                region.getRegionNameAsString(), column.getNameAsString(), msTime));
148     }
149 
150     @Override
151     public long getWriteFailureCount() {
152       return writeFailureCount.get();
153     }
154 
155     @Override
156     public void publishWriteFailure(HRegionInfo region, Exception e) {
157       writeFailureCount.incrementAndGet();
158       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
159     }
160 
161     @Override
162     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
163       writeFailureCount.incrementAndGet();
164       LOG.error(String.format("write to region %s column family %s failed",
165         region.getRegionNameAsString(), column.getNameAsString()), e);
166     }
167 
168     @Override
169     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
170       LOG.info(String.format("write to region %s column family %s in %dms",
171         region.getRegionNameAsString(), column.getNameAsString(), msTime));
172     }
173   }
174   // a ExtendedSink implementation
175   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
176 
177     @Override
178     public void publishReadFailure(String table, String server) {
179       incReadFailureCount();
180       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
181     }
182 
183     @Override
184     public void publishReadTiming(String table, String server, long msTime) {
185       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
186           table, server, msTime));
187     }
188   }
189 
190   /**
191    * For each column family of the region tries to get one row and outputs the latency, or the
192    * failure.
193    */
194   static class RegionTask implements Callable<Void> {
195     public enum TaskType{
196       READ, WRITE
197     }
198     private Connection connection;
199     private HRegionInfo region;
200     private Sink sink;
201     private TaskType taskType;
202 
203     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
204       this.connection = connection;
205       this.region = region;
206       this.sink = sink;
207       this.taskType = taskType;
208     }
209 
210     @Override
211     public Void call() {
212       switch (taskType) {
213       case READ:
214         return read();
215       case WRITE:
216         return write();
217       default:
218         return read();
219       }
220     }
221 
222     public Void read() {
223       Table table = null;
224       HTableDescriptor tableDesc = null;
225       try {
226         if (LOG.isDebugEnabled()) {
227           LOG.debug(String.format("reading table descriptor for table %s",
228             region.getTable()));
229         }
230         table = connection.getTable(region.getTable());
231         tableDesc = table.getTableDescriptor();
232       } catch (IOException e) {
233         LOG.debug("sniffRegion failed", e);
234         sink.publishReadFailure(region, e);
235         if (table != null) {
236           try {
237             table.close();
238           } catch (IOException ioe) {
239             LOG.error("Close table failed", e);
240           }
241         }
242         return null;
243       }
244 
245       byte[] startKey = null;
246       Get get = null;
247       Scan scan = null;
248       ResultScanner rs = null;
249       StopWatch stopWatch = new StopWatch();
250       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
251         stopWatch.reset();
252         startKey = region.getStartKey();
253         // Can't do a get on empty start row so do a Scan of first element if any instead.
254         if (startKey.length > 0) {
255           get = new Get(startKey);
256           get.setCacheBlocks(false);
257           get.setFilter(new FirstKeyOnlyFilter());
258           get.addFamily(column.getName());
259         } else {
260           scan = new Scan();
261           scan.setRaw(true);
262           scan.setCaching(1);
263           scan.setCacheBlocks(false);
264           scan.setFilter(new FirstKeyOnlyFilter());
265           scan.addFamily(column.getName());
266           scan.setMaxResultSize(1L);
267           scan.setSmall(true);
268         }
269 
270         if (LOG.isDebugEnabled()) {
271           LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
272             tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
273             Bytes.toStringBinary(startKey)));
274         }
275         try {
276           stopWatch.start();
277           if (startKey.length > 0) {
278             table.get(get);
279           } else {
280             rs = table.getScanner(scan);
281             rs.next();
282           }
283           stopWatch.stop();
284           sink.publishReadTiming(region, column, stopWatch.getTime());
285         } catch (Exception e) {
286           sink.publishReadFailure(region, column, e);
287         } finally {
288           if (rs != null) {
289             rs.close();
290           }
291           scan = null;
292           get = null;
293           startKey = null;
294         }
295       }
296       try {
297         table.close();
298       } catch (IOException e) {
299         LOG.error("Close table failed", e);
300       }
301       return null;
302     }
303 
304     /**
305      * Check writes for the canary table
306      * @return
307      */
308     private Void write() {
309       Table table = null;
310       HTableDescriptor tableDesc = null;
311       try {
312         table = connection.getTable(region.getTable());
313         tableDesc = table.getTableDescriptor();
314         byte[] rowToCheck = region.getStartKey();
315         if (rowToCheck.length == 0) {
316           rowToCheck = new byte[]{0x0};
317         }
318         int writeValueSize =
319             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
320         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
321           Put put = new Put(rowToCheck);
322           byte[] value = new byte[writeValueSize];
323           Bytes.random(value);
324           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
325 
326           if (LOG.isDebugEnabled()) {
327             LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
328               tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
329               Bytes.toStringBinary(rowToCheck)));
330           }
331           try {
332             long startTime = System.currentTimeMillis();
333             table.put(put);
334             long time = System.currentTimeMillis() - startTime;
335             sink.publishWriteTiming(region, column, time);
336           } catch (Exception e) {
337             sink.publishWriteFailure(region, column, e);
338           }
339         }
340         table.close();
341       } catch (IOException e) {
342         sink.publishWriteFailure(region, e);
343       }
344       return null;
345     }
346   }
347 
348   /**
349    * Get one row from a region on the regionserver and outputs the latency, or the failure.
350    */
351   static class RegionServerTask implements Callable<Void> {
352     private Connection connection;
353     private String serverName;
354     private HRegionInfo region;
355     private ExtendedSink sink;
356     private AtomicLong successes;
357 
358     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
359         ExtendedSink sink, AtomicLong successes) {
360       this.connection = connection;
361       this.serverName = serverName;
362       this.region = region;
363       this.sink = sink;
364       this.successes = successes;
365     }
366 
367     @Override
368     public Void call() {
369       TableName tableName = null;
370       Table table = null;
371       Get get = null;
372       byte[] startKey = null;
373       Scan scan = null;
374       StopWatch stopWatch = new StopWatch();
375       // monitor one region on every region server
376       stopWatch.reset();
377       try {
378         tableName = region.getTable();
379         table = connection.getTable(tableName);
380         startKey = region.getStartKey();
381         // Can't do a get on empty start row so do a Scan of first element if any instead.
382         if (LOG.isDebugEnabled()) {
383           LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
384             serverName, region.getTable(), region.getRegionNameAsString(),
385             Bytes.toStringBinary(startKey)));
386         }
387         if (startKey.length > 0) {
388           get = new Get(startKey);
389           get.setCacheBlocks(false);
390           get.setFilter(new FirstKeyOnlyFilter());
391           stopWatch.start();
392           table.get(get);
393           stopWatch.stop();
394         } else {
395           scan = new Scan();
396           scan.setCacheBlocks(false);
397           scan.setFilter(new FirstKeyOnlyFilter());
398           scan.setCaching(1);
399           scan.setMaxResultSize(1L);
400           scan.setSmall(true);
401           stopWatch.start();
402           ResultScanner s = table.getScanner(scan);
403           s.next();
404           s.close();
405           stopWatch.stop();
406         }
407         successes.incrementAndGet();
408         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
409       } catch (TableNotFoundException tnfe) {
410         LOG.error("Table may be deleted", tnfe);
411         // This is ignored because it doesn't imply that the regionserver is dead
412       } catch (TableNotEnabledException tnee) {
413         // This is considered a success since we got a response.
414         successes.incrementAndGet();
415         LOG.debug("The targeted table was disabled.  Assuming success.");
416       } catch (DoNotRetryIOException dnrioe) {
417         sink.publishReadFailure(tableName.getNameAsString(), serverName);
418         LOG.error(dnrioe);
419       } catch (IOException e) {
420         sink.publishReadFailure(tableName.getNameAsString(), serverName);
421         LOG.error(e);
422       } finally {
423         if (table != null) {
424           try {
425             table.close();
426           } catch (IOException e) {/* DO NOTHING */
427             LOG.error("Close table failed", e);
428           }
429         }
430         scan = null;
431         get = null;
432         startKey = null;
433       }
434       return null;
435     }
436   }
437 
438   private static final int USAGE_EXIT_CODE = 1;
439   private static final int INIT_ERROR_EXIT_CODE = 2;
440   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
441   private static final int ERROR_EXIT_CODE = 4;
442   private static final int FAILURE_EXIT_CODE = 5;
443 
444   private static final long DEFAULT_INTERVAL = 6000;
445 
446   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
447   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
448 
449   private static final Log LOG = LogFactory.getLog(Canary.class);
450 
451   public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
452     NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
453 
454   private static final String CANARY_TABLE_FAMILY_NAME = "Test";
455 
456   private Configuration conf = null;
457   private long interval = 0;
458   private Sink sink = null;
459 
460   private boolean useRegExp;
461   private long timeout = DEFAULT_TIMEOUT;
462   private boolean failOnError = true;
463   private boolean regionServerMode = false;
464   private boolean regionServerAllRegions = false;
465   private boolean writeSniffing = false;
466   private boolean treatFailureAsError = false;
467   private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
468 
469   private ExecutorService executor; // threads to retrieve data from regionservers
470 
471   public Canary() {
472     this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
473   }
474 
475   public Canary(ExecutorService executor, Sink sink) {
476     this.executor = executor;
477     this.sink = sink;
478   }
479 
480   @Override
481   public Configuration getConf() {
482     return conf;
483   }
484 
485   @Override
486   public void setConf(Configuration conf) {
487     this.conf = conf;
488   }
489 
490   private int parseArgs(String[] args) {
491     int index = -1;
492     // Process command line args
493     for (int i = 0; i < args.length; i++) {
494       String cmd = args[i];
495 
496       if (cmd.startsWith("-")) {
497         if (index >= 0) {
498           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
499           System.err.println("Invalid command line options");
500           printUsageAndExit();
501         }
502 
503         if (cmd.equals("-help")) {
504           // user asked for help, print the help and quit.
505           printUsageAndExit();
506         } else if (cmd.equals("-daemon") && interval == 0) {
507           // user asked for daemon mode, set a default interval between checks
508           interval = DEFAULT_INTERVAL;
509         } else if (cmd.equals("-interval")) {
510           // user has specified an interval for canary breaths (-interval N)
511           i++;
512 
513           if (i == args.length) {
514             System.err.println("-interval needs a numeric value argument.");
515             printUsageAndExit();
516           }
517 
518           try {
519             interval = Long.parseLong(args[i]) * 1000;
520           } catch (NumberFormatException e) {
521             System.err.println("-interval needs a numeric value argument.");
522             printUsageAndExit();
523           }
524         } else if(cmd.equals("-regionserver")) {
525           this.regionServerMode = true;
526         } else if(cmd.equals("-allRegions")) {
527           this.regionServerAllRegions = true;
528         } else if(cmd.equals("-writeSniffing")) {
529           this.writeSniffing = true;
530         } else if(cmd.equals("-treatFailureAsError")) {
531           this.treatFailureAsError = true;
532         } else if (cmd.equals("-e")) {
533           this.useRegExp = true;
534         } else if (cmd.equals("-t")) {
535           i++;
536 
537           if (i == args.length) {
538             System.err.println("-t needs a numeric value argument.");
539             printUsageAndExit();
540           }
541 
542           try {
543             this.timeout = Long.parseLong(args[i]);
544           } catch (NumberFormatException e) {
545             System.err.println("-t needs a numeric value argument.");
546             printUsageAndExit();
547           }
548         } else if (cmd.equals("-writeTable")) {
549           i++;
550 
551           if (i == args.length) {
552             System.err.println("-writeTable needs a string value argument.");
553             printUsageAndExit();
554           }
555           this.writeTableName = TableName.valueOf(args[i]);
556         } else if (cmd.equals("-f")) {
557           i++;
558 
559           if (i == args.length) {
560             System.err
561                 .println("-f needs a boolean value argument (true|false).");
562             printUsageAndExit();
563           }
564 
565           this.failOnError = Boolean.parseBoolean(args[i]);
566         } else {
567           // no options match
568           System.err.println(cmd + " options is invalid.");
569           printUsageAndExit();
570         }
571       } else if (index < 0) {
572         // keep track of first table name specified by the user
573         index = i;
574       }
575     }
576     if (this.regionServerAllRegions && !this.regionServerMode) {
577       System.err.println("-allRegions can only be specified in regionserver mode.");
578       printUsageAndExit();
579     }
580     return index;
581   }
582 
583   @Override
584   public int run(String[] args) throws Exception {
585     int index = parseArgs(args);
586     ChoreService choreService = null;
587 
588     // Launches chore for refreshing kerberos credentials if security is enabled.
589     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
590     // for more details.
591     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
592     if (authChore != null) {
593       choreService = new ChoreService("CANARY_TOOL");
594       choreService.scheduleChore(authChore);
595     }
596 
597     // Start to prepare the stuffs
598     Monitor monitor = null;
599     Thread monitorThread = null;
600     long startTime = 0;
601     long currentTimeLength = 0;
602     // Get a connection to use in below.
603     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
604       do {
605         // Do monitor !!
606         try {
607           monitor = this.newMonitor(connection, index, args);
608           monitorThread = new Thread(monitor);
609           startTime = System.currentTimeMillis();
610           monitorThread.start();
611           while (!monitor.isDone()) {
612             // wait for 1 sec
613             Thread.sleep(1000);
614             // exit if any error occurs
615             if (this.failOnError && monitor.hasError()) {
616               monitorThread.interrupt();
617               if (monitor.initialized) {
618                 return monitor.errorCode;
619               } else {
620                 return INIT_ERROR_EXIT_CODE;
621               }
622             }
623             currentTimeLength = System.currentTimeMillis() - startTime;
624             if (currentTimeLength > this.timeout) {
625               LOG.error("The monitor is running too long (" + currentTimeLength
626                   + ") after timeout limit:" + this.timeout
627                   + " will be killed itself !!");
628               if (monitor.initialized) {
629                 return TIMEOUT_ERROR_EXIT_CODE;
630               } else {
631                 return INIT_ERROR_EXIT_CODE;
632               }
633             }
634           }
635 
636           if (this.failOnError && monitor.finalCheckForErrors()) {
637             monitorThread.interrupt();
638             return monitor.errorCode;
639           }
640         } finally {
641           if (monitor != null) monitor.close();
642         }
643 
644         Thread.sleep(interval);
645       } while (interval > 0);
646     } // try-with-resources close
647 
648     if (choreService != null) {
649       choreService.shutdown();
650     }
651     return monitor.errorCode;
652   }
653 
654   private void printUsageAndExit() {
655     System.err.printf(
656       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
657         getClass().getName());
658     System.err.println(" where [opts] are:");
659     System.err.println("   -help          Show this help and exit.");
660     System.err.println("   -regionserver  replace the table argument to regionserver,");
661     System.err.println("      which means to enable regionserver mode");
662     System.err.println("   -allRegions    Tries all regions on a regionserver,");
663     System.err.println("      only works in regionserver mode.");
664     System.err.println("   -daemon        Continuous check at defined intervals.");
665     System.err.println("   -interval <N>  Interval between checks (sec)");
666     System.err.println("   -e             Use table/regionserver as regular expression");
667     System.err.println("      which means the table/regionserver is regular expression pattern");
668     System.err.println("   -f <B>         stop whole program if first error occurs," +
669         " default is true");
670     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
671     System.err.println("   -writeSniffing enable the write sniffing in canary");
672     System.err.println("   -treatFailureAsError treats read / write failure as error");
673     System.err.println("   -writeTable    The table used for write sniffing."
674         + " Default is hbase:canary");
675     System.err
676         .println("   -D<configProperty>=<value> assigning or override the configuration params");
677     System.exit(USAGE_EXIT_CODE);
678   }
679 
680   /**
681    * A Factory method for {@link Monitor}.
682    * Can be overridden by user.
683    * @param index a start index for monitor target
684    * @param args args passed from user
685    * @return a Monitor instance
686    */
687   public Monitor newMonitor(final Connection connection, int index, String[] args) {
688     Monitor monitor = null;
689     String[] monitorTargets = null;
690 
691     if(index >= 0) {
692       int length = args.length - index;
693       monitorTargets = new String[length];
694       System.arraycopy(args, index, monitorTargets, 0, length);
695     }
696 
697     if (this.regionServerMode) {
698       monitor =
699           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
700               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
701               this.treatFailureAsError);
702     } else {
703       monitor =
704           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
705               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
706     }
707     return monitor;
708   }
709 
710   // a Monitor super-class can be extended by users
711   public static abstract class Monitor implements Runnable, Closeable {
712 
713     protected Connection connection;
714     protected Admin admin;
715     protected String[] targets;
716     protected boolean useRegExp;
717     protected boolean treatFailureAsError;
718     protected boolean initialized = false;
719 
720     protected boolean done = false;
721     protected int errorCode = 0;
722     protected Sink sink;
723     protected ExecutorService executor;
724 
725     public boolean isDone() {
726       return done;
727     }
728 
729     public boolean hasError() {
730       return errorCode != 0;
731     }
732 
733     public boolean finalCheckForErrors() {
734       if (errorCode != 0) {
735         return true;
736       }
737       if (treatFailureAsError &&
738           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
739         errorCode = FAILURE_EXIT_CODE;
740         return true;
741       }
742       return false;
743     }
744 
745     @Override
746     public void close() throws IOException {
747       if (this.admin != null) this.admin.close();
748     }
749 
750     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
751         ExecutorService executor, boolean treatFailureAsError) {
752       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
753 
754       this.connection = connection;
755       this.targets = monitorTargets;
756       this.useRegExp = useRegExp;
757       this.treatFailureAsError = treatFailureAsError;
758       this.sink = sink;
759       this.executor = executor;
760     }
761 
762     @Override
763     public abstract void run();
764 
765     protected boolean initAdmin() {
766       if (null == this.admin) {
767         try {
768           this.admin = this.connection.getAdmin();
769         } catch (Exception e) {
770           LOG.error("Initial HBaseAdmin failed...", e);
771           this.errorCode = INIT_ERROR_EXIT_CODE;
772         }
773       } else if (admin.isAborted()) {
774         LOG.error("HBaseAdmin aborted");
775         this.errorCode = INIT_ERROR_EXIT_CODE;
776       }
777       return !this.hasError();
778     }
779   }
780 
781   // a monitor for region mode
782   private static class RegionMonitor extends Monitor {
783     // 10 minutes
784     private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
785     // 1 days
786     private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
787 
788     private long lastCheckTime = -1;
789     private boolean writeSniffing;
790     private TableName writeTableName;
791     private int writeDataTTL;
792     private float regionsLowerLimit;
793     private float regionsUpperLimit;
794     private int checkPeriod;
795 
796     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
797         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
798         boolean treatFailureAsError) {
799       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
800       Configuration conf = connection.getConfiguration();
801       this.writeSniffing = writeSniffing;
802       this.writeTableName = writeTableName;
803       this.writeDataTTL =
804           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
805       this.regionsLowerLimit =
806           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
807       this.regionsUpperLimit =
808           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
809       this.checkPeriod =
810           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
811             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
812     }
813 
814     @Override
815     public void run() {
816       if (this.initAdmin()) {
817         try {
818           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
819           if (this.targets != null && this.targets.length > 0) {
820             String[] tables = generateMonitorTables(this.targets);
821             this.initialized = true;
822             for (String table : tables) {
823               taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
824             }
825           } else {
826             taskFutures.addAll(sniff(TaskType.READ));
827           }
828 
829           if (writeSniffing) {
830             if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
831               try {
832                 checkWriteTableDistribution();
833               } catch (IOException e) {
834                 LOG.error("Check canary table distribution failed!", e);
835               }
836               lastCheckTime = EnvironmentEdgeManager.currentTime();
837             }
838             // sniff canary table with write operation
839             taskFutures.addAll(Canary.sniff(admin, sink,
840               admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
841           }
842 
843           for (Future<Void> future : taskFutures) {
844             try {
845               future.get();
846             } catch (ExecutionException e) {
847               LOG.error("Sniff region failed!", e);
848             }
849           }
850         } catch (Exception e) {
851           LOG.error("Run regionMonitor failed", e);
852           this.errorCode = ERROR_EXIT_CODE;
853         }
854       }
855       this.done = true;
856     }
857 
858     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
859       String[] returnTables = null;
860 
861       if (this.useRegExp) {
862         Pattern pattern = null;
863         HTableDescriptor[] tds = null;
864         Set<String> tmpTables = new TreeSet<String>();
865         try {
866           if (LOG.isDebugEnabled()) {
867             LOG.debug(String.format("reading list of tables"));
868           }
869           tds = this.admin.listTables(pattern);
870           if (tds == null) {
871             tds = new HTableDescriptor[0];
872           }
873           for (String monitorTarget : monitorTargets) {
874             pattern = Pattern.compile(monitorTarget);
875             for (HTableDescriptor td : tds) {
876               if (pattern.matcher(td.getNameAsString()).matches()) {
877                 tmpTables.add(td.getNameAsString());
878               }
879             }
880           }
881         } catch (IOException e) {
882           LOG.error("Communicate with admin failed", e);
883           throw e;
884         }
885 
886         if (tmpTables.size() > 0) {
887           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
888         } else {
889           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
890           LOG.error(msg);
891           this.errorCode = INIT_ERROR_EXIT_CODE;
892           throw new TableNotFoundException(msg);
893         }
894       } else {
895         returnTables = monitorTargets;
896       }
897 
898       return returnTables;
899     }
900 
901     /*
902      * canary entry point to monitor all the tables.
903      */
904     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
905       if (LOG.isDebugEnabled()) {
906         LOG.debug(String.format("reading list of tables"));
907       }
908       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
909       for (HTableDescriptor table : admin.listTables()) {
910         if (admin.isTableEnabled(table.getTableName())
911             && (!table.getTableName().equals(writeTableName))) {
912           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
913         }
914       }
915       return taskFutures;
916     }
917 
918     private void checkWriteTableDistribution() throws IOException {
919       if (!admin.tableExists(writeTableName)) {
920         int numberOfServers = admin.getClusterStatus().getServers().size();
921         if (numberOfServers == 0) {
922           throw new IllegalStateException("No live regionservers");
923         }
924         createWriteTable(numberOfServers);
925       }
926 
927       if (!admin.isTableEnabled(writeTableName)) {
928         admin.enableTable(writeTableName);
929       }
930 
931       ClusterStatus status = admin.getClusterStatus();
932       int numberOfServers = status.getServersSize();
933       if (status.getServers().contains(status.getMaster())) {
934         numberOfServers -= 1;
935       }
936 
937       List<Pair<HRegionInfo, ServerName>> pairs =
938           MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
939       int numberOfRegions = pairs.size();
940       if (numberOfRegions < numberOfServers * regionsLowerLimit
941           || numberOfRegions > numberOfServers * regionsUpperLimit) {
942         admin.disableTable(writeTableName);
943         admin.deleteTable(writeTableName);
944         createWriteTable(numberOfServers);
945       }
946       HashSet<ServerName> serverSet = new HashSet<ServerName>();
947       for (Pair<HRegionInfo, ServerName> pair : pairs) {
948         serverSet.add(pair.getSecond());
949       }
950       int numberOfCoveredServers = serverSet.size();
951       if (numberOfCoveredServers < numberOfServers) {
952         admin.balancer();
953       }
954     }
955 
956     private void createWriteTable(int numberOfServers) throws IOException {
957       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
958       LOG.info("Number of live regionservers: " + numberOfServers + ", "
959           + "pre-splitting the canary table into " + numberOfRegions + " regions "
960           + "(current lower limit of regions per server is " + regionsLowerLimit
961           + " and you can change it by config: "
962           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
963       HTableDescriptor desc = new HTableDescriptor(writeTableName);
964       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
965       family.setMaxVersions(1);
966       family.setTimeToLive(writeDataTTL);
967 
968       desc.addFamily(family);
969       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
970       admin.createTable(desc, splits);
971     }
972   }
973 
974   /**
975    * Canary entry point for specified table.
976    * @throws Exception
977    */
978   public static void sniff(final Admin admin, TableName tableName)
979       throws Exception {
980     sniff(admin, tableName, TaskType.READ);
981   }
982 
983   /**
984    * Canary entry point for specified table with task type(read/write)
985    * @throws Exception
986    */
987   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
988       throws Exception {
989     List<Future<Void>> taskFutures =
990         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
991           new ScheduledThreadPoolExecutor(1), taskType);
992     for (Future<Void> future : taskFutures) {
993       future.get();
994     }
995   }
996 
997   /**
998    * Canary entry point for specified table.
999    * @throws Exception
1000    */
1001   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1002       ExecutorService executor, TaskType taskType) throws Exception {
1003     if (LOG.isDebugEnabled()) {
1004       LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
1005         tableName));
1006     }
1007     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1008       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
1009         executor, taskType);
1010     } else {
1011       LOG.warn(String.format("Table %s is not enabled", tableName));
1012     }
1013     return new LinkedList<Future<Void>>();
1014   }
1015 
1016   /*
1017    * Loops over regions that owns this table, and output some information abouts the state.
1018    */
1019   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1020       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
1021 
1022     if (LOG.isDebugEnabled()) {
1023       LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
1024     }
1025 
1026     Table table = null;
1027     try {
1028       table = admin.getConnection().getTable(tableDesc.getTableName());
1029     } catch (TableNotFoundException e) {
1030       return new ArrayList<Future<Void>>();
1031     }
1032     List<RegionTask> tasks = new ArrayList<RegionTask>();
1033     try {
1034       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
1035         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
1036       }
1037     } finally {
1038       table.close();
1039     }
1040     return executor.invokeAll(tasks);
1041   }
1042   // a monitor for regionserver mode
1043   private static class RegionServerMonitor extends Monitor {
1044 
1045     private boolean allRegions;
1046 
1047     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1048         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1049         boolean treatFailureAsError) {
1050       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1051       this.allRegions = allRegions;
1052     }
1053 
1054     private ExtendedSink getSink() {
1055       return (ExtendedSink) this.sink;
1056     }
1057 
1058     @Override
1059     public void run() {
1060       if (this.initAdmin() && this.checkNoTableNames()) {
1061         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1062         this.initialized = true;
1063         this.monitorRegionServers(rsAndRMap);
1064       }
1065       this.done = true;
1066     }
1067 
1068     private boolean checkNoTableNames() {
1069       List<String> foundTableNames = new ArrayList<String>();
1070       TableName[] tableNames = null;
1071 
1072       if (LOG.isDebugEnabled()) {
1073         LOG.debug(String.format("reading list of tables"));
1074       }
1075       try {
1076         tableNames = this.admin.listTableNames();
1077       } catch (IOException e) {
1078         LOG.error("Get listTableNames failed", e);
1079         this.errorCode = INIT_ERROR_EXIT_CODE;
1080         return false;
1081       }
1082 
1083       if (this.targets == null || this.targets.length == 0) return true;
1084 
1085       for (String target : this.targets) {
1086         for (TableName tableName : tableNames) {
1087           if (target.equals(tableName.getNameAsString())) {
1088             foundTableNames.add(target);
1089           }
1090         }
1091       }
1092 
1093       if (foundTableNames.size() > 0) {
1094         System.err.println("Cannot pass a tablename when using the -regionserver " +
1095             "option, tablenames:" + foundTableNames.toString());
1096         this.errorCode = USAGE_EXIT_CODE;
1097       }
1098       return foundTableNames.size() == 0;
1099     }
1100 
1101     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1102       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1103       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1104       Random rand = new Random();
1105       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1106         String serverName = entry.getKey();
1107         AtomicLong successes = new AtomicLong(0);
1108         successMap.put(serverName, successes);
1109         if (this.allRegions) {
1110           for (HRegionInfo region : entry.getValue()) {
1111             tasks.add(new RegionServerTask(this.connection,
1112                 serverName,
1113                 region,
1114                 getSink(),
1115                 successes));
1116           }
1117         } else {
1118           // random select a region if flag not set
1119           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1120           tasks.add(new RegionServerTask(this.connection,
1121               serverName,
1122               region,
1123               getSink(),
1124               successes));
1125         }
1126       }
1127       try {
1128         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1129           try {
1130             future.get();
1131           } catch (ExecutionException e) {
1132             LOG.error("Sniff regionserver failed!", e);
1133             this.errorCode = ERROR_EXIT_CODE;
1134           }
1135         }
1136         if (this.allRegions) {
1137           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1138             String serverName = entry.getKey();
1139             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1140                     + entry.getValue().size() + " on regionserver:" + serverName);
1141           }
1142         }
1143       } catch (InterruptedException e) {
1144         this.errorCode = ERROR_EXIT_CODE;
1145         LOG.error("Sniff regionserver interrupted!", e);
1146       }
1147     }
1148 
1149     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1150       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1151       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1152       return regionServerAndRegionsMap;
1153     }
1154 
1155     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1156       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1157       Table table = null;
1158       RegionLocator regionLocator = null;
1159       try {
1160         if (LOG.isDebugEnabled()) {
1161           LOG.debug(String.format("reading list of tables and locations"));
1162         }
1163         HTableDescriptor[] tableDescs = this.admin.listTables();
1164         List<HRegionInfo> regions = null;
1165         for (HTableDescriptor tableDesc : tableDescs) {
1166           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1167           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1168 
1169           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1170             ServerName rs = location.getServerName();
1171             String rsName = rs.getHostname();
1172             HRegionInfo r = location.getRegionInfo();
1173 
1174             if (rsAndRMap.containsKey(rsName)) {
1175               regions = rsAndRMap.get(rsName);
1176             } else {
1177               regions = new ArrayList<HRegionInfo>();
1178               rsAndRMap.put(rsName, regions);
1179             }
1180             regions.add(r);
1181           }
1182           table.close();
1183         }
1184 
1185       } catch (IOException e) {
1186         String msg = "Get HTables info failed";
1187         LOG.error(msg, e);
1188         this.errorCode = INIT_ERROR_EXIT_CODE;
1189       } finally {
1190         if (table != null) {
1191           try {
1192             table.close();
1193           } catch (IOException e) {
1194             LOG.warn("Close table failed", e);
1195           }
1196         }
1197       }
1198 
1199       return rsAndRMap;
1200     }
1201 
1202     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1203         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1204 
1205       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1206 
1207       if (this.targets != null && this.targets.length > 0) {
1208         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1209         Pattern pattern = null;
1210         Matcher matcher = null;
1211         boolean regExpFound = false;
1212         for (String rsName : this.targets) {
1213           if (this.useRegExp) {
1214             regExpFound = false;
1215             pattern = Pattern.compile(rsName);
1216             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1217               matcher = pattern.matcher(entry.getKey());
1218               if (matcher.matches()) {
1219                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1220                 regExpFound = true;
1221               }
1222             }
1223             if (!regExpFound) {
1224               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1225             }
1226           } else {
1227             if (fullRsAndRMap.containsKey(rsName)) {
1228               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1229             } else {
1230               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1231             }
1232           }
1233         }
1234       } else {
1235         filteredRsAndRMap = fullRsAndRMap;
1236       }
1237       return filteredRsAndRMap;
1238     }
1239   }
1240 
1241   public static void main(String[] args) throws Exception {
1242     final Configuration conf = HBaseConfiguration.create();
1243 
1244     // loading the generic options to conf
1245     new GenericOptionsParser(conf, args);
1246 
1247     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1248     LOG.info("Number of exection threads " + numThreads);
1249 
1250     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1251 
1252     Class<? extends Sink> sinkClass =
1253         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1254     Sink sink = ReflectionUtils.newInstance(sinkClass);
1255 
1256     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1257     executor.shutdown();
1258     System.exit(exitCode);
1259   }
1260 }