View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
23  import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
24
25  import com.google.common.collect.Lists;
26
27  import java.io.Closeable;
28  import java.io.IOException;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.LinkedList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Random;
39  import java.util.Set;
40  import java.util.TreeSet;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.ExecutionException;
43  import java.util.concurrent.ExecutorService;
44  import java.util.concurrent.Future;
45  import java.util.concurrent.ScheduledThreadPoolExecutor;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.regex.Matcher;
48  import java.util.regex.Pattern;
49
50  import org.apache.commons.lang.time.StopWatch;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.hbase.AuthUtil;
55  import org.apache.hadoop.hbase.ChoreService;
56  import org.apache.hadoop.hbase.ClusterStatus;
57  import org.apache.hadoop.hbase.DoNotRetryIOException;
58  import org.apache.hadoop.hbase.HBaseConfiguration;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HRegionLocation;
63  import org.apache.hadoop.hbase.HTableDescriptor;
64  import org.apache.hadoop.hbase.MetaTableAccessor;
65  import org.apache.hadoop.hbase.NamespaceDescriptor;
66  import org.apache.hadoop.hbase.ScheduledChore;
67  import org.apache.hadoop.hbase.ServerName;
68  import org.apache.hadoop.hbase.TableName;
69  import org.apache.hadoop.hbase.TableNotEnabledException;
70  import org.apache.hadoop.hbase.TableNotFoundException;
71  import org.apache.hadoop.hbase.client.Admin;
72  import org.apache.hadoop.hbase.client.Connection;
73  import org.apache.hadoop.hbase.client.ConnectionFactory;
74  import org.apache.hadoop.hbase.client.Get;
75  import org.apache.hadoop.hbase.client.Put;
76  import org.apache.hadoop.hbase.client.RegionLocator;
77  import org.apache.hadoop.hbase.client.ResultScanner;
78  import org.apache.hadoop.hbase.client.Scan;
79  import org.apache.hadoop.hbase.client.Table;
80  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
81  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
82  import org.apache.hadoop.hbase.util.Bytes;
83  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
84  import org.apache.hadoop.hbase.util.Pair;
85  import org.apache.hadoop.hbase.util.ReflectionUtils;
86  import org.apache.hadoop.hbase.util.RegionSplitter;
87  import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
88  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
89  import org.apache.hadoop.util.GenericOptionsParser;
90  import org.apache.hadoop.util.Tool;
91  import org.apache.hadoop.util.ToolRunner;
92  import org.apache.zookeeper.KeeperException;
93  import org.apache.zookeeper.ZooKeeper;
94  import org.apache.zookeeper.client.ConnectStringParser;
95  import org.apache.zookeeper.data.Stat;
96
97  /**
98   * HBase Canary Tool, that that can be used to do
99   * "canary monitoring" of a running HBase cluster.
100  *
101  * Here are three modes
102  * 1. region mode - Foreach region tries to get one row per column family
103  * and outputs some information about failure or latency.
104  *
105  * 2. regionserver mode - Foreach regionserver tries to get one row from one table
106  * selected randomly and outputs some information about failure or latency.
107  *
108  * 3. zookeeper mode - for each zookeeper instance, selects a zNode and
109  * outputs some information about failure or latency.
110  */
111 public final class Canary implements Tool {
112   // Sink interface used by the canary to outputs information
113   public interface Sink {
114     public long getReadFailureCount();
115     public long incReadFailureCount();
116     public void publishReadFailure(HRegionInfo region, Exception e);
117     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
118     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
119     public long getWriteFailureCount();
120     public void publishWriteFailure(HRegionInfo region, Exception e);
121     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
122     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
123   }
124   // new extended sink for output regionserver mode info
125   // do not change the Sink interface directly due to maintaining the API
126   public interface ExtendedSink extends Sink {
127     public void publishReadFailure(String table, String server);
128     public void publishReadTiming(String table, String server, long msTime);
129   }
130
131   // Simple implementation of canary sink that allows to plot on
132   // file or standard output timings or failures.
133   public static class StdOutSink implements Sink {
134     private AtomicLong readFailureCount = new AtomicLong(0),
135         writeFailureCount = new AtomicLong(0);
136
137     @Override
138     public long getReadFailureCount() {
139       return readFailureCount.get();
140     }
141
142     @Override
143     public long incReadFailureCount() {
144       return readFailureCount.incrementAndGet();
145     }
146
147     @Override
148     public void publishReadFailure(HRegionInfo region, Exception e) {
149       readFailureCount.incrementAndGet();
150       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
151     }
152
153     @Override
154     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
155       readFailureCount.incrementAndGet();
156       LOG.error(String.format("read from region %s column family %s failed",
157                 region.getRegionNameAsString(), column.getNameAsString()), e);
158     }
159
160     @Override
161     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
162       LOG.info(String.format("read from region %s column family %s in %dms",
163                region.getRegionNameAsString(), column.getNameAsString(), msTime));
164     }
165
166     @Override
167     public long getWriteFailureCount() {
168       return writeFailureCount.get();
169     }
170
171     @Override
172     public void publishWriteFailure(HRegionInfo region, Exception e) {
173       writeFailureCount.incrementAndGet();
174       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
175     }
176
177     @Override
178     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
179       writeFailureCount.incrementAndGet();
180       LOG.error(String.format("write to region %s column family %s failed",
181         region.getRegionNameAsString(), column.getNameAsString()), e);
182     }
183
184     @Override
185     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
186       LOG.info(String.format("write to region %s column family %s in %dms",
187         region.getRegionNameAsString(), column.getNameAsString(), msTime));
188     }
189   }
  /**
   * An ExtendedSink implementation that logs per-regionserver read results
   * to standard output.
   */
  public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {

    @Override
    public void publishReadFailure(String table, String server) {
      incReadFailureCount();
      LOG.error(String.format("Read from table:%s on region server:%s", table, server));
    }

    @Override
    public void publishReadTiming(String table, String server, long msTime) {
      LOG.info(String.format("Read from table:%s on region server:%s in %dms",
          table, server, msTime));
    }
  }
