View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
23  import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
24
25  import com.google.common.collect.Lists;
26
27  import java.io.Closeable;
28  import java.io.IOException;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.LinkedList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Random;
39  import java.util.Set;
40  import java.util.TreeSet;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.ExecutorService;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.ScheduledThreadPoolExecutor;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.regex.Matcher;
48  import java.util.regex.Pattern;
49
50  import org.apache.commons.lang.time.StopWatch;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.AuthUtil;
55  import org.apache.hadoop.hbase.ChoreService;
56  import org.apache.hadoop.hbase.ClusterStatus;
57  import org.apache.hadoop.hbase.DoNotRetryIOException;
58  import org.apache.hadoop.hbase.HBaseConfiguration;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HRegionLocation;
63  import org.apache.hadoop.hbase.HTableDescriptor;
64  import org.apache.hadoop.hbase.MetaTableAccessor;
65  import org.apache.hadoop.hbase.NamespaceDescriptor;
66  import org.apache.hadoop.hbase.ScheduledChore;
67  import org.apache.hadoop.hbase.ServerName;
68  import org.apache.hadoop.hbase.TableName;
69  import org.apache.hadoop.hbase.TableNotEnabledException;
70  import org.apache.hadoop.hbase.TableNotFoundException;
71  import org.apache.hadoop.hbase.client.Admin;
72  import org.apache.hadoop.hbase.client.Connection;
73  import org.apache.hadoop.hbase.client.ConnectionFactory;
74  import org.apache.hadoop.hbase.client.Get;
75  import org.apache.hadoop.hbase.client.Put;
76  import org.apache.hadoop.hbase.client.RegionLocator;
77  import org.apache.hadoop.hbase.client.ResultScanner;
78  import org.apache.hadoop.hbase.client.Scan;
79  import org.apache.hadoop.hbase.client.Table;
80  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
81  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
82  import org.apache.hadoop.hbase.util.Bytes;
83  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
84  import org.apache.hadoop.hbase.util.Pair;
85  import org.apache.hadoop.hbase.util.ReflectionUtils;
86  import org.apache.hadoop.hbase.util.RegionSplitter;
87  import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
88  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
89  import org.apache.hadoop.util.GenericOptionsParser;
90  import org.apache.hadoop.util.Tool;
91  import org.apache.hadoop.util.ToolRunner;
92  import org.apache.zookeeper.KeeperException;
93  import org.apache.zookeeper.ZooKeeper;
94  import org.apache.zookeeper.client.ConnectStringParser;
95  import org.apache.zookeeper.data.Stat;
96
97  /**
 * HBase Canary Tool that can be used to do
 * "canary monitoring" of a running HBase cluster.
 *
 * There are three modes:
 * 1. region mode - For each region, tries to get one row per column family
 * and outputs some information about failure or latency.
 *
 * 2. regionserver mode - For each regionserver, tries to get one row from one table
 * selected randomly and outputs some information about failure or latency.
107  *
108  * 3. zookeeper mode - for each zookeeper instance, selects a zNode and
109  * outputs some information about failure or latency.
110  */
111 public final class Canary implements Tool {
112   // Sink interface used by the canary to outputs information
113   public interface Sink {
114     public long getReadFailureCount();
115     public long incReadFailureCount();
116     public void publishReadFailure(HRegionInfo region, Exception e);
117     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
118     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
119     public long getWriteFailureCount();
120     public void publishWriteFailure(HRegionInfo region, Exception e);
121     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
122     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
123   }
124   // new extended sink for output regionserver mode info
125   // do not change the Sink interface directly due to maintaining the API
126   public interface ExtendedSink extends Sink {
127     public void publishReadFailure(String table, String server);
128     public void publishReadTiming(String table, String server, long msTime);
129   }
130
131   // Simple implementation of canary sink that allows to plot on
132   // file or standard output timings or failures.
133   public static class StdOutSink implements Sink {
134     private AtomicLong readFailureCount = new AtomicLong(0),
135         writeFailureCount = new AtomicLong(0);
136
137     @Override
138     public long getReadFailureCount() {
139       return readFailureCount.get();
140     }
141
142     @Override
143     public long incReadFailureCount() {
144       return readFailureCount.incrementAndGet();
145     }
146
147     @Override
148     public void publishReadFailure(HRegionInfo region, Exception e) {
149       readFailureCount.incrementAndGet();
150       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
151     }
152
153     @Override
154     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
155       readFailureCount.incrementAndGet();
156       LOG.error(String.format("read from region %s column family %s failed",
157                 region.getRegionNameAsString(), column.getNameAsString()), e);
158     }
159
160     @Override
161     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
162       LOG.info(String.format("read from region %s column family %s in %dms",
163                region.getRegionNameAsString(), column.getNameAsString(), msTime));
164     }
165
166     @Override
167     public long getWriteFailureCount() {
168       return writeFailureCount.get();
169     }
170
171     @Override
172     public void publishWriteFailure(HRegionInfo region, Exception e) {
173       writeFailureCount.incrementAndGet();
174       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
175     }
176
177     @Override
178     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
179       writeFailureCount.incrementAndGet();
180       LOG.error(String.format("write to region %s column family %s failed",
181         region.getRegionNameAsString(), column.getNameAsString()), e);
182     }
183
184     @Override
185     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
186       LOG.info(String.format("write to region %s column family %s in %dms",
187         region.getRegionNameAsString(), column.getNameAsString(), msTime));
188     }
189   }
190   // a ExtendedSink implementation
191   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
192
193     @Override
194     public void publishReadFailure(String table, String server) {
195       incReadFailureCount();
196       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
197     }
198
199     @Override
200     public void publishReadTiming(String table, String server, long msTime) {
201       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
202           table, server, msTime));
203     }
204   }
205
206   public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
207     @Override public void publishReadFailure(String zNode, String server) {
208       incReadFailureCount();
209       LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
210     }
211
212     @Override public void publishReadTiming(String znode, String server, long msTime) {
213       LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
214           znode, server, msTime));
215     }
216   }
217
218   static class ZookeeperTask implements Callable<Void> {
219     private final Connection connection;
220     private final String host;
221     private String znode;
222     private final int timeout;
223     private ZookeeperStdOutSink sink;
224
225     public ZookeeperTask(Connection connection, String host, String znode, int timeout,
226         ZookeeperStdOutSink sink) {
227       this.connection = connection;
228       this.host = host;
229       this.znode = znode;
230       this.timeout = timeout;
231       this.sink = sink;
232     }
233
234     @Override public Void call() throws Exception {
235       ZooKeeper zooKeeper = null;
236       try {
237         zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
238         Stat exists = zooKeeper.exists(znode, false);
239         StopWatch stopwatch = new StopWatch();
240         stopwatch.start();
241         zooKeeper.getData(znode, false, exists);
242         stopwatch.stop();
243         sink.publishReadTiming(znode, host, stopwatch.getTime());
244       } catch (KeeperException | InterruptedException e) {
245         sink.publishReadFailure(znode, host);
246       } finally {
247         if (zooKeeper != null) {
248           zooKeeper.close();
249         }
250       }
251       return null;
252     }
253   }
254
255   /**
256    * For each column family of the region tries to get one row and outputs the latency, or the
257    * failure.
258    */
259   static class RegionTask implements Callable<Void> {
260     public enum TaskType{
261       READ, WRITE
262     }
263     private Connection connection;
264     private HRegionInfo region;
265     private Sink sink;
266     private TaskType taskType;
267     private boolean rawScanEnabled;
268
269     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType,
270         boolean rawScanEnabled) {
271       this.connection = connection;
272       this.region = region;
273       this.sink = sink;
274       this.taskType = taskType;
275       this.rawScanEnabled = rawScanEnabled;
276     }
277
278     @Override
279     public Void call() {
280       switch (taskType) {
281       case READ:
282         return read();
283       case WRITE:
284         return write();
285       default:
286         return read();
287       }
288     }
289
290     public Void read() {
291       Table table = null;
292       HTableDescriptor tableDesc = null;
293       try {
294         if (LOG.isDebugEnabled()) {
295           LOG.debug(String.format("reading table descriptor for table %s",
296             region.getTable()));
297         }
298         table = connection.getTable(region.getTable());
299         tableDesc = table.getTableDescriptor();
300       } catch (IOException e) {
301         LOG.debug("sniffRegion failed", e);
302         sink.publishReadFailure(region, e);
303         if (table != null) {
304           try {
305             table.close();
306           } catch (IOException ioe) {
307             LOG.error("Close table failed", e);
308           }
309         }
310         return null;
311       }
312
313       byte[] startKey = null;
314       Get get = null;
315       Scan scan = null;
316       ResultScanner rs = null;
317       StopWatch stopWatch = new StopWatch();
318       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
319         stopWatch.reset();
320         startKey = region.getStartKey();
321         // Can't do a get on empty start row so do a Scan of first element if any instead.
322         if (startKey.length > 0) {
323           get = new Get(startKey);
324           get.setCacheBlocks(false);
325           get.setFilter(new FirstKeyOnlyFilter());
326           get.addFamily(column.getName());
327         } else {
328           scan = new Scan();
329           if (LOG.isDebugEnabled()) {
330             LOG.debug(String.format("rawScan : %s for table: %s", rawScanEnabled,
331               tableDesc.getTableName()));
332           }
333           scan.setRaw(rawScanEnabled);
334           scan.setCaching(1);
335           scan.setCacheBlocks(false);
336           scan.setFilter(new FirstKeyOnlyFilter());
337           scan.addFamily(column.getName());
338           scan.setMaxResultSize(1L);
339           scan.setSmall(true);
340         }
341
342         if (LOG.isDebugEnabled()) {
343           LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
344             tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
345             Bytes.toStringBinary(startKey)));
346         }
347         try {
348           stopWatch.start();
349           if (startKey.length > 0) {
350             table.get(get);
351           } else {
352             rs = table.getScanner(scan);
353             rs.next();
354           }
355           stopWatch.stop();
356           sink.publishReadTiming(region, column, stopWatch.getTime());
357         } catch (Exception e) {
358           sink.publishReadFailure(region, column, e);
359         } finally {
360           if (rs != null) {
361             rs.close();
362           }
363           scan = null;
364           get = null;
365           startKey = null;
366         }
367       }
368       try {
369         table.close();
370       } catch (IOException e) {
371         LOG.error("Close table failed", e);
372       }
373       return null;
374     }
375
376     /**
377      * Check writes for the canary table
378      * @return
379      */
380     private Void write() {
381       Table table = null;
382       HTableDescriptor tableDesc = null;
383       try {
384         table = connection.getTable(region.getTable());
385         tableDesc = table.getTableDescriptor();
386         byte[] rowToCheck = region.getStartKey();
387         if (rowToCheck.length == 0) {
388           rowToCheck = new byte[]{0x0};
389         }
390         int writeValueSize =
391             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
392         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
393           Put put = new Put(rowToCheck);
394           byte[] value = new byte[writeValueSize];
395           Bytes.random(value);
396           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
397
398           if (LOG.isDebugEnabled()) {
399             LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
400               tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
401               Bytes.toStringBinary(rowToCheck)));
402           }
403           try {
404             long startTime = System.currentTimeMillis();
405             table.put(put);
406             long time = System.currentTimeMillis() - startTime;
407             sink.publishWriteTiming(region, column, time);
408           } catch (Exception e) {
409             sink.publishWriteFailure(region, column, e);
410           }
411         }
412         table.close();
413       } catch (IOException e) {
414         sink.publishWriteFailure(region, e);
415       }
416       return null;
417     }
418   }
419
420   /**
421    * Get one row from a region on the regionserver and outputs the latency, or the failure.
422    */
423   static class RegionServerTask implements Callable<Void> {
424     private Connection connection;
425     private String serverName;
426     private HRegionInfo region;
427     private ExtendedSink sink;
428     private AtomicLong successes;
429
430     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
431         ExtendedSink sink, AtomicLong successes) {
432       this.connection = connection;
433       this.serverName = serverName;
434       this.region = region;
435       this.sink = sink;
436       this.successes = successes;
437     }
438
439     @Override
440     public Void call() {
441       TableName tableName = null;
442       Table table = null;
443       Get get = null;
444       byte[] startKey = null;
445       Scan scan = null;
446       StopWatch stopWatch = new StopWatch();
447       // monitor one region on every region server
448       stopWatch.reset();
449       try {
450         tableName = region.getTable();
451         table = connection.getTable(tableName);
452         startKey = region.getStartKey();
453         // Can't do a get on empty start row so do a Scan of first element if any instead.
454         if (LOG.isDebugEnabled()) {
455           LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
456             serverName, region.getTable(), region.getRegionNameAsString(),
457             Bytes.toStringBinary(startKey)));
458         }
459         if (startKey.length > 0) {
460           get = new Get(startKey);
461           get.setCacheBlocks(false);
462           get.setFilter(new FirstKeyOnlyFilter());
463           stopWatch.start();
464           table.get(get);
465           stopWatch.stop();
466         } else {
467           scan = new Scan();
468           scan.setCacheBlocks(false);
469           scan.setFilter(new FirstKeyOnlyFilter());
470           scan.setCaching(1);
471           scan.setMaxResultSize(1L);
472           scan.setSmall(true);
473           stopWatch.start();
474           ResultScanner s = table.getScanner(scan);
475           s.next();
476           s.close();
477           stopWatch.stop();
478         }
479         successes.incrementAndGet();
480         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
481       } catch (TableNotFoundException tnfe) {
482         LOG.error("Table may be deleted", tnfe);
483         // This is ignored because it doesn't imply that the regionserver is dead
484       } catch (TableNotEnabledException tnee) {
485         // This is considered a success since we got a response.
486         successes.incrementAndGet();
487         LOG.debug("The targeted table was disabled.  Assuming success.");
488       } catch (DoNotRetryIOException dnrioe) {
489         sink.publishReadFailure(tableName.getNameAsString(), serverName);
490         LOG.error(dnrioe);
491       } catch (IOException e) {
492         sink.publishReadFailure(tableName.getNameAsString(), serverName);
493         LOG.error(e);
494       } finally {
495         if (table != null) {
496           try {
497             table.close();
498           } catch (IOException e) {/* DO NOTHING */
499             LOG.error("Close table failed", e);
500           }
501         }
502         scan = null;
503         get = null;
504         startKey = null;
505       }
506       return null;
507     }
508   }
509
  // Exit codes returned by run():
  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  // Default pause between checks in daemon mode, in milliseconds.
  private static final long DEFAULT_INTERVAL = 6000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  private static final Log LOG = LogFactory.getLog(Canary.class);

  // Default table targeted by write sniffing: hbase:canary.
  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  private long interval = 0; // pause between checks in ms; 0 means run exactly once
  private Sink sink = null;

  private boolean useRegExp;                      // -e: treat targets as regex patterns
  private long timeout = DEFAULT_TIMEOUT;         // -t: per-check timeout, milliseconds
  private boolean failOnError = true;             // -f: stop on first error
  private boolean regionServerMode = false;       // -regionserver
  private boolean zookeeperMode = false;          // -zookeeper
  private boolean regionServerAllRegions = false; // -allRegions
  private boolean writeSniffing = false;          // -writeSniffing
  private boolean treatFailureAsError = false;    // -treatFailureAsError
  private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; // -writeTable

  private ExecutorService executor; // threads to retrieve data from regionservers
