View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Random;
31  import java.util.Set;
32  import java.util.TreeSet;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledThreadPoolExecutor;
38  import java.util.regex.Matcher;
39  import java.util.regex.Pattern;
40  
41  import org.apache.commons.lang.time.StopWatch;
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.AuthUtil;
46  import org.apache.hadoop.hbase.ChoreService;
47  import org.apache.hadoop.hbase.DoNotRetryIOException;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HColumnDescriptor;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HRegionLocation;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.ScheduledChore;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.TableNotEnabledException;
57  import org.apache.hadoop.hbase.TableNotFoundException;
58  import org.apache.hadoop.hbase.client.Admin;
59  import org.apache.hadoop.hbase.client.Connection;
60  import org.apache.hadoop.hbase.client.ConnectionFactory;
61  import org.apache.hadoop.hbase.client.Get;
62  import org.apache.hadoop.hbase.client.RegionLocator;
63  import org.apache.hadoop.hbase.client.ResultScanner;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.client.Table;
66  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
67  import org.apache.hadoop.hbase.util.ReflectionUtils;
68  import org.apache.hadoop.util.Tool;
69  import org.apache.hadoop.util.ToolRunner;
70  
71  /**
72   * HBase Canary Tool, that can be used to do
73   * "canary monitoring" of a running HBase cluster.
74   *
75   * Here are two modes
76   * 1. region mode - Foreach region tries to get one row per column family
77   * and outputs some information about failure or latency.
78   *
79   * 2. regionserver mode - Foreach regionserver tries to get one row from one table
80   * selected randomly and outputs some information about failure or latency.
81   */
82  public final class Canary implements Tool {
83    // Sink interface used by the canary to outputs information
84    public interface Sink {
85      public void publishReadFailure(HRegionInfo region, Exception e);
86      public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
87      public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
88    }
89    // new extended sink for output regionserver mode info
90    // do not change the Sink interface directly due to maintaining the API
91    public interface ExtendedSink extends Sink {
92      public void publishReadFailure(String table, String server);
93      public void publishReadTiming(String table, String server, long msTime);
94    }
95  
96    // Simple implementation of canary sink that allows to plot on
97    // file or standard output timings or failures.
98    public static class StdOutSink implements Sink {
99      @Override
100     public void publishReadFailure(HRegionInfo region, Exception e) {
101       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
102     }
103 
104     @Override
105     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
106       LOG.error(String.format("read from region %s column family %s failed",
107                 region.getRegionNameAsString(), column.getNameAsString()), e);
108     }
109 
110     @Override
111     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
112       LOG.info(String.format("read from region %s column family %s in %dms",
113                region.getRegionNameAsString(), column.getNameAsString(), msTime));
114     }
115   }
116   // a ExtendedSink implementation
117   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
118 
119     @Override
120     public void publishReadFailure(String table, String server) {
121       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
122     }
123 
124     @Override
125     public void publishReadTiming(String table, String server, long msTime) {
126       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
127           table, server, msTime));
128     }
129   }
130 
131   /**
132    * For each column family of the region tries to get one row and outputs the latency, or the
133    * failure.
134    */
135   static class RegionTask implements Callable<Void> {
136     private Connection connection;
137     private HRegionInfo region;
138     private Sink sink;
139 
140     RegionTask(Connection connection, HRegionInfo region, Sink sink) {
141       this.connection = connection;
142       this.region = region;
143       this.sink = sink;
144     }
145 
146     @Override
147     public Void call() {
148       Table table = null;
149       HTableDescriptor tableDesc = null;
150       try {
151         table = connection.getTable(region.getTable());
152         tableDesc = table.getTableDescriptor();
153       } catch (IOException e) {
154         LOG.debug("sniffRegion failed", e);
155         sink.publishReadFailure(region, e);
156         if (table != null) {
157           try {
158             table.close();
159           } catch (IOException ioe) {
160           }
161         }
162         return null;
163       }
164 
165       byte[] startKey = null;
166       Get get = null;
167       Scan scan = null;
168       ResultScanner rs = null;
169       StopWatch stopWatch = new StopWatch();
170       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
171         stopWatch.reset();
172         startKey = region.getStartKey();
173         // Can't do a get on empty start row so do a Scan of first element if any instead.
174         if (startKey.length > 0) {
175           get = new Get(startKey);
176           get.setCacheBlocks(false);
177           get.setFilter(new FirstKeyOnlyFilter());
178           get.addFamily(column.getName());
179         } else {
180           scan = new Scan();
181           scan.setRaw(true);
182           scan.setCaching(1);
183           scan.setCacheBlocks(false);
184           scan.setFilter(new FirstKeyOnlyFilter());
185           scan.addFamily(column.getName());
186           scan.setMaxResultSize(1L);
187         }
188 
189         try {
190           if (startKey.length > 0) {
191             stopWatch.start();
192             table.get(get);
193             stopWatch.stop();
194             sink.publishReadTiming(region, column, stopWatch.getTime());
195           } else {
196             stopWatch.start();
197             rs = table.getScanner(scan);
198             stopWatch.stop();
199             sink.publishReadTiming(region, column, stopWatch.getTime());
200           }
201         } catch (Exception e) {
202           sink.publishReadFailure(region, column, e);
203         } finally {
204           if (rs != null) {
205             rs.close();
206           }
207           scan = null;
208           get = null;
209           startKey = null;
210         }
211       }
212       try {
213         table.close();
214       } catch (IOException e) {
215       }
216       return null;
217     }
218   }
219 
220   /**
221    * Get one row from a region on the regionserver and outputs the latency, or the failure.
222    */
223   static class RegionServerTask implements Callable<Void> {
224     private Connection connection;
225     private String serverName;
226     private HRegionInfo region;
227     private ExtendedSink sink;
228 
229     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
230         ExtendedSink sink) {
231       this.connection = connection;
232       this.serverName = serverName;
233       this.region = region;
234       this.sink = sink;
235     }
236 
237     @Override
238     public Void call() {
239       TableName tableName = null;
240       Table table = null;
241       Get get = null;
242       byte[] startKey = null;
243       Scan scan = null;
244       StopWatch stopWatch = new StopWatch();
245       // monitor one region on every region server
246       stopWatch.reset();
247       try {
248         tableName = region.getTable();
249         table = connection.getTable(tableName);
250         startKey = region.getStartKey();
251         // Can't do a get on empty start row so do a Scan of first element if any instead.
252         if (startKey.length > 0) {
253           get = new Get(startKey);
254           get.setCacheBlocks(false);
255           get.setFilter(new FirstKeyOnlyFilter());
256           stopWatch.start();
257           table.get(get);
258           stopWatch.stop();
259         } else {
260           scan = new Scan();
261           scan.setCacheBlocks(false);
262           scan.setFilter(new FirstKeyOnlyFilter());
263           scan.setCaching(1);
264           scan.setMaxResultSize(1L);
265           stopWatch.start();
266           ResultScanner s = table.getScanner(scan);
267           s.close();
268           stopWatch.stop();
269         }
270         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
271       } catch (TableNotFoundException tnfe) {
272         // This is ignored because it doesn't imply that the regionserver is dead
273       } catch (TableNotEnabledException tnee) {
274         // This is considered a success since we got a response.
275         LOG.debug("The targeted table was disabled.  Assuming success.");
276       } catch (DoNotRetryIOException dnrioe) {
277         sink.publishReadFailure(tableName.getNameAsString(), serverName);
278         LOG.error(dnrioe);
279       } catch (IOException e) {
280         sink.publishReadFailure(tableName.getNameAsString(), serverName);
281         LOG.error(e);
282       } finally {
283         if (table != null) {
284           try {
285             table.close();
286           } catch (IOException e) {/* DO NOTHING */
287           }
288         }
289         scan = null;
290         get = null;
291         startKey = null;
292       }
293       return null;
294     }
295   }
296 
297   private static final int USAGE_EXIT_CODE = 1;
298   private static final int INIT_ERROR_EXIT_CODE = 2;
299   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
300   private static final int ERROR_EXIT_CODE = 4;
301 
302   private static final long DEFAULT_INTERVAL = 6000;
303 
304   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
305 
306   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
307 
308   private static final Log LOG = LogFactory.getLog(Canary.class);
309 
310   private Configuration conf = null;
311   private long interval = 0;
312   private Sink sink = null;
313 
314   private boolean useRegExp;
315   private long timeout = DEFAULT_TIMEOUT;
316   private boolean failOnError = true;
317   private boolean regionServerMode = false;
318   private ExecutorService executor; // threads to retrieve data from regionservers
319 
/**
 * Creates a canary backed by a {@link ScheduledThreadPoolExecutor} with one core thread and a
 * {@link RegionServerStdOutSink}.
 */