205
  /**
   * ExtendedSink implementation for zookeeper mode: the "table" argument is
   * interpreted as the znode path and "server" as the zookeeper instance.
   */
  public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
    @Override public void publishReadFailure(String zNode, String server) {
      incReadFailureCount();
      LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
    }

    @Override public void publishReadTiming(String znode, String server, long msTime) {
      LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
          znode, server, msTime));
    }
  }
217
218   static class ZookeeperTask implements Callable<Void> {
219     private final Connection connection;
220     private final String host;
221     private String znode;
222     private final int timeout;
223     private ZookeeperStdOutSink sink;
224
225     public ZookeeperTask(Connection connection, String host, String znode, int timeout,
226         ZookeeperStdOutSink sink) {
227       this.connection = connection;
228       this.host = host;
229       this.znode = znode;
230       this.timeout = timeout;
231       this.sink = sink;
232     }
233
234     @Override public Void call() throws Exception {
235       ZooKeeper zooKeeper = null;
236       try {
237         zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
238         Stat exists = zooKeeper.exists(znode, false);
239         StopWatch stopwatch = new StopWatch();
240         stopwatch.start();
241         zooKeeper.getData(znode, false, exists);
242         stopwatch.stop();
243         sink.publishReadTiming(znode, host, stopwatch.getTime());
244       } catch (KeeperException | InterruptedException e) {
245         sink.publishReadFailure(znode, host);
246       } finally {
247         if (zooKeeper != null) {
248           zooKeeper.close();
249         }
250       }
251       return null;
252     }
253   }
254
255   /**
256    * For each column family of the region tries to get one row and outputs the latency, or the
257    * failure.
258    */
259   static class RegionTask implements Callable<Void> {
260     public enum TaskType{
261       READ, WRITE
262     }
263     private Connection connection;
264     private HRegionInfo region;
265     private Sink sink;
266     private TaskType taskType;
267
268     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
269       this.connection = connection;
270       this.region = region;
271       this.sink = sink;
272       this.taskType = taskType;
273     }
274
275     @Override
276     public Void call() {
277       switch (taskType) {
278       case READ:
279         return read();
280       case WRITE:
281         return write();
282       default:
283         return read();
284       }
285     }
286
287     public Void read() {
288       Table table = null;
289       HTableDescriptor tableDesc = null;
290       try {
291         if (LOG.isDebugEnabled()) {
292           LOG.debug(String.format("reading table descriptor for table %s",
293             region.getTable()));
294         }
295         table = connection.getTable(region.getTable());
296         tableDesc = table.getTableDescriptor();
297       } catch (IOException e) {
298         LOG.debug("sniffRegion failed", e);
299         sink.publishReadFailure(region, e);
300         if (table != null) {
301           try {
302             table.close();
303           } catch (IOException ioe) {
304             LOG.error("Close table failed", e);
305           }
306         }
307         return null;
308       }
309
310       byte[] startKey = null;
311       Get get = null;
312       Scan scan = null;
313       ResultScanner rs = null;
314       StopWatch stopWatch = new StopWatch();
315       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
316         stopWatch.reset();
317         startKey = region.getStartKey();
318         // Can't do a get on empty start row so do a Scan of first element if any instead.
319         if (startKey.length > 0) {
320           get = new Get(startKey);
321           get.setCacheBlocks(false);
322           get.setFilter(new FirstKeyOnlyFilter());
323           get.addFamily(column.getName());
324         } else {
325           scan = new Scan();
326           scan.setRaw(true);
327           scan.setCaching(1);
328           scan.setCacheBlocks(false);
329           scan.setFilter(new FirstKeyOnlyFilter());
330           scan.addFamily(column.getName());
331           scan.setMaxResultSize(1L);
332           scan.setSmall(true);
333         }
334
335         if (LOG.isDebugEnabled()) {
336           LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
337             tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
338             Bytes.toStringBinary(startKey)));
339         }
340         try {
341           stopWatch.start();
342           if (startKey.length > 0) {
343             table.get(get);
344           } else {
345             rs = table.getScanner(scan);
346             rs.next();
347           }
348           stopWatch.stop();
349           sink.publishReadTiming(region, column, stopWatch.getTime());
350         } catch (Exception e) {
351           sink.publishReadFailure(region, column, e);
352         } finally {
353           if (rs != null) {
354             rs.close();
355           }
356           scan = null;
357           get = null;
358           startKey = null;
359         }
360       }
361       try {
362         table.close();
363       } catch (IOException e) {
364         LOG.error("Close table failed", e);
365       }
366       return null;
367     }
368
369     /**
370      * Check writes for the canary table
371      * @return
372      */
373     private Void write() {
374       Table table = null;
375       HTableDescriptor tableDesc = null;
376       try {
377         table = connection.getTable(region.getTable());
378         tableDesc = table.getTableDescriptor();
379         byte[] rowToCheck = region.getStartKey();
380         if (rowToCheck.length == 0) {
381           rowToCheck = new byte[]{0x0};
382         }
383         int writeValueSize =
384             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
385         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
386           Put put = new Put(rowToCheck);
387           byte[] value = new byte[writeValueSize];
388           Bytes.random(value);
389           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
390
391           if (LOG.isDebugEnabled()) {
392             LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
393               tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
394               Bytes.toStringBinary(rowToCheck)));
395           }
396           try {
397             long startTime = System.currentTimeMillis();
398             table.put(put);
399             long time = System.currentTimeMillis() - startTime;
400             sink.publishWriteTiming(region, column, time);
401           } catch (Exception e) {
402             sink.publishWriteFailure(region, column, e);
403           }
404         }
405         table.close();
406       } catch (IOException e) {
407         sink.publishWriteFailure(region, e);
408       }
409       return null;
410     }
411   }
412
413   /**
414    * Get one row from a region on the regionserver and outputs the latency, or the failure.
415    */
416   static class RegionServerTask implements Callable<Void> {
417     private Connection connection;
418     private String serverName;
419     private HRegionInfo region;
420     private ExtendedSink sink;
421     private AtomicLong successes;
422
423     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
424         ExtendedSink sink, AtomicLong successes) {
425       this.connection = connection;
426       this.serverName = serverName;
427       this.region = region;
428       this.sink = sink;
429       this.successes = successes;
430     }
431
432     @Override
433     public Void call() {
434       TableName tableName = null;
435       Table table = null;
436       Get get = null;
437       byte[] startKey = null;
438       Scan scan = null;
439       StopWatch stopWatch = new StopWatch();
440       // monitor one region on every region server
441       stopWatch.reset();
442       try {
443         tableName = region.getTable();
444         table = connection.getTable(tableName);
445         startKey = region.getStartKey();
446         // Can't do a get on empty start row so do a Scan of first element if any instead.
447         if (LOG.isDebugEnabled()) {
448           LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
449             serverName, region.getTable(), region.getRegionNameAsString(),
450             Bytes.toStringBinary(startKey)));
451         }
452         if (startKey.length > 0) {
453           get = new Get(startKey);
454           get.setCacheBlocks(false);
455           get.setFilter(new FirstKeyOnlyFilter());
456           stopWatch.start();
457           table.get(get);
458           stopWatch.stop();
459         } else {
460           scan = new Scan();
461           scan.setCacheBlocks(false);
462           scan.setFilter(new FirstKeyOnlyFilter());
463           scan.setCaching(1);
464           scan.setMaxResultSize(1L);
465           scan.setSmall(true);
466           stopWatch.start();
467           ResultScanner s = table.getScanner(scan);
468           s.next();
469           s.close();
470           stopWatch.stop();
471         }
472         successes.incrementAndGet();
473         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
474       } catch (TableNotFoundException tnfe) {
475         LOG.error("Table may be deleted", tnfe);
476         // This is ignored because it doesn't imply that the regionserver is dead
477       } catch (TableNotEnabledException tnee) {
478         // This is considered a success since we got a response.
479         successes.incrementAndGet();
480         LOG.debug("The targeted table was disabled.  Assuming success.");
481       } catch (DoNotRetryIOException dnrioe) {
482         sink.publishReadFailure(tableName.getNameAsString(), serverName);
483         LOG.error(dnrioe);
484       } catch (IOException e) {
485         sink.publishReadFailure(tableName.getNameAsString(), serverName);
486         LOG.error(e);
487       } finally {
488         if (table != null) {
489           try {
490             table.close();
491           } catch (IOException e) {/* DO NOTHING */
492             LOG.error("Close table failed", e);
493           }
494         }
495         scan = null;
496         get = null;
497         startKey = null;
498       }
499       return null;
500     }
501   }
502
  // Exit codes returned by run(): usage error, initialization error, timeout,
  // general error, and read/write failure treated as error.
  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  // Default pause between checks in daemon mode, in milliseconds.
  private static final long DEFAULT_INTERVAL = 6000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  private static final Log LOG = LogFactory.getLog(Canary.class);

  // Default table written to by write-sniffing mode ("hbase:canary").
  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  private long interval = 0; // sleep between runs in daemon mode, ms; 0 = run once
  private Sink sink = null;

  private boolean useRegExp;                      // -e: treat targets as regexps
  private long timeout = DEFAULT_TIMEOUT;         // -t: per-check timeout, ms
  private boolean failOnError = true;             // -f: stop on first error
  private boolean regionServerMode = false;       // -regionserver
  private boolean zookeeperMode = false;          // -zookeeper
  private boolean regionServerAllRegions = false; // -allRegions
  private boolean writeSniffing = false;          // -writeSniffing
  private boolean treatFailureAsError = false;    // -treatFailureAsError
  private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; // -writeTable

  private ExecutorService executor; // threads to retrieve data from regionservers