543
  /** Creates a canary with a single-threaded executor and a stdout sink. */
  public Canary() {
    this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
  }

  /**
   * Creates a canary.
   * @param executor pool used to run the per-region / per-server probe tasks
   * @param sink destination for timings and failures
   */
  public Canary(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }
552
  /** @return the configuration used by this tool */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /** Sets the configuration used by this tool. */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
562
  /**
   * Parses the command line, setting the corresponding mode/option fields on
   * this instance.
   *
   * @param args the raw command line, of the form: [opts] [table 1 [table 2 ...]]
   * @return the index in {@code args} of the first monitor target (table or
   *         regionserver name), or -1 if no targets were given; does not
   *         return on invalid input (prints usage and exits)
   */
  private int parseArgs(String[] args) {
    int index = -1;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];

      if (cmd.startsWith("-")) {
        if (index >= 0) {
          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
          System.err.println("Invalid command line options");
          printUsageAndExit();
        }

        if (cmd.equals("-help")) {
          // user asked for help, print the help and quit.
          printUsageAndExit();
        } else if (cmd.equals("-daemon") && interval == 0) {
          // user asked for daemon mode, set a default interval between checks
          interval = DEFAULT_INTERVAL;
        } else if (cmd.equals("-interval")) {
          // user has specified an interval for canary breaths (-interval N)
          i++;

          if (i == args.length) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }

          try {
            // given in seconds on the command line; stored in milliseconds
            interval = Long.parseLong(args[i]) * 1000;
          } catch (NumberFormatException e) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-zookeeper")) {
          this.zookeeperMode = true;
        } else if(cmd.equals("-regionserver")) {
          this.regionServerMode = true;
        } else if(cmd.equals("-allRegions")) {
          this.regionServerAllRegions = true;
        } else if(cmd.equals("-writeSniffing")) {
          this.writeSniffing = true;
        } else if(cmd.equals("-treatFailureAsError")) {
          this.treatFailureAsError = true;
        } else if (cmd.equals("-e")) {
          this.useRegExp = true;
        } else if (cmd.equals("-t")) {
          i++;

          if (i == args.length) {
            System.err.println("-t needs a numeric value argument.");
            printUsageAndExit();
          }

          try {
            this.timeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-t needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-writeTable")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTable needs a string value argument.");
            printUsageAndExit();
          }
          this.writeTableName = TableName.valueOf(args[i]);
        } else if (cmd.equals("-f")) {
          i++;

          if (i == args.length) {
            System.err
                .println("-f needs a boolean value argument (true|false).");
            printUsageAndExit();
          }

          this.failOnError = Boolean.parseBoolean(args[i]);
        } else {
          // no options match
          System.err.println(cmd + " options is invalid.");
          printUsageAndExit();
        }
      } else if (index < 0) {
        // keep track of first table name specified by the user
        index = i;
      }
    }
    // cross-option validation
    if (this.regionServerAllRegions && !this.regionServerMode) {
      System.err.println("-allRegions can only be specified in regionserver mode.");
      printUsageAndExit();
    }
    if (this.zookeeperMode) {
      if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
        System.err.println("-zookeeper is exclusive and cannot be combined with "
            + "other modes.");
        printUsageAndExit();
      }
    }
    return index;
  }
