View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Random;
33  import java.util.Set;
34  import java.util.TreeSet;
35  import java.util.concurrent.atomic.AtomicLong;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.ExecutorService;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.ScheduledThreadPoolExecutor;
41  import java.util.regex.Matcher;
42  import java.util.regex.Pattern;
43
44  import org.apache.commons.lang.time.StopWatch;
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.AuthUtil;
49  import org.apache.hadoop.hbase.ChoreService;
50  import org.apache.hadoop.hbase.ClusterStatus;
51  import org.apache.hadoop.hbase.DoNotRetryIOException;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HColumnDescriptor;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HRegionLocation;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.MetaTableAccessor;
59  import org.apache.hadoop.hbase.NamespaceDescriptor;
60  import org.apache.hadoop.hbase.ScheduledChore;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotEnabledException;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.client.Admin;
66  import org.apache.hadoop.hbase.client.Connection;
67  import org.apache.hadoop.hbase.client.ConnectionFactory;
68  import org.apache.hadoop.hbase.client.Get;
69  import org.apache.hadoop.hbase.client.Put;
70  import org.apache.hadoop.hbase.client.RegionLocator;
71  import org.apache.hadoop.hbase.client.ResultScanner;
72  import org.apache.hadoop.hbase.client.Scan;
73  import org.apache.hadoop.hbase.client.Table;
74  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
75  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
76  import org.apache.hadoop.hbase.util.Bytes;
77  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78  import org.apache.hadoop.hbase.util.Pair;
79  import org.apache.hadoop.hbase.util.ReflectionUtils;
80  import org.apache.hadoop.hbase.util.RegionSplitter;
81  import org.apache.hadoop.util.GenericOptionsParser;
82  import org.apache.hadoop.util.Tool;
83  import org.apache.hadoop.util.ToolRunner;
84
85  /**
86   * HBase Canary Tool, that can be used to do
87   * "canary monitoring" of a running HBase cluster.
88   *
89   * Here are two modes
90   * 1. region mode - Foreach region tries to get one row per column family
91   * and outputs some information about failure or latency.
92   *
93   * 2. regionserver mode - Foreach regionserver tries to get one row from one table
94   * selected randomly and outputs some information about failure or latency.
95   */
96  public final class Canary implements Tool {
97    // Sink interface used by the canary to outputs information
98    public interface Sink {
99      public long getReadFailureCount();
100     public long incReadFailureCount();
101     public void publishReadFailure(HRegionInfo region, Exception e);
102     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
103     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
104     public long getWriteFailureCount();
105     public void publishWriteFailure(HRegionInfo region, Exception e);
106     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
107     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
108   }
109   // new extended sink for output regionserver mode info
110   // do not change the Sink interface directly due to maintaining the API
111   public interface ExtendedSink extends Sink {
112     public void publishReadFailure(String table, String server);
113     public void publishReadTiming(String table, String server, long msTime);
114   }
115
116   // Simple implementation of canary sink that allows to plot on
117   // file or standard output timings or failures.
118   public static class StdOutSink implements Sink {
119     private AtomicLong readFailureCount = new AtomicLong(0),
120         writeFailureCount = new AtomicLong(0);
121
122     @Override
123     public long getReadFailureCount() {
124       return readFailureCount.get();
125     }
126
127     @Override
128     public long incReadFailureCount() {
129       return readFailureCount.incrementAndGet();
130     }
131
132     @Override
133     public void publishReadFailure(HRegionInfo region, Exception e) {
134       readFailureCount.incrementAndGet();
135       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
136     }
137
138     @Override
139     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
140       readFailureCount.incrementAndGet();
141       LOG.error(String.format("read from region %s column family %s failed",
142                 region.getRegionNameAsString(), column.getNameAsString()), e);
143     }
144
145     @Override
146     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
147       LOG.info(String.format("read from region %s column family %s in %dms",
148                region.getRegionNameAsString(), column.getNameAsString(), msTime));
149     }
150
151     @Override
152     public long getWriteFailureCount() {
153       return writeFailureCount.get();
154     }
155
156     @Override
157     public void publishWriteFailure(HRegionInfo region, Exception e) {
158       writeFailureCount.incrementAndGet();
159       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
160     }
161
162     @Override
163     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
164       writeFailureCount.incrementAndGet();
165       LOG.error(String.format("write to region %s column family %s failed",
166         region.getRegionNameAsString(), column.getNameAsString()), e);
167     }
168
169     @Override
170     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
171       LOG.info(String.format("write to region %s column family %s in %dms",
172         region.getRegionNameAsString(), column.getNameAsString(), msTime));
173     }
174   }
175   // a ExtendedSink implementation
176   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
177
178     @Override
179     public void publishReadFailure(String table, String server) {
180       incReadFailureCount();
181       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
182     }
183
184     @Override
185     public void publishReadTiming(String table, String server, long msTime) {
186       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
187           table, server, msTime));
188     }
189   }
190
191   /**
192    * For each column family of the region tries to get one row and outputs the latency, or the
193    * failure.
194    */
195   static class RegionTask implements Callable<Void> {
196     public enum TaskType{
197       READ, WRITE
198     }
199     private Connection connection;
200     private HRegionInfo region;
201     private Sink sink;
202     private TaskType taskType;
203
204     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
205       this.connection = connection;
206       this.region = region;
207       this.sink = sink;
208       this.taskType = taskType;
209     }
210
211     @Override
212     public Void call() {
213       switch (taskType) {
214       case READ:
215         return read();
216       case WRITE:
217         return write();
218       default:
219         return read();
220       }
221     }
222
223     public Void read() {
224       Table table = null;
225       HTableDescriptor tableDesc = null;
226       try {
227         if (LOG.isDebugEnabled()) {
228           LOG.debug(String.format("reading table descriptor for table %s",
229             region.getTable()));
230         }
231         table = connection.getTable(region.getTable());
232         tableDesc = table.getTableDescriptor();
233       } catch (IOException e) {
234         LOG.debug("sniffRegion failed", e);
235         sink.publishReadFailure(region, e);
236         if (table != null) {
237           try {
238             table.close();
239           } catch (IOException ioe) {
240             LOG.error("Close table failed", e);
241           }
242         }
243         return null;
244       }
245
246       byte[] startKey = null;
247       Get get = null;
248       Scan scan = null;
249       ResultScanner rs = null;
250       StopWatch stopWatch = new StopWatch();
251       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
252         stopWatch.reset();
253         startKey = region.getStartKey();
254         // Can't do a get on empty start row so do a Scan of first element if any instead.
255         if (startKey.length > 0) {
256           get = new Get(startKey);
257           get.setCacheBlocks(false);
258           get.setFilter(new FirstKeyOnlyFilter());
259           get.addFamily(column.getName());
260         } else {
261           scan = new Scan();
262           scan.setRaw(true);
263           scan.setCaching(1);
264           scan.setCacheBlocks(false);
265           scan.setFilter(new FirstKeyOnlyFilter());
266           scan.addFamily(column.getName());
267           scan.setMaxResultSize(1L);
268           scan.setSmall(true);
269         }
270
271         if (LOG.isDebugEnabled()) {
272           LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
273             tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
274             Bytes.toStringBinary(startKey)));
275         }
276         try {
277           stopWatch.start();
278           if (startKey.length > 0) {
279             table.get(get);
280           } else {
281             rs = table.getScanner(scan);
282             rs.next();
283           }
284           stopWatch.stop();
285           sink.publishReadTiming(region, column, stopWatch.getTime());
286         } catch (Exception e) {
287           sink.publishReadFailure(region, column, e);
288         } finally {
289           if (rs != null) {
290             rs.close();
291           }
292           scan = null;
293           get = null;
294           startKey = null;
295         }
296       }
297       try {
298         table.close();
299       } catch (IOException e) {
300         LOG.error("Close table failed", e);
301       }
302       return null;
303     }
304
305     /**
306      * Check writes for the canary table
307      * @return
308      */
309     private Void write() {
310       Table table = null;
311       HTableDescriptor tableDesc = null;
312       try {
313         table = connection.getTable(region.getTable());
314         tableDesc = table.getTableDescriptor();
315         byte[] rowToCheck = region.getStartKey();
316         if (rowToCheck.length == 0) {
317           rowToCheck = new byte[]{0x0};
318         }
319         int writeValueSize =
320             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
321         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
322           Put put = new Put(rowToCheck);
323           byte[] value = new byte[writeValueSize];
324           Bytes.random(value);
325           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
326
327           if (LOG.isDebugEnabled()) {
328             LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
329               tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
330               Bytes.toStringBinary(rowToCheck)));
331           }
332           try {
333             long startTime = System.currentTimeMillis();
334             table.put(put);
335             long time = System.currentTimeMillis() - startTime;
336             sink.publishWriteTiming(region, column, time);
337           } catch (Exception e) {
338             sink.publishWriteFailure(region, column, e);
339           }
340         }
341         table.close();
342       } catch (IOException e) {
343         sink.publishWriteFailure(region, e);
344       }
345       return null;
346     }
347   }
348
349   /**
350    * Get one row from a region on the regionserver and outputs the latency, or the failure.
351    */
352   static class RegionServerTask implements Callable<Void> {
353     private Connection connection;
354     private String serverName;
355     private HRegionInfo region;
356     private ExtendedSink sink;
357     private AtomicLong successes;
358
359     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
360         ExtendedSink sink, AtomicLong successes) {
361       this.connection = connection;
362       this.serverName = serverName;
363       this.region = region;
364       this.sink = sink;
365       this.successes = successes;
366     }
367
368     @Override
369     public Void call() {
370       TableName tableName = null;
371       Table table = null;
372       Get get = null;
373       byte[] startKey = null;
374       Scan scan = null;
375       StopWatch stopWatch = new StopWatch();
376       // monitor one region on every region server
377       stopWatch.reset();
378       try {
379         tableName = region.getTable();
380         table = connection.getTable(tableName);
381         startKey = region.getStartKey();
382         // Can't do a get on empty start row so do a Scan of first element if any instead.
383         if (LOG.isDebugEnabled()) {
384           LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
385             serverName, region.getTable(), region.getRegionNameAsString(),
386             Bytes.toStringBinary(startKey)));
387         }
388         if (startKey.length > 0) {
389           get = new Get(startKey);
390           get.setCacheBlocks(false);
391           get.setFilter(new FirstKeyOnlyFilter());
392           stopWatch.start();
393           table.get(get);
394           stopWatch.stop();
395         } else {
396           scan = new Scan();
397           scan.setCacheBlocks(false);
398           scan.setFilter(new FirstKeyOnlyFilter());
399           scan.setCaching(1);
400           scan.setMaxResultSize(1L);
401           scan.setSmall(true);
402           stopWatch.start();
403           ResultScanner s = table.getScanner(scan);
404           s.next();
405           s.close();
406           stopWatch.stop();
407         }
408         successes.incrementAndGet();
409         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
410       } catch (TableNotFoundException tnfe) {
411         LOG.error("Table may be deleted", tnfe);
412         // This is ignored because it doesn't imply that the regionserver is dead
413       } catch (TableNotEnabledException tnee) {
414         // This is considered a success since we got a response.
415         successes.incrementAndGet();
416         LOG.debug("The targeted table was disabled.  Assuming success.");
417       } catch (DoNotRetryIOException dnrioe) {
418         sink.publishReadFailure(tableName.getNameAsString(), serverName);
419         LOG.error(dnrioe);
420       } catch (IOException e) {
421         sink.publishReadFailure(tableName.getNameAsString(), serverName);
422         LOG.error(e);
423       } finally {
424         if (table != null) {
425           try {
426             table.close();
427           } catch (IOException e) {/* DO NOTHING */
428             LOG.error("Close table failed", e);
429           }
430         }
431         scan = null;
432         get = null;
433         startKey = null;
434       }
435       return null;
436     }
437   }
438
439   private static final int USAGE_EXIT_CODE = 1;
440   private static final int INIT_ERROR_EXIT_CODE = 2;
441   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
442   private static final int ERROR_EXIT_CODE = 4;
443   private static final int FAILURE_EXIT_CODE = 5;
444
445   private static final long DEFAULT_INTERVAL = 6000;
446
447   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
448   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
449
450   private static final Log LOG = LogFactory.getLog(Canary.class);
451
452   public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
453     NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
454
455   private static final String CANARY_TABLE_FAMILY_NAME = "Test";
456
457   private Configuration conf = null;
458   private long interval = 0;
459   private Sink sink = null;
460
461   private boolean useRegExp;
462   private long timeout = DEFAULT_TIMEOUT;
463   private boolean failOnError = true;
464   private boolean regionServerMode = false;
465   private boolean regionServerAllRegions = false;
466   private boolean writeSniffing = false;
467   private boolean treatFailureAsError = false;
468   private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
469
470   private ExecutorService executor; // threads to retrieve data from regionservers
471
472   public Canary() {
        // Default setup: a one-thread ScheduledThreadPoolExecutor and a RegionServerStdOutSink.
473     this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
474   }
475
476   public Canary(ExecutorService executor, Sink sink) {
        // Both values are handed to every Monitor that newMonitor creates.
477     this.executor = executor;
        // In regionserver mode newMonitor casts the sink to ExtendedSink.
478     this.sink = sink;
479   }
480
481   @Override
482   public Configuration getConf() {
        // Returns whatever was last passed to setConf; null until then.
483     return conf;
484   }
485
486   @Override
487   public void setConf(Configuration conf) {
        // Stored as-is; run() uses it to obtain the auth chore and to create the connection.
488     this.conf = conf;
489   }
490
491   private int parseArgs(String[] args) {
492     int index = -1;
493     // Process command line args
494     for (int i = 0; i < args.length; i++) {
495       String cmd = args[i];
496
497       if (cmd.startsWith("-")) {
498         if (index >= 0) {
499           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
500           System.err.println("Invalid command line options");
501           printUsageAndExit();
502         }
503
504         if (cmd.equals("-help")) {
505           // user asked for help, print the help and quit.
506           printUsageAndExit();
507         } else if (cmd.equals("-daemon") && interval == 0) {
508           // user asked for daemon mode, set a default interval between checks
509           interval = DEFAULT_INTERVAL;
510         } else if (cmd.equals("-interval")) {
511           // user has specified an interval for canary breaths (-interval N)
512           i++;
513
514           if (i == args.length) {
515             System.err.println("-interval needs a numeric value argument.");
516             printUsageAndExit();
517           }
518
519           try {
520             interval = Long.parseLong(args[i]) * 1000;
521           } catch (NumberFormatException e) {
522             System.err.println("-interval needs a numeric value argument.");
523             printUsageAndExit();
524           }
525         } else if(cmd.equals("-regionserver")) {
526           this.regionServerMode = true;
527         } else if(cmd.equals("-allRegions")) {
528           this.regionServerAllRegions = true;
529         } else if(cmd.equals("-writeSniffing")) {
530           this.writeSniffing = true;
531         } else if(cmd.equals("-treatFailureAsError")) {
532           this.treatFailureAsError = true;
533         } else if (cmd.equals("-e")) {
534           this.useRegExp = true;
535         } else if (cmd.equals("-t")) {
536           i++;
537
538           if (i == args.length) {
539             System.err.println("-t needs a numeric value argument.");
540             printUsageAndExit();
541           }
542
543           try {
544             this.timeout = Long.parseLong(args[i]);
545           } catch (NumberFormatException e) {
546             System.err.println("-t needs a numeric value argument.");
547             printUsageAndExit();
548           }
549         } else if (cmd.equals("-writeTable")) {
550           i++;
551
552           if (i == args.length) {
553             System.err.println("-writeTable needs a string value argument.");
554             printUsageAndExit();
555           }
556           this.writeTableName = TableName.valueOf(args[i]);
557         } else if (cmd.equals("-f")) {
558           i++;
559
560           if (i == args.length) {
561             System.err
562                 .println("-f needs a boolean value argument (true|false).");
563             printUsageAndExit();
564           }
565
566           this.failOnError = Boolean.parseBoolean(args[i]);
567         } else {
568           // no options match
569           System.err.println(cmd + " options is invalid.");
570           printUsageAndExit();
571         }
572       } else if (index < 0) {
573         // keep track of first table name specified by the user
574         index = i;
575       }
576     }
577     if (this.regionServerAllRegions && !this.regionServerMode) {
578       System.err.println("-allRegions can only be specified in regionserver mode.");
579       printUsageAndExit();
580     }
581     return index;
582   }
583
584   @Override
585   public int run(String[] args) throws Exception {
586     int index = parseArgs(args);
587     ChoreService choreService = null;
588
589     // Launches chore for refreshing kerberos credentials if security is enabled.
590     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
591     // for more details.
592     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
593     if (authChore != null) {
594       choreService = new ChoreService("CANARY_TOOL");
595       choreService.scheduleChore(authChore);
596     }
597
598     // Start to prepare the stuffs
599     Monitor monitor = null;
600     Thread monitorThread = null;
601     long startTime = 0;
602     long currentTimeLength = 0;
603     // Get a connection to use in below.
604     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
605       do {
606         // Do monitor !!
607         try {
608           monitor = this.newMonitor(connection, index, args);
609           monitorThread = new Thread(monitor);
610           startTime = System.currentTimeMillis();
611           monitorThread.start();
612           while (!monitor.isDone()) {
613             // wait for 1 sec
614             Thread.sleep(1000);
615             // exit if any error occurs
616             if (this.failOnError && monitor.hasError()) {
617               monitorThread.interrupt();
618               if (monitor.initialized) {
619                 return monitor.errorCode;
620               } else {
621                 return INIT_ERROR_EXIT_CODE;
622               }
623             }
624             currentTimeLength = System.currentTimeMillis() - startTime;
625             if (currentTimeLength > this.timeout) {
626               LOG.error("The monitor is running too long (" + currentTimeLength
627                   + ") after timeout limit:" + this.timeout
628                   + " will be killed itself !!");
629               if (monitor.initialized) {
630                 return TIMEOUT_ERROR_EXIT_CODE;
631               } else {
632                 return INIT_ERROR_EXIT_CODE;
633               }
634             }
635           }
636
637           if (this.failOnError && monitor.finalCheckForErrors()) {
638             monitorThread.interrupt();
639             return monitor.errorCode;
640           }
641         } finally {
642           if (monitor != null) monitor.close();
643         }
644
645         Thread.sleep(interval);
646       } while (interval > 0);
647     } // try-with-resources close
648
649     if (choreService != null) {
650       choreService.shutdown();
651     }
652     return monitor.errorCode;
653   }
654
655   private void printUsageAndExit() {
656     System.err.printf(
657       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
658         getClass().getName());
659     System.err.println(" where [opts] are:");
660     System.err.println("   -help          Show this help and exit.");
661     System.err.println("   -regionserver  replace the table argument to regionserver,");
662     System.err.println("      which means to enable regionserver mode");
663     System.err.println("   -allRegions    Tries all regions on a regionserver,");
664     System.err.println("      only works in regionserver mode.");
665     System.err.println("   -daemon        Continuous check at defined intervals.");
666     System.err.println("   -interval <N>  Interval between checks (sec)");
667     System.err.println("   -e             Use table/regionserver as regular expression");
668     System.err.println("      which means the table/regionserver is regular expression pattern");
669     System.err.println("   -f <B>         stop whole program if first error occurs," +
670         " default is true");
671     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
672     System.err.println("   -writeSniffing enable the write sniffing in canary");
673     System.err.println("   -treatFailureAsError treats read / write failure as error");
674     System.err.println("   -writeTable    The table used for write sniffing."
675         + " Default is hbase:canary");
676     System.err
677         .println("   -D<configProperty>=<value> assigning or override the configuration params");
678     System.exit(USAGE_EXIT_CODE);
679   }
680
681   /**
682    * A Factory method for {@link Monitor}.
683    * Can be overridden by user.
684    * @param index a start index for monitor target
685    * @param args args passed from user
686    * @return a Monitor instance
687    */
688   public Monitor newMonitor(final Connection connection, int index, String[] args) {
689     Monitor monitor = null;
690     String[] monitorTargets = null;
691
692     if(index >= 0) {
693       int length = args.length - index;
694       monitorTargets = new String[length];
695       System.arraycopy(args, index, monitorTargets, 0, length);
696     }
697
698     if (this.regionServerMode) {
699       monitor =
700           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
701               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
702               this.treatFailureAsError);
703     } else {
704       monitor =
705           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
706               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
707     }
708     return monitor;
709   }
710
711   // a Monitor super-class can be extended by users
712   public static abstract class Monitor implements Runnable, Closeable {
713
714     protected Connection connection;
715     protected Admin admin;
716     protected String[] targets;
717     protected boolean useRegExp;
718     protected boolean treatFailureAsError;
719     protected boolean initialized = false;
720
721     protected boolean done = false;
722     protected int errorCode = 0;
723     protected Sink sink;
724     protected ExecutorService executor;
725
726     public boolean isDone() {
727       return done;
728     }
729
730     public boolean hasError() {
731       return errorCode != 0;
732     }
733
734     public boolean finalCheckForErrors() {
735       if (errorCode != 0) {
736         return true;
737       }
738       if (treatFailureAsError &&
739           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
740         errorCode = FAILURE_EXIT_CODE;
741         return true;
742       }
743       return false;
744     }
745
746     @Override
747     public void close() throws IOException {
748       if (this.admin != null) this.admin.close();
749     }
750
751     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
752         ExecutorService executor, boolean treatFailureAsError) {
753       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
754
755       this.connection = connection;
756       this.targets = monitorTargets;
757       this.useRegExp = useRegExp;
758       this.treatFailureAsError = treatFailureAsError;
759       this.sink = sink;
760       this.executor = executor;
761     }
762
763     @Override
764     public abstract void run();
765
766     protected boolean initAdmin() {
767       if (null == this.admin) {
768         try {
769           this.admin = this.connection.getAdmin();
770         } catch (Exception e) {
771           LOG.error("Initial HBaseAdmin failed...", e);
772           this.errorCode = INIT_ERROR_EXIT_CODE;
773         }
774       } else if (admin.isAborted()) {
775         LOG.error("HBaseAdmin aborted");
776         this.errorCode = INIT_ERROR_EXIT_CODE;
777       }
778       return !this.hasError();
779     }
780   }
781
782   // a monitor for region mode
783   private static class RegionMonitor extends Monitor {
784     // 10 minutes
785     private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
786     // 1 days
787     private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
788
789     private long lastCheckTime = -1;
790     private boolean writeSniffing;
791     private TableName writeTableName;
792     private int writeDataTTL;
793     private float regionsLowerLimit;
794     private float regionsUpperLimit;
795     private int checkPeriod;
796
797     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
798         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
799         boolean treatFailureAsError) {
800       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
801       Configuration conf = connection.getConfiguration();
802       this.writeSniffing = writeSniffing;
803       this.writeTableName = writeTableName;
804       this.writeDataTTL =
805           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
806       this.regionsLowerLimit =
807           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
808       this.regionsUpperLimit =
809           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
810       this.checkPeriod =
811           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
812             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
813     }
814
815     @Override
816     public void run() {
817       if (this.initAdmin()) {
818         try {
819           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
820           if (this.targets != null && this.targets.length > 0) {
821             String[] tables = generateMonitorTables(this.targets);
822             this.initialized = true;
823             for (String table : tables) {
824               taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
825             }
826           } else {
827             taskFutures.addAll(sniff(TaskType.READ));
828           }
829
830           if (writeSniffing) {
831             if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
832               try {
833                 checkWriteTableDistribution();
834               } catch (IOException e) {
835                 LOG.error("Check canary table distribution failed!", e);
836               }
837               lastCheckTime = EnvironmentEdgeManager.currentTime();
838             }
839             // sniff canary table with write operation
840             taskFutures.addAll(Canary.sniff(admin, sink,
841               admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
842           }
843
844           for (Future<Void> future : taskFutures) {
845             try {
846               future.get();
847             } catch (ExecutionException e) {
848               LOG.error("Sniff region failed!", e);
849             }
850           }
851         } catch (Exception e) {
852           LOG.error("Run regionMonitor failed", e);
853           this.errorCode = ERROR_EXIT_CODE;
854         }
855       }
856       this.done = true;
857     }
858
859     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
860       String[] returnTables = null;
861
862       if (this.useRegExp) {
863         Pattern pattern = null;
864         HTableDescriptor[] tds = null;
865         Set<String> tmpTables = new TreeSet<String>();
866         try {
867           if (LOG.isDebugEnabled()) {
868             LOG.debug(String.format("reading list of tables"));
869           }
870           tds = this.admin.listTables(pattern);
871           if (tds == null) {
872             tds = new HTableDescriptor[0];
873           }
874           for (String monitorTarget : monitorTargets) {
875             pattern = Pattern.compile(monitorTarget);
876             for (HTableDescriptor td : tds) {
877               if (pattern.matcher(td.getNameAsString()).matches()) {
878                 tmpTables.add(td.getNameAsString());
879               }
880             }
881           }
882         } catch (IOException e) {
883           LOG.error("Communicate with admin failed", e);
884           throw e;
885         }
886
887         if (tmpTables.size() > 0) {
888           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
889         } else {
890           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
891           LOG.error(msg);
892           this.errorCode = INIT_ERROR_EXIT_CODE;
893           throw new TableNotFoundException(msg);
894         }
895       } else {
896         returnTables = monitorTargets;
897       }
898
899       return returnTables;
900     }
901
902     /*
903      * canary entry point to monitor all the tables.
904      */
905     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
906       if (LOG.isDebugEnabled()) {
907         LOG.debug(String.format("reading list of tables"));
908       }
909       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
910       for (HTableDescriptor table : admin.listTables()) {
911         if (admin.isTableEnabled(table.getTableName())
912             && (!table.getTableName().equals(writeTableName))) {
913           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
914         }
915       }
916       return taskFutures;
917     }
918
919     private void checkWriteTableDistribution() throws IOException {
920       if (!admin.tableExists(writeTableName)) {
921         int numberOfServers = admin.getClusterStatus().getServers().size();
922         if (numberOfServers == 0) {
923           throw new IllegalStateException("No live regionservers");
924         }
925         createWriteTable(numberOfServers);
926       }
927
928       if (!admin.isTableEnabled(writeTableName)) {
929         admin.enableTable(writeTableName);
930       }
931
932       ClusterStatus status = admin.getClusterStatus();
933       int numberOfServers = status.getServersSize();
934       if (status.getServers().contains(status.getMaster())) {
935         numberOfServers -= 1;
936       }
937
938       List<Pair<HRegionInfo, ServerName>> pairs =
939           MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
940       int numberOfRegions = pairs.size();
941       if (numberOfRegions < numberOfServers * regionsLowerLimit
942           || numberOfRegions > numberOfServers * regionsUpperLimit) {
943         admin.disableTable(writeTableName);
944         admin.deleteTable(writeTableName);
945         createWriteTable(numberOfServers);
946       }
947       HashSet<ServerName> serverSet = new HashSet<ServerName>();
948       for (Pair<HRegionInfo, ServerName> pair : pairs) {
949         serverSet.add(pair.getSecond());
950       }
951       int numberOfCoveredServers = serverSet.size();
952       if (numberOfCoveredServers < numberOfServers) {
953         admin.balancer();
954       }
955     }
956
957     private void createWriteTable(int numberOfServers) throws IOException {
958       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
959       LOG.info("Number of live regionservers: " + numberOfServers + ", "
960           + "pre-splitting the canary table into " + numberOfRegions + " regions "
961           + "(current lower limit of regions per server is " + regionsLowerLimit
962           + " and you can change it by config: "
963           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
964       HTableDescriptor desc = new HTableDescriptor(writeTableName);
965       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
966       family.setMaxVersions(1);
967       family.setTimeToLive(writeDataTTL);
968
969       desc.addFamily(family);
970       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
971       admin.createTable(desc, splits);
972     }
973   }
974
975   /**
976    * Canary entry point for specified table.
977    * @throws Exception
978    */
979   public static void sniff(final Admin admin, TableName tableName)
980       throws Exception {
981     sniff(admin, tableName, TaskType.READ);
982   }
983
984   /**
985    * Canary entry point for specified table with task type(read/write)
986    * @throws Exception
987    */
988   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
989       throws Exception {
990     List<Future<Void>> taskFutures =
991         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
992           new ScheduledThreadPoolExecutor(1), taskType);
993     for (Future<Void> future : taskFutures) {
994       future.get();
995     }
996   }
997
998   /**
999    * Canary entry point for specified table.
1000    * @throws Exception
1001    */
1002   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1003       ExecutorService executor, TaskType taskType) throws Exception {
1004     if (LOG.isDebugEnabled()) {
1005       LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
1006         tableName));
1007     }
1008     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1009       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
1010         executor, taskType);
1011     } else {
1012       LOG.warn(String.format("Table %s is not enabled", tableName));
1013     }
1014     return new LinkedList<Future<Void>>();
1015   }
1016
1017   /*
1018    * Loops over the regions that belong to this table and outputs some information about their state.
1019    */
1020   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1021       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
1022
1023     if (LOG.isDebugEnabled()) {
1024       LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
1025     }
1026
1027     Table table = null;
1028     try {
1029       table = admin.getConnection().getTable(tableDesc.getTableName());
1030     } catch (TableNotFoundException e) {
1031       return new ArrayList<Future<Void>>();
1032     }
1033     List<RegionTask> tasks = new ArrayList<RegionTask>();
1034     try {
1035       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
1036         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
1037       }
1038     } finally {
1039       table.close();
1040     }
1041     return executor.invokeAll(tasks);
1042   }
1043   // a monitor for regionserver mode
1044   private static class RegionServerMonitor extends Monitor {
1045
1046     private boolean allRegions;
1047
1048     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1049         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1050         boolean treatFailureAsError) {
1051       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1052       this.allRegions = allRegions;
1053     }
1054
1055     private ExtendedSink getSink() {
1056       return (ExtendedSink) this.sink;
1057     }
1058
1059     @Override
1060     public void run() {
1061       if (this.initAdmin() && this.checkNoTableNames()) {
1062         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1063         this.initialized = true;
1064         this.monitorRegionServers(rsAndRMap);
1065       }
1066       this.done = true;
1067     }
1068
1069     private boolean checkNoTableNames() {
1070       List<String> foundTableNames = new ArrayList<String>();
1071       TableName[] tableNames = null;
1072
1073       if (LOG.isDebugEnabled()) {
1074         LOG.debug(String.format("reading list of tables"));
1075       }
1076       try {
1077         tableNames = this.admin.listTableNames();
1078       } catch (IOException e) {
1079         LOG.error("Get listTableNames failed", e);
1080         this.errorCode = INIT_ERROR_EXIT_CODE;
1081         return false;
1082       }
1083
1084       if (this.targets == null || this.targets.length == 0) return true;
1085
1086       for (String target : this.targets) {
1087         for (TableName tableName : tableNames) {
1088           if (target.equals(tableName.getNameAsString())) {
1089             foundTableNames.add(target);
1090           }
1091         }
1092       }
1093
1094       if (foundTableNames.size() > 0) {
1095         System.err.println("Cannot pass a tablename when using the -regionserver " +
1096             "option, tablenames:" + foundTableNames.toString());
1097         this.errorCode = USAGE_EXIT_CODE;
1098       }
1099       return foundTableNames.size() == 0;
1100     }
1101
1102     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1103       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1104       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1105       Random rand = new Random();
1106       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1107         String serverName = entry.getKey();
1108         AtomicLong successes = new AtomicLong(0);
1109         successMap.put(serverName, successes);
1110         if (entry.getValue().isEmpty()) {
1111           LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
1112         } else if (this.allRegions) {
1113           for (HRegionInfo region : entry.getValue()) {
1114             tasks.add(new RegionServerTask(this.connection,
1115                 serverName,
1116                 region,
1117                 getSink(),
1118                 successes));
1119           }
1120         } else {
1121           // random select a region if flag not set
1122           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1123           tasks.add(new RegionServerTask(this.connection,
1124               serverName,
1125               region,
1126               getSink(),
1127               successes));
1128         }
1129       }
1130       try {
1131         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1132           try {
1133             future.get();
1134           } catch (ExecutionException e) {
1135             LOG.error("Sniff regionserver failed!", e);
1136             this.errorCode = ERROR_EXIT_CODE;
1137           }
1138         }
1139         if (this.allRegions) {
1140           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1141             String serverName = entry.getKey();
1142             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1143                     + entry.getValue().size() + " on regionserver:" + serverName);
1144           }
1145         }
1146       } catch (InterruptedException e) {
1147         this.errorCode = ERROR_EXIT_CODE;
1148         LOG.error("Sniff regionserver interrupted!", e);
1149       }
1150     }
1151
1152     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1153       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1154       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1155       return regionServerAndRegionsMap;
1156     }
1157
1158     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1159       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1160       Table table = null;
1161       RegionLocator regionLocator = null;
1162       try {
1163         if (LOG.isDebugEnabled()) {
1164           LOG.debug(String.format("reading list of tables and locations"));
1165         }
1166         HTableDescriptor[] tableDescs = this.admin.listTables();
1167         List<HRegionInfo> regions = null;
1168         for (HTableDescriptor tableDesc : tableDescs) {
1169           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1170           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1171
1172           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1173             ServerName rs = location.getServerName();
1174             String rsName = rs.getHostname();
1175             HRegionInfo r = location.getRegionInfo();
1176
1177             if (rsAndRMap.containsKey(rsName)) {
1178               regions = rsAndRMap.get(rsName);
1179             } else {
1180               regions = new ArrayList<HRegionInfo>();
1181               rsAndRMap.put(rsName, regions);
1182             }
1183             regions.add(r);
1184           }
1185           table.close();
1186         }
1187
1188         //get any live regionservers not serving any regions
1189         for (ServerName rs : this.admin.getClusterStatus().getServers()) {
1190           String rsName = rs.getHostname();
1191           if (!rsAndRMap.containsKey(rsName)) {
1192             rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
1193           }
1194         }
1195       } catch (IOException e) {
1196         String msg = "Get HTables info failed";
1197         LOG.error(msg, e);
1198         this.errorCode = INIT_ERROR_EXIT_CODE;
1199       } finally {
1200         if (table != null) {
1201           try {
1202             table.close();
1203           } catch (IOException e) {
1204             LOG.warn("Close table failed", e);
1205           }
1206         }
1207       }
1208
1209       return rsAndRMap;
1210     }
1211
1212     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1213         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1214
1215       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1216
1217       if (this.targets != null && this.targets.length > 0) {
1218         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1219         Pattern pattern = null;
1220         Matcher matcher = null;
1221         boolean regExpFound = false;
1222         for (String rsName : this.targets) {
1223           if (this.useRegExp) {
1224             regExpFound = false;
1225             pattern = Pattern.compile(rsName);
1226             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1227               matcher = pattern.matcher(entry.getKey());
1228               if (matcher.matches()) {
1229                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1230                 regExpFound = true;
1231               }
1232             }
1233             if (!regExpFound) {
1234               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1235             }
1236           } else {
1237             if (fullRsAndRMap.containsKey(rsName)) {
1238               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1239             } else {
1240               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1241             }
1242           }
1243         }
1244       } else {
1245         filteredRsAndRMap = fullRsAndRMap;
1246       }
1247       return filteredRsAndRMap;
1248     }
1249   }
1250
1251   public static void main(String[] args) throws Exception {
1252     final Configuration conf = HBaseConfiguration.create();
1253
1254     // loading the generic options to conf
1255     new GenericOptionsParser(conf, args);
1256
1257     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1258     LOG.info("Number of exection threads " + numThreads);
1259
1260     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1261
1262     Class<? extends Sink> sinkClass =
1263         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1264     Sink sink = ReflectionUtils.newInstance(sinkClass);
1265
1266     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1267     executor.shutdown();
1268     System.exit(exitCode);
1269   }
1270 }