536
  /** Creates a Canary with a single-threaded executor and a stdout sink. */
  public Canary() {
    this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
  }

  /**
   * @param executor pool used to run the per-region/per-server probe tasks
   * @param sink destination for timings and failures
   */
  public Canary(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }
545
  /** @return the configuration in use; may be null before setConf is called */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /** Sets the configuration used to create cluster connections. */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
555
  /**
   * Parses the command line, setting the mode flags and numeric options on
   * this instance. Invalid input exits the process via printUsageAndExit.
   * @param args the raw command line: [opts] [table1 [table2 ...]]
   * @return index in args of the first monitor target (table/regionserver
   *         name), or -1 if none was given
   */
  private int parseArgs(String[] args) {
    int index = -1;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];

      if (cmd.startsWith("-")) {
        if (index >= 0) {
          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
          System.err.println("Invalid command line options");
          printUsageAndExit();
        }

        if (cmd.equals("-help")) {
          // user asked for help, print the help and quit.
          printUsageAndExit();
        } else if (cmd.equals("-daemon") && interval == 0) {
          // user asked for daemon mode, set a default interval between checks
          interval = DEFAULT_INTERVAL;
        } else if (cmd.equals("-interval")) {
          // user has specified an interval for canary breaths (-interval N)
          i++;

          if (i == args.length) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }

          try {
            // argument is given in seconds; stored internally as milliseconds
            interval = Long.parseLong(args[i]) * 1000;
          } catch (NumberFormatException e) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-zookeeper")) {
          this.zookeeperMode = true;
        } else if(cmd.equals("-regionserver")) {
          this.regionServerMode = true;
        } else if(cmd.equals("-allRegions")) {
          this.regionServerAllRegions = true;
        } else if(cmd.equals("-writeSniffing")) {
          this.writeSniffing = true;
        } else if(cmd.equals("-treatFailureAsError")) {
          this.treatFailureAsError = true;
        } else if (cmd.equals("-e")) {
          this.useRegExp = true;
        } else if (cmd.equals("-t")) {
          // per-check timeout, in milliseconds (-t N)
          i++;

          if (i == args.length) {
            System.err.println("-t needs a numeric value argument.");
            printUsageAndExit();
          }

          try {
            this.timeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-t needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-writeTable")) {
          // table used by write sniffing (-writeTable name)
          i++;

          if (i == args.length) {
            System.err.println("-writeTable needs a string value argument.");
            printUsageAndExit();
          }
          this.writeTableName = TableName.valueOf(args[i]);
        } else if (cmd.equals("-f")) {
          // whether to stop the whole run on the first error (-f true|false)
          i++;

          if (i == args.length) {
            System.err
                .println("-f needs a boolean value argument (true|false).");
            printUsageAndExit();
          }

          this.failOnError = Boolean.parseBoolean(args[i]);
        } else {
          // no options match
          System.err.println(cmd + " options is invalid.");
          printUsageAndExit();
        }
      } else if (index < 0) {
        // keep track of first table name specified by the user
        index = i;
      }
    }
    // Validate dependent / mutually-exclusive option combinations.
    if (this.regionServerAllRegions && !this.regionServerMode) {
      System.err.println("-allRegions can only be specified in regionserver mode.");
      printUsageAndExit();
    }
    if (this.zookeeperMode) {
      if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
        System.err.println("-zookeeper is exclusive and cannot be combined with "
            + "other modes.");
        printUsageAndExit();
      }
    }
    return index;
  }