664
665   @Override
666   public int run(String[] args) throws Exception {
667     int index = parseArgs(args);
668     ChoreService choreService = null;
669
670     // Launches chore for refreshing kerberos credentials if security is enabled.
671     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
672     // for more details.
673     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
674     if (authChore != null) {
675       choreService = new ChoreService("CANARY_TOOL");
676       choreService.scheduleChore(authChore);
677     }
678
679     // Start to prepare the stuffs
680     Monitor monitor = null;
681     Thread monitorThread = null;
682     long startTime = 0;
683     long currentTimeLength = 0;
684     // Get a connection to use in below.
685     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
686       do {
687         // Do monitor !!
688         try {
689           monitor = this.newMonitor(connection, index, args);
690           monitorThread = new Thread(monitor);
691           startTime = System.currentTimeMillis();
692           monitorThread.start();
693           while (!monitor.isDone()) {
694             // wait for 1 sec
695             Thread.sleep(1000);
696             // exit if any error occurs
697             if (this.failOnError && monitor.hasError()) {
698               monitorThread.interrupt();
699               if (monitor.initialized) {
700                 return monitor.errorCode;
701               } else {
702                 return INIT_ERROR_EXIT_CODE;
703               }
704             }
705             currentTimeLength = System.currentTimeMillis() - startTime;
706             if (currentTimeLength > this.timeout) {
707               LOG.error("The monitor is running too long (" + currentTimeLength
708                   + ") after timeout limit:" + this.timeout
709                   + " will be killed itself !!");
710               if (monitor.initialized) {
711                 return TIMEOUT_ERROR_EXIT_CODE;
712               } else {
713                 return INIT_ERROR_EXIT_CODE;
714               }
715             }
716           }
717
718           if (this.failOnError && monitor.finalCheckForErrors()) {
719             monitorThread.interrupt();
720             return monitor.errorCode;
721           }
722         } finally {
723           if (monitor != null) monitor.close();
724         }
725
726         Thread.sleep(interval);
727       } while (interval > 0);
728     } // try-with-resources close
729
730     if (choreService != null) {
731       choreService.shutdown();
732     }
733     return monitor.errorCode;
734   }
735
736   private void printUsageAndExit() {
737     System.err.printf(
738       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
739         getClass().getName());
740     System.err.println(" where [opts] are:");
741     System.err.println("   -help          Show this help and exit.");
742     System.err.println("   -regionserver  replace the table argument to regionserver,");
743     System.err.println("      which means to enable regionserver mode");
744     System.err.println("   -allRegions    Tries all regions on a regionserver,");
745     System.err.println("      only works in regionserver mode.");
746     System.err.println("   -zookeeper    Tries to grab zookeeper.znode.parent ");
747     System.err.println("      on each zookeeper instance");
748     System.err.println("   -daemon        Continuous check at defined intervals.");
749     System.err.println("   -interval <N>  Interval between checks (sec)");
750     System.err.println("   -e             Use table/regionserver as regular expression");
751     System.err.println("      which means the table/regionserver is regular expression pattern");
752     System.err.println("   -f <B>         stop whole program if first error occurs," +
753         " default is true");
754     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
755     System.err.println("   -writeSniffing enable the write sniffing in canary");
756     System.err.println("   -treatFailureAsError treats read / write failure as error");
757     System.err.println("   -writeTable    The table used for write sniffing."
758         + " Default is hbase:canary");
759     System.err.println("   -Dhbase.canary.read.raw.enabled=<true/false> Use this flag to enable or disable raw scan during read canary test"
760         + " Default is false and raw is not enabled during scan");
761     System.err
762         .println("   -D<configProperty>=<value> assigning or override the configuration params");
763     System.exit(USAGE_EXIT_CODE);
764   }
765
766   /**
767    * A Factory method for {@link Monitor}.
768    * Can be overridden by user.
769    * @param index a start index for monitor target
770    * @param args args passed from user
771    * @return a Monitor instance
772    */
773   public Monitor newMonitor(final Connection connection, int index, String[] args) {
774     Monitor monitor = null;
775     String[] monitorTargets = null;
776
777     if(index >= 0) {
778       int length = args.length - index;
779       monitorTargets = new String[length];
780       System.arraycopy(args, index, monitorTargets, 0, length);
781     }
782
783     if (this.regionServerMode) {
784       monitor =
785           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
786               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
787               this.treatFailureAsError);
788     } else if (this.zookeeperMode) {
789       monitor =
790           new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
791               (ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
792     } else {
793       monitor =
794           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
795               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
796     }
797     return monitor;
798   }
799
800   // a Monitor super-class can be extended by users
801   public static abstract class Monitor implements Runnable, Closeable {
802
803     protected Connection connection;
804     protected Admin admin;
805     protected String[] targets;
806     protected boolean useRegExp;
807     protected boolean treatFailureAsError;
808     protected boolean initialized = false;
809
810     protected boolean done = false;
811     protected int errorCode = 0;
812     protected Sink sink;
813     protected ExecutorService executor;
814
815     public boolean isDone() {
816       return done;
817     }
818
819     public boolean hasError() {
820       return errorCode != 0;
821     }
822
823     public boolean finalCheckForErrors() {
824       if (errorCode != 0) {
825         return true;
826       }
827       if (treatFailureAsError &&
828           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
829         errorCode = FAILURE_EXIT_CODE;
830         return true;
831       }
832       return false;
833     }
834
835     @Override
836     public void close() throws IOException {
837       if (this.admin != null) this.admin.close();
838     }
839
840     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
841         ExecutorService executor, boolean treatFailureAsError) {
842       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
843
844       this.connection = connection;
845       this.targets = monitorTargets;
846       this.useRegExp = useRegExp;
847       this.treatFailureAsError = treatFailureAsError;
848       this.sink = sink;
849       this.executor = executor;
850     }
851
852     @Override
853     public abstract void run();
854
855     protected boolean initAdmin() {
856       if (null == this.admin) {
857         try {
858           this.admin = this.connection.getAdmin();
859         } catch (Exception e) {
860           LOG.error("Initial HBaseAdmin failed...", e);
861           this.errorCode = INIT_ERROR_EXIT_CODE;
862         }
863       } else if (admin.isAborted()) {
864         LOG.error("HBaseAdmin aborted");
865         this.errorCode = INIT_ERROR_EXIT_CODE;
866       }
867       return !this.hasError();
868     }
869   }
870
  // a monitor for region mode: read-sniffs every region of the targeted tables (or of
  // all enabled tables), and optionally write-sniffs a dedicated canary table.
  private static class RegionMonitor extends Monitor {
    // default interval between write-table distribution checks: 10 minutes (ms)
    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // default TTL for data written by the canary: 1 day (seconds)
    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;

    // timestamp (ms) of the last write-table distribution check; -1 means "never ran"
    private long lastCheckTime = -1;
    // when true, also sniff the canary write table with write operations
    private boolean writeSniffing;
    // table used for write sniffing; created / re-split on demand
    private TableName writeTableName;
    // TTL (seconds) applied to the write table's column family
    private int writeDataTTL;
    // acceptable regions-per-regionserver range for the write table
    private float regionsLowerLimit;
    private float regionsUpperLimit;
    // minimum interval (ms) between write-table distribution checks
    private int checkPeriod;
    // when true, read probes use raw scans
    private boolean rawScanEnabled;

    /**
     * @param connection shared cluster connection; also supplies the configuration
     *          the tuning knobs below are read from
     * @param monitorTargets table names (or patterns when {@code useRegExp}) to sniff;
     *          null/empty means all enabled tables
     * @param useRegExp whether monitorTargets are regular expressions
     * @param sink destination for per-region success/failure reporting
     * @param executor pool that runs the per-region tasks
     * @param writeSniffing whether to additionally write-probe {@code writeTableName}
     * @param writeTableName canary write table
     * @param treatFailureAsError whether sink failures flip the final exit code
     */
    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
        boolean treatFailureAsError) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
      Configuration conf = connection.getConfiguration();
      this.writeSniffing = writeSniffing;
      this.writeTableName = writeTableName;
      this.writeDataTTL =
          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
      this.regionsLowerLimit =
          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
      this.regionsUpperLimit =
          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
      this.checkPeriod =
          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
    }

    /**
     * Schedules read sniff tasks for the targeted tables (all enabled tables when no
     * targets were given), optionally write-sniffs the canary table, then waits for
     * every task to finish. Any unexpected exception records ERROR_EXIT_CODE.
     */
    @Override
    public void run() {
      if (this.initAdmin()) {
        try {
          List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
          if (this.targets != null && this.targets.length > 0) {
            String[] tables = generateMonitorTables(this.targets);
            this.initialized = true;
            for (String table : tables) {
              taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ,
                this.rawScanEnabled));
            }
          } else {
            taskFutures.addAll(sniff(TaskType.READ));
          }

          if (writeSniffing) {
            // Re-validate the write table's region distribution at most once per
            // checkPeriod; a failed check is logged but does not abort the run.
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            taskFutures.addAll(Canary.sniff(admin, sink, admin.getTableDescriptor(writeTableName),
              executor, TaskType.WRITE, this.rawScanEnabled));
          }

          // Wait for all sniff tasks. Individual task failures are only logged here;
          // the sink records read/write failures, which finalCheckForErrors inspects.
          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        }
      }
      this.done = true;
    }

    /**
     * Expands the user-supplied targets into concrete table names. Without -e the
     * targets are returned as-is; with -e each target is compiled as a regex and
     * matched against every existing table name.
     * @param monitorTargets raw targets from the command line
     * @return resolved table names
     * @throws IOException if the table list cannot be fetched from the admin
     * @throws TableNotFoundException if the regex targets match no table at all
     */
    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
      String[] returnTables = null;

      if (this.useRegExp) {
        Pattern pattern = null;
        HTableDescriptor[] tds = null;
        Set<String> tmpTables = new TreeSet<String>();
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("reading list of tables"));
          }
          // NOTE: pattern is still null here, so this relies on listTables(null)
          // returning the complete table list; the regex matching happens locally below.
          tds = this.admin.listTables(pattern);
          if (tds == null) {
            tds = new HTableDescriptor[0];
          }
          for (String monitorTarget : monitorTargets) {
            pattern = Pattern.compile(monitorTarget);
            for (HTableDescriptor td : tds) {
              if (pattern.matcher(td.getNameAsString()).matches()) {
                tmpTables.add(td.getNameAsString());
              }
            }
          }
        } catch (IOException e) {
          LOG.error("Communicate with admin failed", e);
          throw e;
        }

        if (tmpTables.size() > 0) {
          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
        } else {
          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
          LOG.error(msg);
          this.errorCode = INIT_ERROR_EXIT_CODE;
          throw new TableNotFoundException(msg);
        }
      } else {
        returnTables = monitorTargets;
      }

      return returnTables;
    }

    /*
     * Canary entry point to monitor all enabled tables. The write table is skipped
     * here because it is handled separately by the write-sniffing path in run().
     */
    private List<Future<Void>> sniff(TaskType taskType) throws Exception {
      if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("reading list of tables"));
      }
      List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
      for (HTableDescriptor table : admin.listTables()) {
        if (admin.isTableEnabled(table.getTableName())
            && (!table.getTableName().equals(writeTableName))) {
          taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled));
        }
      }
      return taskFutures;
    }

    /*
     * Ensures the canary write table exists, is enabled, and carries roughly
     * regionsLowerLimit..regionsUpperLimit regions per (non-master) regionserver.
     * Recreates the table when the region count falls outside that range, and
     * invokes the balancer when some live servers host none of its regions.
     */
    private void checkWriteTableDistribution() throws IOException {
      if (!admin.tableExists(writeTableName)) {
        int numberOfServers = admin.getClusterStatus().getServers().size();
        if (numberOfServers == 0) {
          throw new IllegalStateException("No live regionservers");
        }
        createWriteTable(numberOfServers);
      }

      if (!admin.isTableEnabled(writeTableName)) {
        admin.enableTable(writeTableName);
      }

      ClusterStatus status = admin.getClusterStatus();
      int numberOfServers = status.getServersSize();
      // Don't count the master when it also reports itself as a regionserver.
      if (status.getServers().contains(status.getMaster())) {
        numberOfServers -= 1;
      }

      List<Pair<HRegionInfo, ServerName>> pairs =
          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
      int numberOfRegions = pairs.size();
      // Region count out of the configured per-server range: rebuild the table.
      if (numberOfRegions < numberOfServers * regionsLowerLimit
          || numberOfRegions > numberOfServers * regionsUpperLimit) {
        admin.disableTable(writeTableName);
        admin.deleteTable(writeTableName);
        createWriteTable(numberOfServers);
      }
      // If some live servers host no canary regions, ask the balancer to spread them.
      HashSet<ServerName> serverSet = new HashSet<ServerName>();
      for (Pair<HRegionInfo, ServerName> pair : pairs) {
        serverSet.add(pair.getSecond());
      }
      int numberOfCoveredServers = serverSet.size();
      if (numberOfCoveredServers < numberOfServers) {
        admin.balancer();
      }
    }

    /*
     * Creates the canary write table pre-split into numberOfServers * regionsLowerLimit
     * regions, with a single column family limited to one version and writeDataTTL
     * seconds of retention so canary writes clean themselves up.
     */
    private void createWriteTable(int numberOfServers) throws IOException {
      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
      LOG.info("Number of live regionservers: " + numberOfServers + ", "
          + "pre-splitting the canary table into " + numberOfRegions + " regions "
          + "(current lower limit of regions per server is " + regionsLowerLimit
          + " and you can change it by config: "
          + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
      HTableDescriptor desc = new HTableDescriptor(writeTableName);
      HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
      family.setMaxVersions(1);
      family.setTimeToLive(writeDataTTL);

      desc.addFamily(family);
      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
      admin.createTable(desc, splits);
    }
  }