public Canary() {
  this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
}

/**
 * Creates a canary that runs its tasks on {@code executor} and publishes results to
 * {@code sink}.
 */
public Canary(ExecutorService executor, Sink sink) {
  this.sink = sink;
  this.executor = executor;
}
328 
329   @Override
330   public Configuration getConf() {
331     return conf;
332   }
333 
334   @Override
335   public void setConf(Configuration conf) {
336     this.conf = conf;
337   }
338 
339   @Override
340   public int run(String[] args) throws Exception {
341     int index = -1;
342     ChoreService choreService = null;
343 
344     // Process command line args
345     for (int i = 0; i < args.length; i++) {
346       String cmd = args[i];
347 
348       if (cmd.startsWith("-")) {
349         if (index >= 0) {
350           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
351           System.err.println("Invalid command line options");
352           printUsageAndExit();
353         }
354 
355         if (cmd.equals("-help")) {
356           // user asked for help, print the help and quit.
357           printUsageAndExit();
358         } else if (cmd.equals("-daemon") && interval == 0) {
359           // user asked for daemon mode, set a default interval between checks
360           interval = DEFAULT_INTERVAL;
361         } else if (cmd.equals("-interval")) {
362           // user has specified an interval for canary breaths (-interval N)
363           i++;
364 
365           if (i == args.length) {
366             System.err.println("-interval needs a numeric value argument.");
367             printUsageAndExit();
368           }
369 
370           try {
371             interval = Long.parseLong(args[i]) * 1000;
372           } catch (NumberFormatException e) {
373             System.err.println("-interval needs a numeric value argument.");
374             printUsageAndExit();
375           }
376         } else if(cmd.equals("-regionserver")) {
377           this.regionServerMode = true;
378         } else if (cmd.equals("-e")) {
379           this.useRegExp = true;
380         } else if (cmd.equals("-t")) {
381           i++;
382 
383           if (i == args.length) {
384             System.err.println("-t needs a numeric value argument.");
385             printUsageAndExit();
386           }
387 
388           try {
389             this.timeout = Long.parseLong(args[i]);
390           } catch (NumberFormatException e) {
391             System.err.println("-t needs a numeric value argument.");
392             printUsageAndExit();
393           }
394 
395         } else if (cmd.equals("-f")) {
396           i++;
397 
398           if (i == args.length) {
399             System.err
400                 .println("-f needs a boolean value argument (true|false).");
401             printUsageAndExit();
402           }
403 
404           this.failOnError = Boolean.parseBoolean(args[i]);
405         } else {
406           // no options match
407           System.err.println(cmd + " options is invalid.");
408           printUsageAndExit();
409         }
410       } else if (index < 0) {
411         // keep track of first table name specified by the user
412         index = i;
413       }
414     }
415 
416     // Launches chore for refreshing kerberos credentials if security is enabled.
417     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
418     // for more details.
419     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
420     if (authChore != null) {
421       choreService = new ChoreService("CANARY_TOOL");
422       choreService.scheduleChore(authChore);
423     }
424 
425     // Start to prepare the stuffs
426     Monitor monitor = null;
427     Thread monitorThread = null;
428     long startTime = 0;
429     long currentTimeLength = 0;
430     // Get a connection to use in below.
431     // try-with-resources jdk7 construct. See
432     // http://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
433     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
434       do {
435         // Do monitor !!
436         try {
437           monitor = this.newMonitor(connection, index, args);
438           monitorThread = new Thread(monitor);
439           startTime = System.currentTimeMillis();
440           monitorThread.start();
441           while (!monitor.isDone()) {
442             // wait for 1 sec
443             Thread.sleep(1000);
444             // exit if any error occurs
445             if (this.failOnError && monitor.hasError()) {
446               monitorThread.interrupt();
447               if (monitor.initialized) {
448                 System.exit(monitor.errorCode);
449               } else {
450                 System.exit(INIT_ERROR_EXIT_CODE);
451               }
452             }
453             currentTimeLength = System.currentTimeMillis() - startTime;
454             if (currentTimeLength > this.timeout) {
455               LOG.error("The monitor is running too long (" + currentTimeLength
456                   + ") after timeout limit:" + this.timeout
457                   + " will be killed itself !!");
458               if (monitor.initialized) {
459                 System.exit(TIMEOUT_ERROR_EXIT_CODE);
460               } else {
461                 System.exit(INIT_ERROR_EXIT_CODE);
462               }
463               break;
464             }
465           }
466 
467           if (this.failOnError && monitor.hasError()) {
468             monitorThread.interrupt();
469             System.exit(monitor.errorCode);
470           }
471         } finally {
472           if (monitor != null) monitor.close();
473         }
474 
475         Thread.sleep(interval);
476       } while (interval > 0);
477     } // try-with-resources close
478 
479     if (choreService != null) {
480       choreService.shutdown();
481     }
482     return(monitor.errorCode);
483   }
484 
485   private void printUsageAndExit() {
486     System.err.printf(
487       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
488         getClass().getName());
489     System.err.println(" where [opts] are:");
490     System.err.println("   -help          Show this help and exit.");
491     System.err.println("   -regionserver  replace the table argument to regionserver,");
492     System.err.println("      which means to enable regionserver mode");
493     System.err.println("   -daemon        Continuous check at defined intervals.");
494     System.err.println("   -interval <N>  Interval between checks (sec)");
495     System.err.println("   -e             Use region/regionserver as regular expression");
496     System.err.println("      which means the region/regionserver is regular expression pattern");
497     System.err.println("   -f <B>         stop whole program if first error occurs," +
498         " default is true");
499     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
500     System.exit(USAGE_EXIT_CODE);
501   }
502 
503   /**
504    * A Factory method for {@link Monitor}.
505    * Can be overridden by user.
506    * @param index a start index for monitor target
507    * @param args args passed from user
508    * @return a Monitor instance
509    */
510   public Monitor newMonitor(final Connection connection, int index, String[] args) {
511     Monitor monitor = null;
512     String[] monitorTargets = null;
513 
514     if(index >= 0) {
515       int length = args.length - index;
516       monitorTargets = new String[length];
517       System.arraycopy(args, index, monitorTargets, 0, length);
518     }
519 
520     if (this.regionServerMode) {
521       monitor =
522           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
523               (ExtendedSink) this.sink, this.executor);
524     } else {
525       monitor =
526           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor);
527     }
528     return monitor;
529   }
530 
531   // a Monitor super-class can be extended by users
532   public static abstract class Monitor implements Runnable, Closeable {
533 
534     protected Connection connection;
535     protected Admin admin;
536     protected String[] targets;
537     protected boolean useRegExp;
538     protected boolean initialized = false;
539 
540     protected boolean done = false;
541     protected int errorCode = 0;
542     protected Sink sink;
543     protected ExecutorService executor;
544 
545     public boolean isDone() {
546       return done;
547     }
548 
549     public boolean hasError() {
550       return errorCode != 0;
551     }
552 
553     @Override
554     public void close() throws IOException {
555       if (this.admin != null) this.admin.close();
556     }
557 
558     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
559         ExecutorService executor) {
560       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
561 
562       this.connection = connection;
563       this.targets = monitorTargets;
564       this.useRegExp = useRegExp;
565       this.sink = sink;
566       this.executor = executor;
567     }
568 
569     public abstract void run();
570 
571     protected boolean initAdmin() {
572       if (null == this.admin) {
573         try {
574           this.admin = this.connection.getAdmin();
575         } catch (Exception e) {
576           LOG.error("Initial HBaseAdmin failed...", e);
577           this.errorCode = INIT_ERROR_EXIT_CODE;
578         }
579       } else if (admin.isAborted()) {
580         LOG.error("HBaseAdmin aborted");
581         this.errorCode = INIT_ERROR_EXIT_CODE;
582       }
583       return !this.hasError();
584     }
585   }
586 
587   // a monitor for region mode
588   private static class RegionMonitor extends Monitor {
589 
590     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
591         Sink sink, ExecutorService executor) {
592       super(connection, monitorTargets, useRegExp, sink, executor);
593     }
594 
595     @Override
596     public void run() {
597       if (this.initAdmin()) {
598         try {
599           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
600           if (this.targets != null && this.targets.length > 0) {
601             String[] tables = generateMonitorTables(this.targets);
602             this.initialized = true;
603             for (String table : tables) {
604               taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
605             }
606           } else {
607             taskFutures.addAll(sniff());
608           }
609           for (Future<Void> future : taskFutures) {
610             try {
611               future.get();
612             } catch (ExecutionException e) {
613               LOG.error("Sniff region failed!", e);
614             }
615           }
616         } catch (Exception e) {
617           LOG.error("Run regionMonitor failed", e);
618           this.errorCode = ERROR_EXIT_CODE;
619         }
620       }
621       this.done = true;
622     }
623 
624     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
625       String[] returnTables = null;
626 
627       if (this.useRegExp) {
628         Pattern pattern = null;
629         HTableDescriptor[] tds = null;
630         Set<String> tmpTables = new TreeSet<String>();
631         try {
632           for (String monitorTarget : monitorTargets) {
633             pattern = Pattern.compile(monitorTarget);
634             tds = this.admin.listTables(pattern);
635             if (tds != null) {
636               for (HTableDescriptor td : tds) {
637                 tmpTables.add(td.getNameAsString());
638               }
639             }
640           }
641         } catch (IOException e) {
642           LOG.error("Communicate with admin failed", e);
643           throw e;
644         }
645 
646         if (tmpTables.size() > 0) {
647           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
648         } else {
649           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
650           LOG.error(msg);
651           this.errorCode = INIT_ERROR_EXIT_CODE;
652           throw new TableNotFoundException(msg);
653         }
654       } else {
655         returnTables = monitorTargets;
656       }
657 
658       return returnTables;
659     }
660 
661     /*
662      * canary entry point to monitor all the tables.
663      */
664     private List<Future<Void>> sniff() throws Exception {
665       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
666       for (HTableDescriptor table : admin.listTables()) {
667         if (admin.isTableEnabled(table.getTableName())) {
668           taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
669         }
670       }
671       return taskFutures;
672     }
673   }
674 
675   /**
676    * Canary entry point for specified table.
677    * @throws Exception
678    */
679   public static void sniff(final Admin admin, TableName tableName) throws Exception {
680     List<Future<Void>> taskFutures =
681         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
682           new ScheduledThreadPoolExecutor(1));
683     for (Future<Void> future : taskFutures) {
684       future.get();
685     }
686   }
687 
688   /**
689    * Canary entry point for specified table.
690    * @throws Exception
691    */
692   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
693       ExecutorService executor) throws Exception {
694     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
695       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
696         executor);
697     } else {
698       LOG.warn(String.format("Table %s is not enabled", tableName));
699     }
700     return new LinkedList<Future<Void>>();
701   }
702 
703   /*
704    * Loops over regions that owns this table, and output some information abouts the state.
705    */
706   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
707       HTableDescriptor tableDesc, ExecutorService executor) throws Exception {
708     Table table = null;
709     try {
710       table = admin.getConnection().getTable(tableDesc.getTableName());
711     } catch (TableNotFoundException e) {
712       return new ArrayList<Future<Void>>();
713     }
714     List<RegionTask> tasks = new ArrayList<RegionTask>();
715     try {
716       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
717         tasks.add(new RegionTask(admin.getConnection(), region, sink));
718       }
719     } finally {
720       table.close();
721     }
722     return executor.invokeAll(tasks);
723   }
724   // a monitor for regionserver mode
725   private static class RegionServerMonitor extends Monitor {
726 
727     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
728         ExtendedSink sink, ExecutorService executor) {
729       super(connection, monitorTargets, useRegExp, sink, executor);
730     }
731 
732     private ExtendedSink getSink() {
733       return (ExtendedSink) this.sink;
734     }
735 
736     @Override
737     public void run() {
738       if (this.initAdmin() && this.checkNoTableNames()) {
739         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
740         this.initialized = true;
741         this.monitorRegionServers(rsAndRMap);
742       }
743       this.done = true;
744     }
745 
746     private boolean checkNoTableNames() {
747       List<String> foundTableNames = new ArrayList<String>();
748       TableName[] tableNames = null;
749 
750       try {
751         tableNames = this.admin.listTableNames();
752       } catch (IOException e) {
753         LOG.error("Get listTableNames failed", e);
754         this.errorCode = INIT_ERROR_EXIT_CODE;
755         return false;
756       }
757 
758       if (this.targets == null || this.targets.length == 0) return true;
759 
760       for (String target : this.targets) {
761         for (TableName tableName : tableNames) {
762           if (target.equals(tableName.getNameAsString())) {
763             foundTableNames.add(target);
764           }
765         }
766       }
767 
768       if (foundTableNames.size() > 0) {
769         System.err.println("Cannot pass a tablename when using the -regionserver " +
770             "option, tablenames:" + foundTableNames.toString());
771         this.errorCode = USAGE_EXIT_CODE;
772       }
773       return foundTableNames.size() == 0;
774     }
775 
776     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
777       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
778       Random rand =new Random();
779       // monitor one region on every region server
780       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
781         String serverName = entry.getKey();
782         // random select a region
783         HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
784         tasks.add(new RegionServerTask(this.connection, serverName, region, getSink()));
785       }
786       try {
787         for (Future<Void> future : this.executor.invokeAll(tasks)) {
788           try {
789             future.get();
790           } catch (ExecutionException e) {
791             LOG.error("Sniff regionserver failed!", e);
792             this.errorCode = ERROR_EXIT_CODE;
793           }
794         }
795       } catch (InterruptedException e) {
796         this.errorCode = ERROR_EXIT_CODE;
797         LOG.error("Sniff regionserver failed!", e);
798       }
799     }
800 
801     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
802       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
803       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
804       return regionServerAndRegionsMap;
805     }
806 
807     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
808       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
809       Table table = null;
810       RegionLocator regionLocator = null;
811       try {
812         HTableDescriptor[] tableDescs = this.admin.listTables();
813         List<HRegionInfo> regions = null;
814         for (HTableDescriptor tableDesc : tableDescs) {
815           table = this.admin.getConnection().getTable(tableDesc.getTableName());
816           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
817 
818           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
819             ServerName rs = location.getServerName();
820             String rsName = rs.getHostname();
821             HRegionInfo r = location.getRegionInfo();
822 
823             if (rsAndRMap.containsKey(rsName)) {
824               regions = rsAndRMap.get(rsName);
825             } else {
826               regions = new ArrayList<HRegionInfo>();
827               rsAndRMap.put(rsName, regions);
828             }
829             regions.add(r);
830           }
831           table.close();
832         }
833 
834       } catch (IOException e) {
835         String msg = "Get HTables info failed";
836         LOG.error(msg, e);
837         this.errorCode = INIT_ERROR_EXIT_CODE;
838       } finally {
839         if (table != null) {
840           try {
841             table.close();
842           } catch (IOException e) {
843             LOG.warn("Close table failed", e);
844           }
845         }
846       }
847 
848       return rsAndRMap;
849     }
850 
851     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
852         Map<String, List<HRegionInfo>> fullRsAndRMap) {
853 
854       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
855 
856       if (this.targets != null && this.targets.length > 0) {
857         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
858         Pattern pattern = null;
859         Matcher matcher = null;
860         boolean regExpFound = false;
861         for (String rsName : this.targets) {
862           if (this.useRegExp) {
863             regExpFound = false;
864             pattern = Pattern.compile(rsName);
865             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
866               matcher = pattern.matcher(entry.getKey());
867               if (matcher.matches()) {
868                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
869                 regExpFound = true;
870               }
871             }
872             if (!regExpFound) {
873               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
874             }
875           } else {
876             if (fullRsAndRMap.containsKey(rsName)) {
877               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
878             } else {
879               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
880             }
881           }
882         }
883       } else {
884         filteredRsAndRMap = fullRsAndRMap;
885       }
886       return filteredRsAndRMap;
887     }
888   }
889 
890   public static void main(String[] args) throws Exception {
891     final Configuration conf = HBaseConfiguration.create();
892     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
893     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
894 
895     Class<? extends Sink> sinkClass =
896         conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class);
897     Sink sink = ReflectionUtils.newInstance(sinkClass);
898 
899     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
900     executor.shutdown();
901     System.exit(exitCode);
902   }
903 }