657
  /**
   * Main entry point: builds a Monitor for the selected mode and runs it,
   * once or repeatedly in daemon mode, watching for errors and the overall
   * timeout.
   * @return 0 on success, otherwise one of the *_EXIT_CODE constants
   */
  @Override
  public int run(String[] args) throws Exception {
    int index = parseArgs(args);
    ChoreService choreService = null;

    // Launches chore for refreshing kerberos credentials if security is enabled.
    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
    // for more details.
    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
    if (authChore != null) {
      choreService = new ChoreService("CANARY_TOOL");
      choreService.scheduleChore(authChore);
    }

    // Start to prepare the stuffs
    Monitor monitor = null;
    Thread monitorThread = null;
    long startTime = 0;
    long currentTimeLength = 0;
    // Get a connection to use in below.
    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
      // interval > 0 means daemon mode: keep looping; otherwise run exactly once.
      do {
        // Do monitor !!
        try {
          monitor = this.newMonitor(connection, index, args);
          monitorThread = new Thread(monitor);
          startTime = System.currentTimeMillis();
          monitorThread.start();
          // Poll the monitor once a second until it finishes, errors out,
          // or exceeds the configured timeout.
          while (!monitor.isDone()) {
            // wait for 1 sec
            Thread.sleep(1000);
            // exit if any error occurs
            if (this.failOnError && monitor.hasError()) {
              monitorThread.interrupt();
              if (monitor.initialized) {
                return monitor.errorCode;
              } else {
                return INIT_ERROR_EXIT_CODE;
              }
            }
            currentTimeLength = System.currentTimeMillis() - startTime;
            if (currentTimeLength > this.timeout) {
              LOG.error("The monitor is running too long (" + currentTimeLength
                  + ") after timeout limit:" + this.timeout
                  + " will be killed itself !!");
              if (monitor.initialized) {
                return TIMEOUT_ERROR_EXIT_CODE;
              } else {
                return INIT_ERROR_EXIT_CODE;
              }
            }
          }

          if (this.failOnError && monitor.finalCheckForErrors()) {
            monitorThread.interrupt();
            return monitor.errorCode;
          }
        } finally {
          if (monitor != null) monitor.close();
        }

        Thread.sleep(interval);
      } while (interval > 0);
    } // try-with-resources close

    if (choreService != null) {
      choreService.shutdown();
    }
    return monitor.errorCode;
  }
