View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeSet;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.ScheduledThreadPoolExecutor;
40  import java.util.regex.Matcher;
41  import java.util.regex.Pattern;
42  
43  import org.apache.commons.lang.time.StopWatch;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.AuthUtil;
48  import org.apache.hadoop.hbase.ChoreService;
49  import org.apache.hadoop.hbase.DoNotRetryIOException;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HColumnDescriptor;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.HRegionLocation;
55  import org.apache.hadoop.hbase.HTableDescriptor;
56  import org.apache.hadoop.hbase.MetaTableAccessor;
57  import org.apache.hadoop.hbase.NamespaceDescriptor;
58  import org.apache.hadoop.hbase.ScheduledChore;
59  import org.apache.hadoop.hbase.ServerName;
60  import org.apache.hadoop.hbase.TableName;
61  import org.apache.hadoop.hbase.TableNotEnabledException;
62  import org.apache.hadoop.hbase.TableNotFoundException;
63  import org.apache.hadoop.hbase.client.Admin;
64  import org.apache.hadoop.hbase.client.Connection;
65  import org.apache.hadoop.hbase.client.ConnectionFactory;
66  import org.apache.hadoop.hbase.client.Get;
67  import org.apache.hadoop.hbase.client.Put;
68  import org.apache.hadoop.hbase.client.RegionLocator;
69  import org.apache.hadoop.hbase.client.ResultScanner;
70  import org.apache.hadoop.hbase.client.Scan;
71  import org.apache.hadoop.hbase.client.Table;
72  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
73  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
74  import org.apache.hadoop.hbase.util.Bytes;
75  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
76  import org.apache.hadoop.hbase.util.Pair;
77  import org.apache.hadoop.hbase.util.ReflectionUtils;
78  import org.apache.hadoop.hbase.util.RegionSplitter;
79  import org.apache.hadoop.util.GenericOptionsParser;
80  import org.apache.hadoop.util.Tool;
81  import org.apache.hadoop.util.ToolRunner;
82  
83  /**
84   * HBase Canary Tool that can be used to do
85   * "canary monitoring" of a running HBase cluster.
86   *
87   * Here are two modes
88   * 1. region mode - Foreach region tries to get one row per column family
89   * and outputs some information about failure or latency.
90   *
91   * 2. regionserver mode - Foreach regionserver tries to get one row from one table
92   * selected randomly and outputs some information about failure or latency.
93   */
94  public final class Canary implements Tool {
95    // Sink interface used by the canary to outputs information
96    public interface Sink {
97      public long getReadFailureCount();
98      public void publishReadFailure(HRegionInfo region, Exception e);
99      public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
100     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
101     public long getWriteFailureCount();
102     public void publishWriteFailure(HRegionInfo region, Exception e);
103     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
104     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
105   }
106   // new extended sink for output regionserver mode info
107   // do not change the Sink interface directly due to maintaining the API
108   public interface ExtendedSink extends Sink {
109     public void publishReadFailure(String table, String server);
110     public void publishReadTiming(String table, String server, long msTime);
111   }
112 
113   // Simple implementation of canary sink that allows to plot on
114   // file or standard output timings or failures.
115   public static class StdOutSink implements Sink {
116     protected AtomicLong readFailureCount = new AtomicLong(0),
117         writeFailureCount = new AtomicLong(0);
118 
119     @Override
120     public long getReadFailureCount() {
121       return readFailureCount.get();
122     }
123 
124     @Override
125     public void publishReadFailure(HRegionInfo region, Exception e) {
126       readFailureCount.incrementAndGet();
127       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
128     }
129 
130     @Override
131     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
132       readFailureCount.incrementAndGet();
133       LOG.error(String.format("read from region %s column family %s failed",
134                 region.getRegionNameAsString(), column.getNameAsString()), e);
135     }
136 
137     @Override
138     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
139       LOG.info(String.format("read from region %s column family %s in %dms",
140                region.getRegionNameAsString(), column.getNameAsString(), msTime));
141     }
142 
143     @Override
144     public long getWriteFailureCount() {
145       return writeFailureCount.get();
146     }
147 
148     @Override
149     public void publishWriteFailure(HRegionInfo region, Exception e) {
150       writeFailureCount.incrementAndGet();
151       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
152     }
153 
154     @Override
155     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
156       writeFailureCount.incrementAndGet();
157       LOG.error(String.format("write to region %s column family %s failed",
158         region.getRegionNameAsString(), column.getNameAsString()), e);
159     }
160 
161     @Override
162     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
163       LOG.info(String.format("write to region %s column family %s in %dms",
164         region.getRegionNameAsString(), column.getNameAsString(), msTime));
165     }
166   }
167   // a ExtendedSink implementation
168   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
169 
170     @Override
171     public void publishReadFailure(String table, String server) {
172       readFailureCount.incrementAndGet();
173       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
174     }
175 
176     @Override
177     public void publishReadTiming(String table, String server, long msTime) {
178       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
179           table, server, msTime));
180     }
181   }
182 
183   /**
184    * For each column family of the region tries to get one row and outputs the latency, or the
185    * failure.
186    */
187   static class RegionTask implements Callable<Void> {
188     public enum TaskType{
189       READ, WRITE
190     }
191     private Connection connection;
192     private HRegionInfo region;
193     private Sink sink;
194     private TaskType taskType;
195 
196     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
197       this.connection = connection;
198       this.region = region;
199       this.sink = sink;
200       this.taskType = taskType;
201     }
202 
203     @Override
204     public Void call() {
205       switch (taskType) {
206       case READ:
207         return read();
208       case WRITE:
209         return write();
210       default:
211         return read();
212       }
213     }
214 
215     public Void read() {
216       Table table = null;
217       HTableDescriptor tableDesc = null;
218       try {
219         if (LOG.isDebugEnabled()) {
220           LOG.debug(String.format("reading table descriptor for table %s",
221             region.getTable()));
222         }
223         table = connection.getTable(region.getTable());
224         tableDesc = table.getTableDescriptor();
225       } catch (IOException e) {
226         LOG.debug("sniffRegion failed", e);
227         sink.publishReadFailure(region, e);
228         if (table != null) {
229           try {
230             table.close();
231           } catch (IOException ioe) {
232             LOG.error("Close table failed", e);
233           }
234         }
235         return null;
236       }
237 
238       byte[] startKey = null;
239       Get get = null;
240       Scan scan = null;
241       ResultScanner rs = null;
242       StopWatch stopWatch = new StopWatch();
243       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
244         stopWatch.reset();
245         startKey = region.getStartKey();
246         // Can't do a get on empty start row so do a Scan of first element if any instead.
247         if (startKey.length > 0) {
248           get = new Get(startKey);
249           get.setCacheBlocks(false);
250           get.setFilter(new FirstKeyOnlyFilter());
251           get.addFamily(column.getName());
252         } else {
253           scan = new Scan();
254           scan.setRaw(true);
255           scan.setCaching(1);
256           scan.setCacheBlocks(false);
257           scan.setFilter(new FirstKeyOnlyFilter());
258           scan.addFamily(column.getName());
259           scan.setMaxResultSize(1L);
260           scan.setSmall(true);
261         }
262 
263         if (LOG.isDebugEnabled()) {
264           LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
265             tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
266             Bytes.toStringBinary(startKey)));
267         }
268         try {
269           stopWatch.start();
270           if (startKey.length > 0) {
271             table.get(get);
272           } else {
273             rs = table.getScanner(scan);
274             rs.next();
275           }
276           stopWatch.stop();
277           sink.publishReadTiming(region, column, stopWatch.getTime());
278         } catch (Exception e) {
279           sink.publishReadFailure(region, column, e);
280         } finally {
281           if (rs != null) {
282             rs.close();
283           }
284           scan = null;
285           get = null;
286           startKey = null;
287         }
288       }
289       try {
290         table.close();
291       } catch (IOException e) {
292         LOG.error("Close table failed", e);
293       }
294       return null;
295     }
296 
297     /**
298      * Check writes for the canary table
299      * @return
300      */
301     private Void write() {
302       Table table = null;
303       HTableDescriptor tableDesc = null;
304       try {
305         table = connection.getTable(region.getTable());
306         tableDesc = table.getTableDescriptor();
307         byte[] rowToCheck = region.getStartKey();
308         if (rowToCheck.length == 0) {
309           rowToCheck = new byte[]{0x0};
310         }
311         int writeValueSize =
312             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
313         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
314           Put put = new Put(rowToCheck);
315           byte[] value = new byte[writeValueSize];
316           Bytes.random(value);
317           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
318 
319           if (LOG.isDebugEnabled()) {
320             LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
321               tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
322               Bytes.toStringBinary(rowToCheck)));
323           }
324           try {
325             long startTime = System.currentTimeMillis();
326             table.put(put);
327             long time = System.currentTimeMillis() - startTime;
328             sink.publishWriteTiming(region, column, time);
329           } catch (Exception e) {
330             sink.publishWriteFailure(region, column, e);
331           }
332         }
333         table.close();
334       } catch (IOException e) {
335         sink.publishWriteFailure(region, e);
336       }
337       return null;
338     }
339   }
340 
341   /**
342    * Get one row from a region on the regionserver and outputs the latency, or the failure.
343    */
344   static class RegionServerTask implements Callable<Void> {
345     private Connection connection;
346     private String serverName;
347     private HRegionInfo region;
348     private ExtendedSink sink;
349     private AtomicLong successes;
350 
351     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
352         ExtendedSink sink, AtomicLong successes) {
353       this.connection = connection;
354       this.serverName = serverName;
355       this.region = region;
356       this.sink = sink;
357       this.successes = successes;
358     }
359 
360     @Override
361     public Void call() {
362       TableName tableName = null;
363       Table table = null;
364       Get get = null;
365       byte[] startKey = null;
366       Scan scan = null;
367       StopWatch stopWatch = new StopWatch();
368       // monitor one region on every region server
369       stopWatch.reset();
370       try {
371         tableName = region.getTable();
372         table = connection.getTable(tableName);
373         startKey = region.getStartKey();
374         // Can't do a get on empty start row so do a Scan of first element if any instead.
375         if (LOG.isDebugEnabled()) {
376           LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
377             serverName, region.getTable(), region.getRegionNameAsString(),
378             Bytes.toStringBinary(startKey)));
379         }
380         if (startKey.length > 0) {
381           get = new Get(startKey);
382           get.setCacheBlocks(false);
383           get.setFilter(new FirstKeyOnlyFilter());
384           stopWatch.start();
385           table.get(get);
386           stopWatch.stop();
387         } else {
388           scan = new Scan();
389           scan.setCacheBlocks(false);
390           scan.setFilter(new FirstKeyOnlyFilter());
391           scan.setCaching(1);
392           scan.setMaxResultSize(1L);
393           scan.setSmall(true);
394           stopWatch.start();
395           ResultScanner s = table.getScanner(scan);
396           s.next();
397           s.close();
398           stopWatch.stop();
399         }
400         successes.incrementAndGet();
401         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
402       } catch (TableNotFoundException tnfe) {
403         LOG.error("Table may be deleted", tnfe);
404         // This is ignored because it doesn't imply that the regionserver is dead
405       } catch (TableNotEnabledException tnee) {
406         // This is considered a success since we got a response.
407         successes.incrementAndGet();
408         LOG.debug("The targeted table was disabled.  Assuming success.");
409       } catch (DoNotRetryIOException dnrioe) {
410         sink.publishReadFailure(tableName.getNameAsString(), serverName);
411         LOG.error(dnrioe);
412       } catch (IOException e) {
413         sink.publishReadFailure(tableName.getNameAsString(), serverName);
414         LOG.error(e);
415       } finally {
416         if (table != null) {
417           try {
418             table.close();
419           } catch (IOException e) {/* DO NOTHING */
420             LOG.error("Close table failed", e);
421           }
422         }
423         scan = null;
424         get = null;
425         startKey = null;
426       }
427       return null;
428     }
429   }
430 
431   private static final int USAGE_EXIT_CODE = 1;
432   private static final int INIT_ERROR_EXIT_CODE = 2;
433   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
434   private static final int ERROR_EXIT_CODE = 4;
435 
436   private static final long DEFAULT_INTERVAL = 6000;
437 
438   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
439   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
440 
441   private static final Log LOG = LogFactory.getLog(Canary.class);
442 
443   public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
444     NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
445 
446   private static final String CANARY_TABLE_FAMILY_NAME = "Test";
447 
448   private Configuration conf = null;
449   private long interval = 0;
450   private Sink sink = null;
451 
452   private boolean useRegExp;
453   private long timeout = DEFAULT_TIMEOUT;
454   private boolean failOnError = true;
455   private boolean regionServerMode = false;
456   private boolean regionServerAllRegions = false;
457   private boolean writeSniffing = false;
458   private boolean treatFailureAsError = false;
459   private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
460 
461   private ExecutorService executor; // threads to retrieve data from regionservers
462 
/**
 * Creates a canary backed by a single-threaded {@link ScheduledThreadPoolExecutor} that
 * reports through a {@link RegionServerStdOutSink}.
 */