1066
1067   /**
1068    * Canary entry point for specified table.
1069    * @throws Exception
1070    */
1071   public static void sniff(final Admin admin, TableName tableName, boolean rawScanEnabled)
1072       throws Exception {
1073     sniff(admin, tableName, TaskType.READ, rawScanEnabled);
1074   }
1075
1076   /**
1077    * Canary entry point for specified table.
1078    * Keeping this method backward compatibility
1079    * @throws Exception
1080    */
1081   public static void sniff(final Admin admin, TableName tableName)
1082       throws Exception {
1083     sniff(admin, tableName, TaskType.READ, false);
1084   }
1085
1086   /**
1087    * Canary entry point for specified table with task type(read/write)
1088    * @throws Exception
1089    */
1090   public static void sniff(final Admin admin, TableName tableName, TaskType taskType,
1091       boolean rawScanEnabled)   throws Exception {
1092     List<Future<Void>> taskFutures =
1093         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
1094           new ScheduledThreadPoolExecutor(1), taskType, rawScanEnabled);
1095     for (Future<Void> future : taskFutures) {
1096       future.get();
1097     }
1098   }
1099
1100   /**
1101    * Canary entry point for specified table with task type(read/write)
1102    * Keeping this method backward compatible
1103    * @throws Exception
1104    */
1105   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
1106       throws Exception {
1107     Canary.sniff(admin, tableName, taskType, false);
1108   }
1109
1110   /**
1111    * Canary entry point for specified table.
1112    * @throws Exception
1113    */
1114   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1115       ExecutorService executor, TaskType taskType, boolean rawScanEnabled) throws Exception {
1116     if (LOG.isDebugEnabled()) {
1117       LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
1118         tableName));
1119     }
1120     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1121       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
1122         executor, taskType, rawScanEnabled);
1123     } else {
1124       LOG.warn(String.format("Table %s is not enabled", tableName));
1125     }
1126     return new LinkedList<Future<Void>>();
1127   }
1128
1129   /*
1130    * Loops over regions that owns this table, and output some information abouts the state.
1131    */
1132   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1133       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1134       boolean rawScanEnabled) throws Exception {
1135
1136     if (LOG.isDebugEnabled()) {
1137       LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
1138     }
1139
1140     Table table = null;
1141     try {
1142       table = admin.getConnection().getTable(tableDesc.getTableName());
1143     } catch (TableNotFoundException e) {
1144       return new ArrayList<Future<Void>>();
1145     }
1146     List<RegionTask> tasks = new ArrayList<RegionTask>();
1147     try {
1148       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
1149         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
1150       }
1151     } finally {
1152       table.close();
1153     }
1154     return executor.invokeAll(tasks);
1155   }
1156
1157   //  monitor for zookeeper mode
1158   private static class ZookeeperMonitor extends Monitor {
1159     private List<String> hosts;
1160     private final String znode;
1161     private final int timeout;
1162
1163     protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1164         ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError)  {
1165       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1166       Configuration configuration = connection.getConfiguration();
1167       znode =
1168           configuration.get(ZOOKEEPER_ZNODE_PARENT,
1169               DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1170       timeout = configuration
1171           .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1172       ConnectStringParser parser =
1173           new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1174       hosts = Lists.newArrayList();
1175       for (InetSocketAddress server : parser.getServerAddresses()) {
1176         hosts.add(server.toString());
1177       }
1178     }
1179
1180     @Override public void run() {
1181       List<ZookeeperTask> tasks = Lists.newArrayList();
1182       for (final String host : hosts) {
1183         tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
1184       }
1185       try {
1186         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1187           try {
1188             future.get();
1189           } catch (ExecutionException e) {
1190             LOG.error("Sniff zookeeper failed!", e);
1191             this.errorCode = ERROR_EXIT_CODE;
1192           }
1193         }
1194       } catch (InterruptedException e) {
1195         this.errorCode = ERROR_EXIT_CODE;
1196         Thread.currentThread().interrupt();
1197         LOG.error("Sniff zookeeper interrupted!", e);
1198       }
1199       this.done = true;
1200     }
1201
1202
1203     private ZookeeperStdOutSink getSink() {
1204       if (!(sink instanceof ZookeeperStdOutSink)) {
1205         throw new RuntimeException("Can only write to zookeeper sink");
1206       }
1207       return ((ZookeeperStdOutSink) sink);
1208     }
1209   }
1210
1211
1212   // a monitor for regionserver mode
1213   private static class RegionServerMonitor extends Monitor {
1214
1215     private boolean allRegions;
1216
1217     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1218         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1219         boolean treatFailureAsError) {
1220       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1221       this.allRegions = allRegions;
1222     }
1223
1224     private ExtendedSink getSink() {
1225       return (ExtendedSink) this.sink;
1226     }
1227
1228     @Override
1229     public void run() {
1230       if (this.initAdmin() && this.checkNoTableNames()) {
1231         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1232         this.initialized = true;
1233         this.monitorRegionServers(rsAndRMap);
1234       }
1235       this.done = true;
1236     }
1237
1238     private boolean checkNoTableNames() {
1239       List<String> foundTableNames = new ArrayList<String>();
1240       TableName[] tableNames = null;
1241
1242       if (LOG.isDebugEnabled()) {
1243         LOG.debug(String.format("reading list of tables"));
1244       }
1245       try {
1246         tableNames = this.admin.listTableNames();
1247       } catch (IOException e) {
1248         LOG.error("Get listTableNames failed", e);
1249         this.errorCode = INIT_ERROR_EXIT_CODE;
1250         return false;
1251       }
1252
1253       if (this.targets == null || this.targets.length == 0) return true;
1254
1255       for (String target : this.targets) {
1256         for (TableName tableName : tableNames) {
1257           if (target.equals(tableName.getNameAsString())) {
1258             foundTableNames.add(target);
1259           }
1260         }
1261       }
1262
1263       if (foundTableNames.size() > 0) {
1264         System.err.println("Cannot pass a tablename when using the -regionserver " +
1265             "option, tablenames:" + foundTableNames.toString());
1266         this.errorCode = USAGE_EXIT_CODE;
1267       }
1268       return foundTableNames.size() == 0;
1269     }
1270
1271     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1272       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1273       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1274       Random rand = new Random();
1275       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1276         String serverName = entry.getKey();
1277         AtomicLong successes = new AtomicLong(0);
1278         successMap.put(serverName, successes);
1279         if (entry.getValue().isEmpty()) {
1280           LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
1281         } else if (this.allRegions) {
1282           for (HRegionInfo region : entry.getValue()) {
1283             tasks.add(new RegionServerTask(this.connection,
1284                 serverName,
1285                 region,
1286                 getSink(),
1287                 successes));
1288           }
1289         } else {
1290           // random select a region if flag not set
1291           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1292           tasks.add(new RegionServerTask(this.connection,
1293               serverName,
1294               region,
1295               getSink(),
1296               successes));
1297         }
1298       }
1299       try {
1300         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1301           try {
1302             future.get();
1303           } catch (ExecutionException e) {
1304             LOG.error("Sniff regionserver failed!", e);
1305             this.errorCode = ERROR_EXIT_CODE;
1306           }
1307         }
1308         if (this.allRegions) {
1309           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1310             String serverName = entry.getKey();
1311             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1312                     + entry.getValue().size() + " on regionserver:" + serverName);
1313           }
1314         }
1315       } catch (InterruptedException e) {
1316         this.errorCode = ERROR_EXIT_CODE;
1317         LOG.error("Sniff regionserver interrupted!", e);
1318       }
1319     }
1320
1321     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1322       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1323       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1324       return regionServerAndRegionsMap;
1325     }
1326
1327     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1328       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1329       Table table = null;
1330       RegionLocator regionLocator = null;
1331       try {
1332         if (LOG.isDebugEnabled()) {
1333           LOG.debug(String.format("reading list of tables and locations"));
1334         }
1335         HTableDescriptor[] tableDescs = this.admin.listTables();
1336         List<HRegionInfo> regions = null;
1337         for (HTableDescriptor tableDesc : tableDescs) {
1338           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1339           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1340
1341           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1342             ServerName rs = location.getServerName();
1343             String rsName = rs.getHostname();
1344             HRegionInfo r = location.getRegionInfo();
1345
1346             if (rsAndRMap.containsKey(rsName)) {
1347               regions = rsAndRMap.get(rsName);
1348             } else {
1349               regions = new ArrayList<HRegionInfo>();
1350               rsAndRMap.put(rsName, regions);
1351             }
1352             regions.add(r);
1353           }
1354           table.close();
1355         }
1356
1357         //get any live regionservers not serving any regions
1358         for (ServerName rs : this.admin.getClusterStatus().getServers()) {
1359           String rsName = rs.getHostname();
1360           if (!rsAndRMap.containsKey(rsName)) {
1361             rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
1362           }
1363         }
1364       } catch (IOException e) {
1365         String msg = "Get HTables info failed";
1366         LOG.error(msg, e);
1367         this.errorCode = INIT_ERROR_EXIT_CODE;
1368       } finally {
1369         if (table != null) {
1370           try {
1371             table.close();
1372           } catch (IOException e) {
1373             LOG.warn("Close table failed", e);
1374           }
1375         }
1376       }
1377
1378       return rsAndRMap;
1379     }
1380
1381     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1382         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1383
1384       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1385
1386       if (this.targets != null && this.targets.length > 0) {
1387         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1388         Pattern pattern = null;
1389         Matcher matcher = null;
1390         boolean regExpFound = false;
1391         for (String rsName : this.targets) {
1392           if (this.useRegExp) {
1393             regExpFound = false;
1394             pattern = Pattern.compile(rsName);
1395             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1396               matcher = pattern.matcher(entry.getKey());
1397               if (matcher.matches()) {
1398                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1399                 regExpFound = true;
1400               }
1401             }
1402             if (!regExpFound) {
1403               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1404             }
1405           } else {
1406             if (fullRsAndRMap.containsKey(rsName)) {
1407               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1408             } else {
1409               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1410             }
1411           }
1412         }
1413       } else {
1414         filteredRsAndRMap = fullRsAndRMap;
1415       }
1416       return filteredRsAndRMap;
1417     }
1418   }
1419
1420   public static void main(String[] args) throws Exception {
1421     final Configuration conf = HBaseConfiguration.create();
1422
1423     // loading the generic options to conf
1424     new GenericOptionsParser(conf, args);
1425
1426     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1427     LOG.info("Number of execution threads " + numThreads);
1428
1429     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1430
1431     Class<? extends Sink> sinkClass =
1432         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1433     Sink sink = ReflectionUtils.newInstance(sinkClass);
1434
1435     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1436     executor.shutdown();
1437     System.exit(exitCode);
1438   }
1439 }