728
729   private void printUsageAndExit() {
730     System.err.printf(
731       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
732         getClass().getName());
733     System.err.println(" where [opts] are:");
734     System.err.println("   -help          Show this help and exit.");
735     System.err.println("   -regionserver  replace the table argument to regionserver,");
736     System.err.println("      which means to enable regionserver mode");
737     System.err.println("   -allRegions    Tries all regions on a regionserver,");
738     System.err.println("      only works in regionserver mode.");
739     System.err.println("   -zookeeper    Tries to grab zookeeper.znode.parent ");
740     System.err.println("      on each zookeeper instance");
741     System.err.println("   -daemon        Continuous check at defined intervals.");
742     System.err.println("   -interval <N>  Interval between checks (sec)");
743     System.err.println("   -e             Use table/regionserver as regular expression");
744     System.err.println("      which means the table/regionserver is regular expression pattern");
745     System.err.println("   -f <B>         stop whole program if first error occurs," +
746         " default is true");
747     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
748     System.err.println("   -writeSniffing enable the write sniffing in canary");
749     System.err.println("   -treatFailureAsError treats read / write failure as error");
750     System.err.println("   -writeTable    The table used for write sniffing."
751         + " Default is hbase:canary");
752     System.err
753         .println("   -D<configProperty>=<value> assigning or override the configuration params");
754     System.exit(USAGE_EXIT_CODE);
755   }
756
757   /**
758    * A Factory method for {@link Monitor}.
759    * Can be overridden by user.
760    * @param index a start index for monitor target
761    * @param args args passed from user
762    * @return a Monitor instance
763    */
764   public Monitor newMonitor(final Connection connection, int index, String[] args) {
765     Monitor monitor = null;
766     String[] monitorTargets = null;
767
768     if(index >= 0) {
769       int length = args.length - index;
770       monitorTargets = new String[length];
771       System.arraycopy(args, index, monitorTargets, 0, length);
772     }
773
774     if (this.regionServerMode) {
775       monitor =
776           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
777               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
778               this.treatFailureAsError);
779     } else if (this.zookeeperMode) {
780       monitor =
781           new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
782               (ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
783     } else {
784       monitor =
785           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
786               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
787     }
788     return monitor;
789   }
790
  // a Monitor super-class can be extended by users
  public static abstract class Monitor implements Runnable, Closeable {

    // Cluster connection handed in by the caller; never null (enforced in the constructor).
    // The connection is owned by the caller and is NOT closed by this class.
    protected Connection connection;
    // Created lazily by initAdmin(); closed by close().
    protected Admin admin;
    // Raw monitor targets from the command line (table or server names); may be null/empty.
    protected String[] targets;
    // When true, entries in targets are interpreted as regular expressions.
    protected boolean useRegExp;
    // When true, read/write failures counted by the sink turn into a failing exit code.
    protected boolean treatFailureAsError;
    protected boolean initialized = false;

    // Set to true by run() implementations once a monitoring pass has completed.
    protected boolean done = false;
    // 0 means success; otherwise one of the *_EXIT_CODE constants defined in the enclosing class.
    protected int errorCode = 0;
    protected Sink sink;
    protected ExecutorService executor;

    /** @return true once the current monitoring pass has finished. */
    public boolean isDone() {
      return done;
    }

    /** @return true if a non-zero error code has been recorded. */
    public boolean hasError() {
      return errorCode != 0;
    }

    /**
     * Final verdict for the pass. Reports an error when an error code was already
     * recorded, or — if treatFailureAsError is set — when the sink counted any
     * read/write failures (errorCode is then set to FAILURE_EXIT_CODE).
     * @return true if the run should be treated as failed
     */
    public boolean finalCheckForErrors() {
      if (errorCode != 0) {
        return true;
      }
      if (treatFailureAsError &&
          (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
        errorCode = FAILURE_EXIT_CODE;
        return true;
      }
      return false;
    }

    @Override
    public void close() throws IOException {
      // Only the Admin is owned here; the Connection belongs to the caller.
      if (this.admin != null) this.admin.close();
    }

    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
        ExecutorService executor, boolean treatFailureAsError) {
      if (null == connection) throw new IllegalArgumentException("connection shall not be null");

      this.connection = connection;
      this.targets = monitorTargets;
      this.useRegExp = useRegExp;
      this.treatFailureAsError = treatFailureAsError;
      this.sink = sink;
      this.executor = executor;
    }

    @Override
    public abstract void run();

    /**
     * Creates the Admin on first use, or verifies an existing one has not aborted.
     * Records INIT_ERROR_EXIT_CODE on failure.
     * @return true if the admin is usable (no error code recorded so far)
     */
    protected boolean initAdmin() {
      if (null == this.admin) {
        try {
          this.admin = this.connection.getAdmin();
        } catch (Exception e) {
          LOG.error("Initial HBaseAdmin failed...", e);
          this.errorCode = INIT_ERROR_EXIT_CODE;
        }
      } else if (admin.isAborted()) {
        LOG.error("HBaseAdmin aborted");
        this.errorCode = INIT_ERROR_EXIT_CODE;
      }
      return !this.hasError();
    }
  }