public Canary() {
  this(
      new ScheduledThreadPoolExecutor(1),
      new RegionServerStdOutSink());
}
466 
/**
 * Creates a canary that runs its probe tasks on the given executor and reports to the sink.
 * @param executor executor handed to the monitors for running probe tasks
 * @param sink sink that receives timings and failures
 */
public Canary(ExecutorService executor, Sink sink) {
  this.sink = sink;
  this.executor = executor;
}
471 
472   @Override
473   public Configuration getConf() {
474     return conf;
475   }
476 
477   @Override
478   public void setConf(Configuration conf) {
479     this.conf = conf;
480   }
481 
482   private int parseArgs(String[] args) {
483     int index = -1;
484     // Process command line args
485     for (int i = 0; i < args.length; i++) {
486       String cmd = args[i];
487 
488       if (cmd.startsWith("-")) {
489         if (index >= 0) {
490           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
491           System.err.println("Invalid command line options");
492           printUsageAndExit();
493         }
494 
495         if (cmd.equals("-help")) {
496           // user asked for help, print the help and quit.
497           printUsageAndExit();
498         } else if (cmd.equals("-daemon") && interval == 0) {
499           // user asked for daemon mode, set a default interval between checks
500           interval = DEFAULT_INTERVAL;
501         } else if (cmd.equals("-interval")) {
502           // user has specified an interval for canary breaths (-interval N)
503           i++;
504 
505           if (i == args.length) {
506             System.err.println("-interval needs a numeric value argument.");
507             printUsageAndExit();
508           }
509 
510           try {
511             interval = Long.parseLong(args[i]) * 1000;
512           } catch (NumberFormatException e) {
513             System.err.println("-interval needs a numeric value argument.");
514             printUsageAndExit();
515           }
516         } else if(cmd.equals("-regionserver")) {
517           this.regionServerMode = true;
518         } else if(cmd.equals("-allRegions")) {
519           this.regionServerAllRegions = true;
520         } else if(cmd.equals("-writeSniffing")) {
521           this.writeSniffing = true;
522         } else if(cmd.equals("-treatFailureAsError")) {
523           this.treatFailureAsError = true;
524         } else if (cmd.equals("-e")) {
525           this.useRegExp = true;
526         } else if (cmd.equals("-t")) {
527           i++;
528 
529           if (i == args.length) {
530             System.err.println("-t needs a numeric value argument.");
531             printUsageAndExit();
532           }
533 
534           try {
535             this.timeout = Long.parseLong(args[i]);
536           } catch (NumberFormatException e) {
537             System.err.println("-t needs a numeric value argument.");
538             printUsageAndExit();
539           }
540         } else if (cmd.equals("-writeTable")) {
541           i++;
542 
543           if (i == args.length) {
544             System.err.println("-writeTable needs a string value argument.");
545             printUsageAndExit();
546           }
547           this.writeTableName = TableName.valueOf(args[i]);
548         } else if (cmd.equals("-f")) {
549           i++;
550 
551           if (i == args.length) {
552             System.err
553                 .println("-f needs a boolean value argument (true|false).");
554             printUsageAndExit();
555           }
556 
557           this.failOnError = Boolean.parseBoolean(args[i]);
558         } else {
559           // no options match
560           System.err.println(cmd + " options is invalid.");
561           printUsageAndExit();
562         }
563       } else if (index < 0) {
564         // keep track of first table name specified by the user
565         index = i;
566       }
567     }
568     if (this.regionServerAllRegions && !this.regionServerMode) {
569       System.err.println("-allRegions can only be specified in regionserver mode.");
570       printUsageAndExit();
571     }
572     return index;
573   }
574 
575   @Override
576   public int run(String[] args) throws Exception {
577     int index = parseArgs(args);
578     ChoreService choreService = null;
579 
580     // Launches chore for refreshing kerberos credentials if security is enabled.
581     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
582     // for more details.
583     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
584     if (authChore != null) {
585       choreService = new ChoreService("CANARY_TOOL");
586       choreService.scheduleChore(authChore);
587     }
588 
589     // Start to prepare the stuffs
590     Monitor monitor = null;
591     Thread monitorThread = null;
592     long startTime = 0;
593     long currentTimeLength = 0;
594     // Get a connection to use in below.
595     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
596       do {
597         // Do monitor !!
598         try {
599           monitor = this.newMonitor(connection, index, args);
600           monitorThread = new Thread(monitor);
601           startTime = System.currentTimeMillis();
602           monitorThread.start();
603           while (!monitor.isDone()) {
604             // wait for 1 sec
605             Thread.sleep(1000);
606             // exit if any error occurs
607             if (this.failOnError && monitor.hasError()) {
608               monitorThread.interrupt();
609               if (monitor.initialized) {
610                 return monitor.errorCode;
611               } else {
612                 return INIT_ERROR_EXIT_CODE;
613               }
614             }
615             currentTimeLength = System.currentTimeMillis() - startTime;
616             if (currentTimeLength > this.timeout) {
617               LOG.error("The monitor is running too long (" + currentTimeLength
618                   + ") after timeout limit:" + this.timeout
619                   + " will be killed itself !!");
620               if (monitor.initialized) {
621                 return TIMEOUT_ERROR_EXIT_CODE;
622               } else {
623                 return INIT_ERROR_EXIT_CODE;
624               }
625             }
626           }
627 
628           if (this.failOnError && monitor.finalCheckForErrors()) {
629             monitorThread.interrupt();
630             return monitor.errorCode;
631           }
632         } finally {
633           if (monitor != null) monitor.close();
634         }
635 
636         Thread.sleep(interval);
637       } while (interval > 0);
638     } // try-with-resources close
639 
640     if (choreService != null) {
641       choreService.shutdown();
642     }
643     return monitor.errorCode;
644   }
645 
646   private void printUsageAndExit() {
647     System.err.printf(
648       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
649         getClass().getName());
650     System.err.println(" where [opts] are:");
651     System.err.println("   -help          Show this help and exit.");
652     System.err.println("   -regionserver  replace the table argument to regionserver,");
653     System.err.println("      which means to enable regionserver mode");
654     System.err.println("   -allRegions    Tries all regions on a regionserver,");
655     System.err.println("      only works in regionserver mode.");
656     System.err.println("   -daemon        Continuous check at defined intervals.");
657     System.err.println("   -interval <N>  Interval between checks (sec)");
658     System.err.println("   -e             Use table/regionserver as regular expression");
659     System.err.println("      which means the table/regionserver is regular expression pattern");
660     System.err.println("   -f <B>         stop whole program if first error occurs," +
661         " default is true");
662     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
663     System.err.println("   -writeSniffing enable the write sniffing in canary");
664     System.err.println("   -treatFailureAsError treats read / write failure as error");
665     System.err.println("   -writeTable    The table used for write sniffing."
666         + " Default is hbase:canary");
667     System.err
668         .println("   -D<configProperty>=<value> assigning or override the configuration params");
669     System.exit(USAGE_EXIT_CODE);
670   }
671 
672   /**
673    * A Factory method for {@link Monitor}.
674    * Can be overridden by user.
675    * @param index a start index for monitor target
676    * @param args args passed from user
677    * @return a Monitor instance
678    */
679   public Monitor newMonitor(final Connection connection, int index, String[] args) {
680     Monitor monitor = null;
681     String[] monitorTargets = null;
682 
683     if(index >= 0) {
684       int length = args.length - index;
685       monitorTargets = new String[length];
686       System.arraycopy(args, index, monitorTargets, 0, length);
687     }
688 
689     if (this.regionServerMode) {
690       monitor =
691           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
692               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
693               this.treatFailureAsError);
694     } else {
695       monitor =
696           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
697               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
698     }
699     return monitor;
700   }
701 
702   // a Monitor super-class can be extended by users
703   public static abstract class Monitor implements Runnable, Closeable {
704 
705     protected Connection connection;
706     protected Admin admin;
707     protected String[] targets;
708     protected boolean useRegExp;
709     protected boolean treatFailureAsError;
710     protected boolean initialized = false;
711 
712     protected boolean done = false;
713     protected int errorCode = 0;
714     protected Sink sink;
715     protected ExecutorService executor;
716 
717     public boolean isDone() {
718       return done;
719     }
720 
721     public boolean hasError() {
722       return errorCode != 0;
723     }
724 
725     public boolean finalCheckForErrors() {
726       if (errorCode != 0) {
727         return true;
728       }
729       return treatFailureAsError &&
730           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0);
731     }
732 
733     @Override
734     public void close() throws IOException {
735       if (this.admin != null) this.admin.close();
736     }
737 
738     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
739         ExecutorService executor, boolean treatFailureAsError) {
740       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
741 
742       this.connection = connection;
743       this.targets = monitorTargets;
744       this.useRegExp = useRegExp;
745       this.treatFailureAsError = treatFailureAsError;
746       this.sink = sink;
747       this.executor = executor;
748     }
749 
750     @Override
751     public abstract void run();
752 
753     protected boolean initAdmin() {
754       if (null == this.admin) {
755         try {
756           this.admin = this.connection.getAdmin();
757         } catch (Exception e) {
758           LOG.error("Initial HBaseAdmin failed...", e);
759           this.errorCode = INIT_ERROR_EXIT_CODE;
760         }
761       } else if (admin.isAborted()) {
762         LOG.error("HBaseAdmin aborted");
763         this.errorCode = INIT_ERROR_EXIT_CODE;
764       }
765       return !this.hasError();
766     }
767   }
768 
769   // a monitor for region mode
770   private static class RegionMonitor extends Monitor {
771     // 10 minutes
772     private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
773     // 1 days
774     private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
775 
776     private long lastCheckTime = -1;
777     private boolean writeSniffing;
778     private TableName writeTableName;
779     private int writeDataTTL;
780     private float regionsLowerLimit;
781     private float regionsUpperLimit;
782     private int checkPeriod;
783 
784     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
785         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
786         boolean treatFailureAsError) {
787       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
788       Configuration conf = connection.getConfiguration();
789       this.writeSniffing = writeSniffing;
790       this.writeTableName = writeTableName;
791       this.writeDataTTL =
792           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
793       this.regionsLowerLimit =
794           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
795       this.regionsUpperLimit =
796           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
797       this.checkPeriod =
798           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
799             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
800     }
801 
802     @Override
803     public void run() {
804       if (this.initAdmin()) {
805         try {
806           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
807           if (this.targets != null && this.targets.length > 0) {
808             String[] tables = generateMonitorTables(this.targets);
809             this.initialized = true;
810             for (String table : tables) {
811               taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
812             }
813           } else {
814             taskFutures.addAll(sniff(TaskType.READ));
815           }
816 
817           if (writeSniffing) {
818             if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
819               try {
820                 checkWriteTableDistribution();
821               } catch (IOException e) {
822                 LOG.error("Check canary table distribution failed!", e);
823               }
824               lastCheckTime = EnvironmentEdgeManager.currentTime();
825             }
826             // sniff canary table with write operation
827             taskFutures.addAll(Canary.sniff(admin, sink,
828               admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
829           }
830 
831           for (Future<Void> future : taskFutures) {
832             try {
833               future.get();
834             } catch (ExecutionException e) {
835               LOG.error("Sniff region failed!", e);
836             }
837           }
838         } catch (Exception e) {
839           LOG.error("Run regionMonitor failed", e);
840           this.errorCode = ERROR_EXIT_CODE;
841         }
842       }
843       this.done = true;
844     }
845 
846     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
847       String[] returnTables = null;
848 
849       if (this.useRegExp) {
850         Pattern pattern = null;
851         HTableDescriptor[] tds = null;
852         Set<String> tmpTables = new TreeSet<String>();
853         try {
854           if (LOG.isDebugEnabled()) {
855             LOG.debug(String.format("reading list of tables"));
856           }
857           tds = this.admin.listTables(pattern);
858           if (tds == null) {
859             tds = new HTableDescriptor[0];
860           }
861           for (String monitorTarget : monitorTargets) {
862             pattern = Pattern.compile(monitorTarget);
863             for (HTableDescriptor td : tds) {
864               if (pattern.matcher(td.getNameAsString()).matches()) {
865                 tmpTables.add(td.getNameAsString());
866               }
867             }
868           }
869         } catch (IOException e) {
870           LOG.error("Communicate with admin failed", e);
871           throw e;
872         }
873 
874         if (tmpTables.size() > 0) {
875           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
876         } else {
877           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
878           LOG.error(msg);
879           this.errorCode = INIT_ERROR_EXIT_CODE;
880           throw new TableNotFoundException(msg);
881         }
882       } else {
883         returnTables = monitorTargets;
884       }
885 
886       return returnTables;
887     }
888 
889     /*
890      * canary entry point to monitor all the tables.
891      */
892     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
893       if (LOG.isDebugEnabled()) {
894         LOG.debug(String.format("reading list of tables"));
895       }
896       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
897       for (HTableDescriptor table : admin.listTables()) {
898         if (admin.isTableEnabled(table.getTableName())
899             && (!table.getTableName().equals(writeTableName))) {
900           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
901         }
902       }
903       return taskFutures;
904     }
905 
906     private void checkWriteTableDistribution() throws IOException {
907       if (!admin.tableExists(writeTableName)) {
908         int numberOfServers = admin.getClusterStatus().getServers().size();
909         if (numberOfServers == 0) {
910           throw new IllegalStateException("No live regionservers");
911         }
912         createWriteTable(numberOfServers);
913       }
914 
915       if (!admin.isTableEnabled(writeTableName)) {
916         admin.enableTable(writeTableName);
917       }
918 
919       int numberOfServers = admin.getClusterStatus().getServers().size();
920       List<Pair<HRegionInfo, ServerName>> pairs =
921           MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
922       int numberOfRegions = pairs.size();
923       if (numberOfRegions < numberOfServers * regionsLowerLimit
924           || numberOfRegions > numberOfServers * regionsUpperLimit) {
925         admin.disableTable(writeTableName);
926         admin.deleteTable(writeTableName);
927         createWriteTable(numberOfServers);
928       }
929       HashSet<ServerName> serverSet = new HashSet<ServerName>();
930       for (Pair<HRegionInfo, ServerName> pair : pairs) {
931         serverSet.add(pair.getSecond());
932       }
933       int numberOfCoveredServers = serverSet.size();
934       if (numberOfCoveredServers < numberOfServers) {
935         admin.balancer();
936       }
937     }
938 
939     private void createWriteTable(int numberOfServers) throws IOException {
940       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
941       LOG.info("Number of live regionservers: " + numberOfServers + ", "
942           + "pre-splitting the canary table into " + numberOfRegions + " regions "
943           + "(current lower limit of regions per server is " + regionsLowerLimit
944           + " and you can change it by config: "
945           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
946       HTableDescriptor desc = new HTableDescriptor(writeTableName);
947       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
948       family.setMaxVersions(1);
949       family.setTimeToLive(writeDataTTL);
950 
951       desc.addFamily(family);
952       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
953       admin.createTable(desc, splits);
954     }
955   }
956 
957   /**
958    * Canary entry point for specified table.
959    * @throws Exception
960    */
961   public static void sniff(final Admin admin, TableName tableName)
962       throws Exception {
963     sniff(admin, tableName, TaskType.READ);
964   }
965 
966   /**
967    * Canary entry point for specified table with task type(read/write)
968    * @throws Exception
969    */
970   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
971       throws Exception {
972     List<Future<Void>> taskFutures =
973         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
974           new ScheduledThreadPoolExecutor(1), taskType);
975     for (Future<Void> future : taskFutures) {
976       future.get();
977     }
978   }
979 
980   /**
981    * Canary entry point for specified table.
982    * @throws Exception
983    */
984   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
985       ExecutorService executor, TaskType taskType) throws Exception {
986     if (LOG.isDebugEnabled()) {
987       LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
988         tableName));
989     }
990     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
991       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
992         executor, taskType);
993     } else {
994       LOG.warn(String.format("Table %s is not enabled", tableName));
995     }
996     return new LinkedList<Future<Void>>();
997   }
998 
999   /*
1000    * Loops over regions that owns this table, and output some information abouts the state.
1001    */
1002   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1003       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
1004 
1005     if (LOG.isDebugEnabled()) {
1006       LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
1007     }
1008 
1009     Table table = null;
1010     try {
1011       table = admin.getConnection().getTable(tableDesc.getTableName());
1012     } catch (TableNotFoundException e) {
1013       return new ArrayList<Future<Void>>();
1014     }
1015     List<RegionTask> tasks = new ArrayList<RegionTask>();
1016     try {
1017       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
1018         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
1019       }
1020     } finally {
1021       table.close();
1022     }
1023     return executor.invokeAll(tasks);
1024   }
1025   // a monitor for regionserver mode
1026   private static class RegionServerMonitor extends Monitor {
1027 
1028     private boolean allRegions;
1029 
1030     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1031         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1032         boolean treatFailureAsError) {
1033       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1034       this.allRegions = allRegions;
1035     }
1036 
1037     private ExtendedSink getSink() {
1038       return (ExtendedSink) this.sink;
1039     }
1040 
1041     @Override
1042     public void run() {
1043       if (this.initAdmin() && this.checkNoTableNames()) {
1044         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1045         this.initialized = true;
1046         this.monitorRegionServers(rsAndRMap);
1047       }
1048       this.done = true;
1049     }
1050 
1051     private boolean checkNoTableNames() {
1052       List<String> foundTableNames = new ArrayList<String>();
1053       TableName[] tableNames = null;
1054 
1055       if (LOG.isDebugEnabled()) {
1056         LOG.debug(String.format("reading list of tables"));
1057       }
1058       try {
1059         tableNames = this.admin.listTableNames();
1060       } catch (IOException e) {
1061         LOG.error("Get listTableNames failed", e);
1062         this.errorCode = INIT_ERROR_EXIT_CODE;
1063         return false;
1064       }
1065 
1066       if (this.targets == null || this.targets.length == 0) return true;
1067 
1068       for (String target : this.targets) {
1069         for (TableName tableName : tableNames) {
1070           if (target.equals(tableName.getNameAsString())) {
1071             foundTableNames.add(target);
1072           }
1073         }
1074       }
1075 
1076       if (foundTableNames.size() > 0) {
1077         System.err.println("Cannot pass a tablename when using the -regionserver " +
1078             "option, tablenames:" + foundTableNames.toString());
1079         this.errorCode = USAGE_EXIT_CODE;
1080       }
1081       return foundTableNames.size() == 0;
1082     }
1083 
1084     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1085       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1086       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1087       Random rand = new Random();
1088       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1089         String serverName = entry.getKey();
1090         AtomicLong successes = new AtomicLong(0);
1091         successMap.put(serverName, successes);
1092         if (this.allRegions) {
1093           for (HRegionInfo region : entry.getValue()) {
1094             tasks.add(new RegionServerTask(this.connection,
1095                 serverName,
1096                 region,
1097                 getSink(),
1098                 successes));
1099           }
1100         } else {
1101           // random select a region if flag not set
1102           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1103           tasks.add(new RegionServerTask(this.connection,
1104               serverName,
1105               region,
1106               getSink(),
1107               successes));
1108         }
1109       }
1110       try {
1111         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1112           try {
1113             future.get();
1114           } catch (ExecutionException e) {
1115             LOG.error("Sniff regionserver failed!", e);
1116             this.errorCode = ERROR_EXIT_CODE;
1117           }
1118         }
1119         if (this.allRegions) {
1120           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1121             String serverName = entry.getKey();
1122             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1123                     + entry.getValue().size() + " on regionserver:" + serverName);
1124           }
1125         }
1126       } catch (InterruptedException e) {
1127         this.errorCode = ERROR_EXIT_CODE;
1128         LOG.error("Sniff regionserver interrupted!", e);
1129       }
1130     }
1131 
1132     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1133       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1134       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1135       return regionServerAndRegionsMap;
1136     }
1137 
1138     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1139       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1140       Table table = null;
1141       RegionLocator regionLocator = null;
1142       try {
1143         if (LOG.isDebugEnabled()) {
1144           LOG.debug(String.format("reading list of tables and locations"));
1145         }
1146         HTableDescriptor[] tableDescs = this.admin.listTables();
1147         List<HRegionInfo> regions = null;
1148         for (HTableDescriptor tableDesc : tableDescs) {
1149           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1150           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1151 
1152           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1153             ServerName rs = location.getServerName();
1154             String rsName = rs.getHostname();
1155             HRegionInfo r = location.getRegionInfo();
1156 
1157             if (rsAndRMap.containsKey(rsName)) {
1158               regions = rsAndRMap.get(rsName);
1159             } else {
1160               regions = new ArrayList<HRegionInfo>();
1161               rsAndRMap.put(rsName, regions);
1162             }
1163             regions.add(r);
1164           }
1165           table.close();
1166         }
1167 
1168       } catch (IOException e) {
1169         String msg = "Get HTables info failed";
1170         LOG.error(msg, e);
1171         this.errorCode = INIT_ERROR_EXIT_CODE;
1172       } finally {
1173         if (table != null) {
1174           try {
1175             table.close();
1176           } catch (IOException e) {
1177             LOG.warn("Close table failed", e);
1178           }
1179         }
1180       }
1181 
1182       return rsAndRMap;
1183     }
1184 
1185     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1186         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1187 
1188       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1189 
1190       if (this.targets != null && this.targets.length > 0) {
1191         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1192         Pattern pattern = null;
1193         Matcher matcher = null;
1194         boolean regExpFound = false;
1195         for (String rsName : this.targets) {
1196           if (this.useRegExp) {
1197             regExpFound = false;
1198             pattern = Pattern.compile(rsName);
1199             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1200               matcher = pattern.matcher(entry.getKey());
1201               if (matcher.matches()) {
1202                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1203                 regExpFound = true;
1204               }
1205             }
1206             if (!regExpFound) {
1207               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1208             }
1209           } else {
1210             if (fullRsAndRMap.containsKey(rsName)) {
1211               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1212             } else {
1213               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1214             }
1215           }
1216         }
1217       } else {
1218         filteredRsAndRMap = fullRsAndRMap;
1219       }
1220       return filteredRsAndRMap;
1221     }
1222   }
1223 
1224   public static void main(String[] args) throws Exception {
1225     final Configuration conf = HBaseConfiguration.create();
1226 
1227     // loading the generic options to conf
1228     new GenericOptionsParser(conf, args);
1229 
1230     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1231     LOG.info("Number of exection threads " + numThreads);
1232 
1233     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1234 
1235     Class<? extends Sink> sinkClass =
1236         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1237     Sink sink = ReflectionUtils.newInstance(sinkClass);
1238 
1239     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1240     executor.shutdown();
1241     System.exit(exitCode);
1242   }
1243 }