861
  // a monitor for region mode
  private static class RegionMonitor extends Monitor {
    // 10 minutes
    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // 1 days
    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;

    // Timestamp (ms) of the last write-table distribution check; -1 forces a check on first run.
    private long lastCheckTime = -1;
    // Whether to also sniff the canary write table with WRITE tasks.
    private boolean writeSniffing;
    // Table used for write sniffing; created/re-split on demand by checkWriteTableDistribution().
    private TableName writeTableName;
    // TTL in seconds applied to the canary table's column family (see createWriteTable()).
    private int writeDataTTL;
    // Desired bounds on regions-per-server for the canary write table.
    private float regionsLowerLimit;
    private float regionsUpperLimit;
    // Minimum interval (ms) between write-table distribution checks.
    private int checkPeriod;

    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
        boolean treatFailureAsError) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
      Configuration conf = connection.getConfiguration();
      this.writeSniffing = writeSniffing;
      this.writeTableName = writeTableName;
      this.writeDataTTL =
          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
      this.regionsLowerLimit =
          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
      this.regionsUpperLimit =
          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
      this.checkPeriod =
          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
    }

    /**
     * One monitoring pass: schedules READ tasks for the requested tables (or all
     * enabled tables when no targets were given), optionally schedules WRITE tasks
     * against the canary table, then waits for all tasks to finish.
     */
    @Override
    public void run() {
      if (this.initAdmin()) {
        try {
          List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
          if (this.targets != null && this.targets.length > 0) {
            // Explicit targets: expand regexes (if -e) into concrete table names.
            String[] tables = generateMonitorTables(this.targets);
            this.initialized = true;
            for (String table : tables) {
              taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
            }
          } else {
            // No targets: sniff every enabled table except the canary write table.
            taskFutures.addAll(sniff(TaskType.READ));
          }

          if (writeSniffing) {
            // Re-verify the canary table's region distribution at most once per checkPeriod.
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                // Best-effort: a failed distribution check does not abort the pass.
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            taskFutures.addAll(Canary.sniff(admin, sink,
              admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
          }

          // Wait for all scheduled region tasks; individual failures are logged,
          // not converted to an error code here (the sink counts failures).
          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        }
      }
      this.done = true;
    }

    /**
     * Resolves the user-supplied targets to concrete table names.
     * Without -e, targets are returned verbatim. With -e, all tables are listed
     * once (note: listTables is called with a null pattern, i.e. unfiltered) and
     * each target is matched locally as a full-name regex.
     * @throws IOException if the table listing fails
     * (a TableNotFoundException is thrown when no table matches any pattern)
     */
    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
      String[] returnTables = null;

      if (this.useRegExp) {
        Pattern pattern = null;
        HTableDescriptor[] tds = null;
        Set<String> tmpTables = new TreeSet<String>();
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("reading list of tables"));
          }
          // pattern is still null here, so this fetches every table once.
          tds = this.admin.listTables(pattern);
          if (tds == null) {
            tds = new HTableDescriptor[0];
          }
          for (String monitorTarget : monitorTargets) {
            pattern = Pattern.compile(monitorTarget);
            for (HTableDescriptor td : tds) {
              if (pattern.matcher(td.getNameAsString()).matches()) {
                tmpTables.add(td.getNameAsString());
              }
            }
          }
        } catch (IOException e) {
          LOG.error("Communicate with admin failed", e);
          throw e;
        }

        if (tmpTables.size() > 0) {
          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
        } else {
          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
          LOG.error(msg);
          this.errorCode = INIT_ERROR_EXIT_CODE;
          throw new TableNotFoundException(msg);
        }
      } else {
        returnTables = monitorTargets;
      }

      return returnTables;
    }

    /*
     * canary entry point to monitor all the tables.
     * Schedules a task list covering every enabled table, excluding the canary
     * write table (that one is handled separately by the write-sniffing path).
     */
    private List<Future<Void>> sniff(TaskType taskType) throws Exception {
      if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("reading list of tables"));
      }
      List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
      for (HTableDescriptor table : admin.listTables()) {
        if (admin.isTableEnabled(table.getTableName())
            && (!table.getTableName().equals(writeTableName))) {
          taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
        }
      }
      return taskFutures;
    }

    /**
     * Ensures the canary write table exists, is enabled, and its region count per
     * live regionserver stays within [regionsLowerLimit, regionsUpperLimit].
     * Recreates the table (drop + pre-split create) when out of bounds, and runs
     * the balancer when some servers host no canary regions.
     * @throws IOException on any admin/meta access failure
     */
    private void checkWriteTableDistribution() throws IOException {
      if (!admin.tableExists(writeTableName)) {
        int numberOfServers = admin.getClusterStatus().getServers().size();
        if (numberOfServers == 0) {
          throw new IllegalStateException("No live regionservers");
        }
        createWriteTable(numberOfServers);
      }

      if (!admin.isTableEnabled(writeTableName)) {
        admin.enableTable(writeTableName);
      }

      ClusterStatus status = admin.getClusterStatus();
      int numberOfServers = status.getServersSize();
      // Do not count the master when it is also listed among the servers.
      if (status.getServers().contains(status.getMaster())) {
        numberOfServers -= 1;
      }

      List<Pair<HRegionInfo, ServerName>> pairs =
          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
      int numberOfRegions = pairs.size();
      if (numberOfRegions < numberOfServers * regionsLowerLimit
          || numberOfRegions > numberOfServers * regionsUpperLimit) {
        // Region count out of bounds: rebuild the table with a fresh pre-split.
        admin.disableTable(writeTableName);
        admin.deleteTable(writeTableName);
        createWriteTable(numberOfServers);
      }
      HashSet<ServerName> serverSet = new HashSet<ServerName>();
      for (Pair<HRegionInfo, ServerName> pair : pairs) {
        serverSet.add(pair.getSecond());
      }
      int numberOfCoveredServers = serverSet.size();
      if (numberOfCoveredServers < numberOfServers) {
        // Some servers host no canary region; ask the balancer to spread them.
        admin.balancer();
      }
    }

    /**
     * Creates the canary write table pre-split into roughly
     * numberOfServers * regionsLowerLimit regions, with a single column family
     * (1 max version, TTL = writeDataTTL seconds).
     * @throws IOException if the create fails
     */
    private void createWriteTable(int numberOfServers) throws IOException {
      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
      LOG.info("Number of live regionservers: " + numberOfServers + ", "
          + "pre-splitting the canary table into " + numberOfRegions + " regions "
          + "(current lower limit of regions per server is " + regionsLowerLimit
          + " and you can change it by config: "
          + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
      HTableDescriptor desc = new HTableDescriptor(writeTableName);
      HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
      family.setMaxVersions(1);
      family.setTimeToLive(writeDataTTL);

      desc.addFamily(family);
      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
      admin.createTable(desc, splits);
    }
  }
1054
1055   /**
1056    * Canary entry point for specified table.
1057    * @throws Exception
1058    */
1059   public static void sniff(final Admin admin, TableName tableName)
1060       throws Exception {
1061     sniff(admin, tableName, TaskType.READ);
1062   }
1063
1064   /**
1065    * Canary entry point for specified table with task type(read/write)
1066    * @throws Exception
1067    */
1068   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
1069       throws Exception {
1070     List<Future<Void>> taskFutures =
1071         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
1072           new ScheduledThreadPoolExecutor(1), taskType);
1073     for (Future<Void> future : taskFutures) {
1074       future.get();
1075     }
1076   }
1077
1078   /**
1079    * Canary entry point for specified table.
1080    * @throws Exception
1081    */
1082   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1083       ExecutorService executor, TaskType taskType) throws Exception {
1084     if (LOG.isDebugEnabled()) {
1085       LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
1086         tableName));
1087     }
1088     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1089       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
1090         executor, taskType);
1091     } else {
1092       LOG.warn(String.format("Table %s is not enabled", tableName));
1093     }
1094     return new LinkedList<Future<Void>>();
1095   }
1096
1097   /*
1098    * Loops over regions that owns this table, and output some information abouts the state.
1099    */
1100   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1101       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
1102
1103     if (LOG.isDebugEnabled()) {
1104       LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
1105     }
1106
1107     Table table = null;
1108     try {
1109       table = admin.getConnection().getTable(tableDesc.getTableName());
1110     } catch (TableNotFoundException e) {
1111       return new ArrayList<Future<Void>>();
1112     }
1113     List<RegionTask> tasks = new ArrayList<RegionTask>();
1114     try {
1115       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
1116         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
1117       }
1118     } finally {
1119       table.close();
1120     }
1121     return executor.invokeAll(tasks);
1122   }
1123
  //  monitor for zookeeper mode
  private static class ZookeeperMonitor extends Monitor {
    // host:port strings of every quorum member, parsed from the configured quorum string
    private List<String> hosts;
    // root znode to read on each quorum member (zookeeper.znode.parent)
    private final String znode;
    // session timeout (ms) used for each probe connection
    private final int timeout;

    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
        ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError)  {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
      Configuration configuration = connection.getConfiguration();
      znode =
          configuration.get(ZOOKEEPER_ZNODE_PARENT,
              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
      timeout = configuration
          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
      // Expand the quorum string into individual host:port entries, one probe each.
      ConnectStringParser parser =
          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
      hosts = Lists.newArrayList();
      for (InetSocketAddress server : parser.getServerAddresses()) {
        hosts.add(server.toString());
      }
    }

    // Probes every quorum member once; any task failure sets ERROR_EXIT_CODE.
    @Override public void run() {
      List<ZookeeperTask> tasks = Lists.newArrayList();
      for (final String host : hosts) {
        tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff zookeeper failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        // Restore the interrupt flag for the caller.
        Thread.currentThread().interrupt();
        LOG.error("Sniff zookeeper interrupted!", e);
      }
      this.done = true;
    }


    // Zookeeper mode requires the zookeeper-specific sink; fail fast otherwise.
    private ZookeeperStdOutSink getSink() {
      if (!(sink instanceof ZookeeperStdOutSink)) {
        throw new RuntimeException("Can only write to zookeeper sink");
      }
      return ((ZookeeperStdOutSink) sink);
    }
  }
1177
1178
  // a monitor for regionserver mode
  private static class RegionServerMonitor extends Monitor {

    // When true, probe every region on each server; otherwise one random region per server.
    private boolean allRegions;

    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
        ExtendedSink sink, ExecutorService executor, boolean allRegions,
        boolean treatFailureAsError) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
      this.allRegions = allRegions;
    }

    private ExtendedSink getSink() {
      return (ExtendedSink) this.sink;
    }

    /**
     * One pass: reject table-name targets, map servers to their regions
     * (filtered by the requested server names/patterns), then probe them.
     */
    @Override
    public void run() {
      if (this.initAdmin() && this.checkNoTableNames()) {
        Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
        this.initialized = true;
        this.monitorRegionServers(rsAndRMap);
      }
      this.done = true;
    }

    /**
     * Guards against users passing table names to -regionserver mode: any target
     * that matches an existing table name is a usage error (USAGE_EXIT_CODE).
     * @return true when no target collides with a table name
     */
    private boolean checkNoTableNames() {
      List<String> foundTableNames = new ArrayList<String>();
      TableName[] tableNames = null;

      if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("reading list of tables"));
      }
      try {
        tableNames = this.admin.listTableNames();
      } catch (IOException e) {
        LOG.error("Get listTableNames failed", e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
        return false;
      }

      // No explicit targets: nothing to collide with.
      if (this.targets == null || this.targets.length == 0) return true;

      for (String target : this.targets) {
        for (TableName tableName : tableNames) {
          if (target.equals(tableName.getNameAsString())) {
            foundTableNames.add(target);
          }
        }
      }

      if (foundTableNames.size() > 0) {
        System.err.println("Cannot pass a tablename when using the -regionserver " +
            "option, tablenames:" + foundTableNames.toString());
        this.errorCode = USAGE_EXIT_CODE;
      }
      return foundTableNames.size() == 0;
    }

    /**
     * Submits one RegionServerTask per region (allRegions) or one randomly
     * chosen region per server, waits for completion, and logs per-server
     * success counts. Task failures and interruption set ERROR_EXIT_CODE.
     */
    private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
      List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
      Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
      Random rand = new Random();
      for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
        String serverName = entry.getKey();
        AtomicLong successes = new AtomicLong(0);
        successMap.put(serverName, successes);
        if (entry.getValue().isEmpty()) {
          LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
        } else if (this.allRegions) {
          for (HRegionInfo region : entry.getValue()) {
            tasks.add(new RegionServerTask(this.connection,
                serverName,
                region,
                getSink(),
                successes));
          }
        } else {
          // random select a region if flag not set
          HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
          tasks.add(new RegionServerTask(this.connection,
              serverName,
              region,
              getSink(),
              successes));
        }
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff regionserver failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
        if (this.allRegions) {
          for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
            String serverName = entry.getKey();
            LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
                    + entry.getValue().size() + " on regionserver:" + serverName);
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        // NOTE(review): unlike ZookeeperMonitor, the interrupt flag is not
        // restored here — confirm whether that is intentional.
        LOG.error("Sniff regionserver interrupted!", e);
      }
    }

    // Full server->regions map, then narrowed down to the requested targets.
    private Map<String, List<HRegionInfo>> filterRegionServerByName() {
      Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
      return regionServerAndRegionsMap;
    }

    /**
     * Builds a hostname -> regions map by walking every table's region locations,
     * then adds any live regionserver that serves no regions (with an empty list).
     * I/O failure sets INIT_ERROR_EXIT_CODE and returns whatever was gathered.
     */
    private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
      Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
      Table table = null;
      RegionLocator regionLocator = null;
      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("reading list of tables and locations"));
        }
        HTableDescriptor[] tableDescs = this.admin.listTables();
        List<HRegionInfo> regions = null;
        for (HTableDescriptor tableDesc : tableDescs) {
          table = this.admin.getConnection().getTable(tableDesc.getTableName());
          // NOTE(review): regionLocator is never closed; also, table is closed
          // both here in the loop and again in the finally block (double close
          // of the last table) — confirm both are harmless for these types.
          regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());

          for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
            ServerName rs = location.getServerName();
            String rsName = rs.getHostname();
            HRegionInfo r = location.getRegionInfo();

            if (rsAndRMap.containsKey(rsName)) {
              regions = rsAndRMap.get(rsName);
            } else {
              regions = new ArrayList<HRegionInfo>();
              rsAndRMap.put(rsName, regions);
            }
            regions.add(r);
          }
          table.close();
        }

        //get any live regionservers not serving any regions
        for (ServerName rs : this.admin.getClusterStatus().getServers()) {
          String rsName = rs.getHostname();
          if (!rsAndRMap.containsKey(rsName)) {
            rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
          }
        }
      } catch (IOException e) {
        String msg = "Get HTables info failed";
        LOG.error(msg, e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
      } finally {
        if (table != null) {
          try {
            table.close();
          } catch (IOException e) {
            LOG.warn("Close table failed", e);
          }
        }
      }

      return rsAndRMap;
    }

    /**
     * Keeps only the entries matching the requested targets: regex match on the
     * hostname when useRegExp is set, exact hostname lookup otherwise. With no
     * targets, the full map is returned unchanged.
     */
    private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
        Map<String, List<HRegionInfo>> fullRsAndRMap) {

      Map<String, List<HRegionInfo>> filteredRsAndRMap = null;

      if (this.targets != null && this.targets.length > 0) {
        filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
        Pattern pattern = null;
        Matcher matcher = null;
        boolean regExpFound = false;
        for (String rsName : this.targets) {
          if (this.useRegExp) {
            regExpFound = false;
            pattern = Pattern.compile(rsName);
            for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
              matcher = pattern.matcher(entry.getKey());
              if (matcher.matches()) {
                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
                regExpFound = true;
              }
            }
            if (!regExpFound) {
              LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
            }
          } else {
            if (fullRsAndRMap.containsKey(rsName)) {
              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
            } else {
              LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
            }
          }
        }
      } else {
        filteredRsAndRMap = fullRsAndRMap;
      }
      return filteredRsAndRMap;
    }
  }
1386
1387   public static void main(String[] args) throws Exception {
1388     final Configuration conf = HBaseConfiguration.create();
1389
1390     // loading the generic options to conf
1391     new GenericOptionsParser(conf, args);
1392
1393     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1394     LOG.info("Number of execution threads " + numThreads);
1395
1396     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1397
1398     Class<? extends Sink> sinkClass =
1399         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1400     Sink sink = ReflectionUtils.newInstance(sinkClass);
1401
1402     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1403     executor.shutdown();
1404     System.exit(exitCode);
1405   }
1406 }