1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
23  import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
24
25  import com.google.common.collect.Lists;
26
27  import java.io.Closeable;
28  import java.io.IOException;
29  import java.net.InetSocketAddress;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.LinkedList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Random;
39  import java.util.Set;
40  import java.util.TreeSet;
41  import java.util.concurrent.Callable;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.ExecutionException;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.ScheduledThreadPoolExecutor;
47  import java.util.concurrent.atomic.AtomicLong;
48  import java.util.regex.Matcher;
49  import java.util.regex.Pattern;
50
51  import org.apache.commons.lang.time.StopWatch;
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.hbase.AuthUtil;
56  import org.apache.hadoop.hbase.ChoreService;
57  import org.apache.hadoop.hbase.ClusterStatus;
58  import org.apache.hadoop.hbase.DoNotRetryIOException;
59  import org.apache.hadoop.hbase.HBaseConfiguration;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HRegionInfo;
63  import org.apache.hadoop.hbase.HRegionLocation;
64  import org.apache.hadoop.hbase.HTableDescriptor;
65  import org.apache.hadoop.hbase.MetaTableAccessor;
66  import org.apache.hadoop.hbase.NamespaceDescriptor;
67  import org.apache.hadoop.hbase.ScheduledChore;
68  import org.apache.hadoop.hbase.ServerName;
69  import org.apache.hadoop.hbase.TableName;
70  import org.apache.hadoop.hbase.TableNotEnabledException;
71  import org.apache.hadoop.hbase.TableNotFoundException;
72  import org.apache.hadoop.hbase.client.Admin;
73  import org.apache.hadoop.hbase.client.Connection;
74  import org.apache.hadoop.hbase.client.ConnectionFactory;
75  import org.apache.hadoop.hbase.client.Get;
76  import org.apache.hadoop.hbase.client.Put;
77  import org.apache.hadoop.hbase.client.RegionLocator;
78  import org.apache.hadoop.hbase.client.ResultScanner;
79  import org.apache.hadoop.hbase.client.Scan;
80  import org.apache.hadoop.hbase.client.Table;
81  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
82  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
83  import org.apache.hadoop.hbase.util.Bytes;
84  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
85  import org.apache.hadoop.hbase.util.Pair;
86  import org.apache.hadoop.hbase.util.ReflectionUtils;
87  import org.apache.hadoop.hbase.util.RegionSplitter;
88  import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
89  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
90  import org.apache.hadoop.util.GenericOptionsParser;
91  import org.apache.hadoop.util.Tool;
92  import org.apache.hadoop.util.ToolRunner;
93  import org.apache.zookeeper.KeeperException;
94  import org.apache.zookeeper.ZooKeeper;
95  import org.apache.zookeeper.client.ConnectStringParser;
96  import org.apache.zookeeper.data.Stat;
97
98  /**
99   * HBase Canary Tool that can be used to do
100  * "canary monitoring" of a running HBase cluster.
101  *
102  * There are three modes:
103  * 1. region mode - for each region, tries to get one row per column family
104  * and outputs some information about failures or latency.
105  *
106  * 2. regionserver mode - for each regionserver, tries to get one row from one randomly
107  * selected table and outputs some information about failures or latency.
108  *
109  * 3. zookeeper mode - for each zookeeper instance, selects a zNode and
110  * outputs some information about failures or latency.
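     *
     * Example invocations (a sketch based on the options parsed below; run with -help for the
     * full option list):
     * <pre>
     *   $ bin/hbase org.apache.hadoop.hbase.tool.Canary                       # region mode
     *   $ bin/hbase org.apache.hadoop.hbase.tool.Canary -regionserver         # regionserver mode
     *   $ bin/hbase org.apache.hadoop.hbase.tool.Canary -zookeeper            # zookeeper mode
     *   $ bin/hbase org.apache.hadoop.hbase.tool.Canary -daemon -interval 30  # repeat every 30s
     * </pre>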
111  */
112 public final class Canary implements Tool {
113   // Sink interface used by the canary to output information
114   public interface Sink {
115     public long getReadFailureCount();
116     public long incReadFailureCount();
117     public void publishReadFailure(HRegionInfo region, Exception e);
118     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
119     public void updateReadFailedHostList(HRegionInfo region, String serverName);
120     public Map<String,String> getReadFailures();
121     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
122     public long getWriteFailureCount();
123     public void publishWriteFailure(HRegionInfo region, Exception e);
124     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
125     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
126     public void updateWriteFailedHostList(HRegionInfo region, String serverName);
127     public Map<String,String> getWriteFailures();
128   }
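      // A custom Sink implementation can be plugged in through the "hbase.canary.sink.class"
      // property read in main() below. A minimal sketch (the class name and the alerting hook
      // are hypothetical, not part of this tool):
      //
      //   public static class AlertingSink extends RegionServerStdOutSink {
      //     @Override
      //     public void publishReadFailure(HRegionInfo region, Exception e) {
      //       super.publishReadFailure(region, e);
      //       // hypothetical hook: emit a metric or page an operator here
      //     }
      //   }
      //
      //   conf.set("hbase.canary.sink.class", AlertingSink.class.getName());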
129   // new extended sink for outputting regionserver mode info
130   // do not change the Sink interface directly, in order to keep the existing API stable
131   public interface ExtendedSink extends Sink {
132     public void publishReadFailure(String table, String server);
133     public void publishReadTiming(String table, String server, long msTime);
134   }
135
136   // Simple implementation of canary sink that writes timings or
137   // failures to a file or to standard output.
138   public static class StdOutSink implements Sink {
139     private AtomicLong readFailureCount = new AtomicLong(0),
140         writeFailureCount = new AtomicLong(0);
141
142     private Map<String, String> readFailures = new ConcurrentHashMap<String, String>();
143     private Map<String, String> writeFailures = new ConcurrentHashMap<String, String>();
144
145     @Override
146     public long getReadFailureCount() {
147       return readFailureCount.get();
148     }
149
150     @Override
151     public long incReadFailureCount() {
152       return readFailureCount.incrementAndGet();
153     }
154
155     @Override
156     public void publishReadFailure(HRegionInfo region, Exception e) {
157       readFailureCount.incrementAndGet();
158       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
159     }
160
161     @Override
162     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
163       readFailureCount.incrementAndGet();
164       LOG.error(String.format("read from region %s column family %s failed",
165                 region.getRegionNameAsString(), column.getNameAsString()), e);
166     }
167
168     @Override
169     public void updateReadFailedHostList(HRegionInfo region, String serverName) {
170       readFailures.put(region.getRegionNameAsString(), serverName);
171     }
172
173     @Override
174     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
175       LOG.info(String.format("read from region %s column family %s in %dms",
176         region.getRegionNameAsString(), column.getNameAsString(), msTime));
177     }
178
179     @Override
180     public Map<String, String> getReadFailures() {
181       return readFailures;
182     }
183
184     @Override
185     public Map<String, String> getWriteFailures() {
186       return writeFailures;
187     }
188
189     @Override
190     public long getWriteFailureCount() {
191       return writeFailureCount.get();
192     }
193
194     @Override
195     public void publishWriteFailure(HRegionInfo region, Exception e) {
196       writeFailureCount.incrementAndGet();
197       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
198     }
199
200     @Override
201     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
202       writeFailureCount.incrementAndGet();
203       LOG.error(String.format("write to region %s column family %s failed",
204         region.getRegionNameAsString(), column.getNameAsString()), e);
205     }
206
207     @Override
208     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
209       LOG.info(String.format("write to region %s column family %s in %dms",
210         region.getRegionNameAsString(), column.getNameAsString(), msTime));
211     }
212
213     @Override
214     public void updateWriteFailedHostList(HRegionInfo region, String serverName) {
215       writeFailures.put(region.getRegionNameAsString(), serverName);
216     }
217
218   }
219   // an ExtendedSink implementation
220   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
221
222     @Override
223     public void publishReadFailure(String table, String server) {
224       incReadFailureCount();
225       LOG.error(String.format("Read from table:%s on region server:%s", table, server));
226     }
227
228     @Override
229     public void publishReadTiming(String table, String server, long msTime) {
230       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
231           table, server, msTime));
232     }
233   }
234
235   public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
236     @Override public void publishReadFailure(String zNode, String server) {
237       incReadFailureCount();
238       LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
239     }
240
241     @Override public void publishReadTiming(String znode, String server, long msTime) {
242       LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
243           znode, server, msTime));
244     }
245   }
246
247   static class ZookeeperTask implements Callable<Void> {
248     private final Connection connection;
249     private final String host;
250     private String znode;
251     private final int timeout;
252     private ZookeeperStdOutSink sink;
253
254     public ZookeeperTask(Connection connection, String host, String znode, int timeout,
255         ZookeeperStdOutSink sink) {
256       this.connection = connection;
257       this.host = host;
258       this.znode = znode;
259       this.timeout = timeout;
260       this.sink = sink;
261     }
262
263     @Override public Void call() throws Exception {
264       ZooKeeper zooKeeper = null;
265       try {
266         zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
267         Stat exists = zooKeeper.exists(znode, false);
268         StopWatch stopwatch = new StopWatch();
269         stopwatch.start();
270         zooKeeper.getData(znode, false, exists);
271         stopwatch.stop();
272         sink.publishReadTiming(znode, host, stopwatch.getTime());
273       } catch (KeeperException | InterruptedException e) {
274         sink.publishReadFailure(znode, host);
275       } finally {
276         if (zooKeeper != null) {
277           zooKeeper.close();
278         }
279       }
280       return null;
281     }
282   }
283
284   /**
285    * For each column family of the region, tries to get one row and outputs the latency, or the
286    * failure.
287    */
288   static class RegionTask implements Callable<Void> {
289     public enum TaskType{
290       READ, WRITE
291     }
292     private Connection connection;
293     private HRegionInfo region;
294     private Sink sink;
295     private TaskType taskType;
296     private boolean rawScanEnabled;
297     private ServerName serverName;
298
299     RegionTask(Connection connection, HRegionInfo region, ServerName serverName, Sink sink,
300         TaskType taskType, boolean rawScanEnabled) {
301       this.connection = connection;
302       this.region = region;
303       this.serverName = serverName;
304       this.sink = sink;
305       this.taskType = taskType;
306       this.rawScanEnabled = rawScanEnabled;
307     }
308
309     @Override
310     public Void call() {
311       switch (taskType) {
312       case READ:
313         return read();
314       case WRITE:
315         return write();
316       default:
317         return read();
318       }
319     }
320
321     public Void read() {
322       Table table = null;
323       HTableDescriptor tableDesc = null;
324       try {
325         if (LOG.isDebugEnabled()) {
326           LOG.debug(String.format("reading table descriptor for table %s",
327             region.getTable()));
328         }
329         table = connection.getTable(region.getTable());
330         tableDesc = table.getTableDescriptor();
331       } catch (IOException e) {
332         LOG.debug("sniffRegion failed", e);
333         sink.publishReadFailure(region, e);
334         if (table != null) {
335           try {
336             table.close();
337           } catch (IOException ioe) {
338             LOG.error("Close table failed", e);
339           }
340         }
341         return null;
342       }
343
344       byte[] startKey = null;
345       Get get = null;
346       Scan scan = null;
347       ResultScanner rs = null;
348       StopWatch stopWatch = new StopWatch();
349       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
350         stopWatch.reset();
351         startKey = region.getStartKey();
352         // Can't do a get on empty start row so do a Scan of first element if any instead.
353         if (startKey.length > 0) {
354           get = new Get(startKey);
355           get.setCacheBlocks(false);
356           get.setFilter(new FirstKeyOnlyFilter());
357           get.addFamily(column.getName());
358         } else {
359           scan = new Scan();
360           if (LOG.isDebugEnabled()) {
361             LOG.debug(String.format("rawScan : %s for table: %s", rawScanEnabled,
362               tableDesc.getTableName()));
363           }
364           scan.setRaw(rawScanEnabled);
365           scan.setCaching(1);
366           scan.setCacheBlocks(false);
367           scan.setFilter(new FirstKeyOnlyFilter());
368           scan.addFamily(column.getName());
369           scan.setMaxResultSize(1L);
370           scan.setSmall(true);
371         }
372
373         if (LOG.isDebugEnabled()) {
374           LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
375             tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
376             Bytes.toStringBinary(startKey)));
377         }
378         try {
379           stopWatch.start();
380           if (startKey.length > 0) {
381             table.get(get);
382           } else {
383             rs = table.getScanner(scan);
384             rs.next();
385           }
386           stopWatch.stop();
387           sink.publishReadTiming(region, column, stopWatch.getTime());
388         } catch (Exception e) {
389           sink.publishReadFailure(region, column, e);
390           sink.updateReadFailedHostList(region, serverName.getHostname());
391         } finally {
392           if (rs != null) {
393             rs.close();
394           }
395           scan = null;
396           get = null;
397           startKey = null;
398         }
399       }
400       try {
401         table.close();
402       } catch (IOException e) {
403         LOG.error("Close table failed", e);
404       }
405       return null;
406     }
407
408     /**
409      * Check writes for the canary table.
410      * @return null always; write failures are reported through the sink
411      */
412     private Void write() {
413       Table table = null;
414       HTableDescriptor tableDesc = null;
415       try {
416         table = connection.getTable(region.getTable());
417         tableDesc = table.getTableDescriptor();
418         byte[] rowToCheck = region.getStartKey();
419         if (rowToCheck.length == 0) {
420           rowToCheck = new byte[]{0x0};
421         }
422         int writeValueSize =
423             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
424         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
425           Put put = new Put(rowToCheck);
426           byte[] value = new byte[writeValueSize];
427           Bytes.random(value);
428           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
429
430           if (LOG.isDebugEnabled()) {
431             LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
432               tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
433               Bytes.toStringBinary(rowToCheck)));
434           }
435           try {
436             long startTime = System.currentTimeMillis();
437             table.put(put);
438             long time = System.currentTimeMillis() - startTime;
439             sink.publishWriteTiming(region, column, time);
440           } catch (Exception e) {
441             sink.publishWriteFailure(region, column, e);
442           }
443         }
444         table.close();
445       } catch (IOException e) {
446         sink.publishWriteFailure(region, e);
447         sink.updateWriteFailedHostList(region, serverName.getHostname());
448       }
449       return null;
450     }
451   }
452
453   /**
454    * Gets one row from a region on the regionserver and outputs the latency, or the failure.
455    */
456   static class RegionServerTask implements Callable<Void> {
457     private Connection connection;
458     private String serverName;
459     private HRegionInfo region;
460     private ExtendedSink sink;
461     private AtomicLong successes;
462
463     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
464         ExtendedSink sink, AtomicLong successes) {
465       this.connection = connection;
466       this.serverName = serverName;
467       this.region = region;
468       this.sink = sink;
469       this.successes = successes;
470     }
471
472     @Override
473     public Void call() {
474       TableName tableName = null;
475       Table table = null;
476       Get get = null;
477       byte[] startKey = null;
478       Scan scan = null;
479       StopWatch stopWatch = new StopWatch();
480       // monitor one region on every region server
481       stopWatch.reset();
482       try {
483         tableName = region.getTable();
484         table = connection.getTable(tableName);
485         startKey = region.getStartKey();
486         // Can't do a get on empty start row so do a Scan of first element if any instead.
487         if (LOG.isDebugEnabled()) {
488           LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
489             serverName, region.getTable(), region.getRegionNameAsString(),
490             Bytes.toStringBinary(startKey)));
491         }
492         if (startKey.length > 0) {
493           get = new Get(startKey);
494           get.setCacheBlocks(false);
495           get.setFilter(new FirstKeyOnlyFilter());
496           stopWatch.start();
497           table.get(get);
498           stopWatch.stop();
499         } else {
500           scan = new Scan();
501           scan.setCacheBlocks(false);
502           scan.setFilter(new FirstKeyOnlyFilter());
503           scan.setCaching(1);
504           scan.setMaxResultSize(1L);
505           scan.setSmall(true);
506           stopWatch.start();
507           ResultScanner s = table.getScanner(scan);
508           s.next();
509           s.close();
510           stopWatch.stop();
511         }
512         successes.incrementAndGet();
513         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
514       } catch (TableNotFoundException tnfe) {
515         LOG.error("Table may be deleted", tnfe);
516         // This is ignored because it doesn't imply that the regionserver is dead
517       } catch (TableNotEnabledException tnee) {
518         // This is considered a success since we got a response.
519         successes.incrementAndGet();
520         LOG.debug("The targeted table was disabled.  Assuming success.");
521       } catch (DoNotRetryIOException dnrioe) {
522         sink.publishReadFailure(tableName.getNameAsString(), serverName);
523         LOG.error(dnrioe);
524       } catch (IOException e) {
525         sink.publishReadFailure(tableName.getNameAsString(), serverName);
526         LOG.error(e);
527       } finally {
528         if (table != null) {
529           try {
530             table.close();
531           } catch (IOException e) {
532             LOG.error("Close table failed", e);
533           }
534         }
535         scan = null;
536         get = null;
537         startKey = null;
538       }
539       return null;
540     }
541   }
542
543   private static final int USAGE_EXIT_CODE = 1;
544   private static final int INIT_ERROR_EXIT_CODE = 2;
545   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
546   private static final int ERROR_EXIT_CODE = 4;
547   private static final int FAILURE_EXIT_CODE = 5;
548
549   private static final long DEFAULT_INTERVAL = 6000; // 6 seconds, in milliseconds
550
551   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
552   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
553
554   private static final Log LOG = LogFactory.getLog(Canary.class);
555
556   public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
557     NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
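      // i.e. "hbase:canary" by default; a different table can be supplied with the
      // -writeTable option parsed below.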
558
559   private static final String CANARY_TABLE_FAMILY_NAME = "Test";
560
561   private Configuration conf = null;
562   private long interval = 0;
563   private Sink sink = null;
564
565   private boolean useRegExp;
566   private long timeout = DEFAULT_TIMEOUT;
567   private boolean failOnError = true;
568   private boolean regionServerMode = false;
569   private boolean zookeeperMode = false;
570   private boolean regionServerAllRegions = false;
571   private boolean writeSniffing = false;
572   private boolean treatFailureAsError = false;
573   private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
574
575   private ExecutorService executor; // threads to retrieve data from regionservers
576
577   public Canary() {
578     this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
579   }
580
581   public Canary(ExecutorService executor, Sink sink) {
582     this.executor = executor;
583     this.sink = sink;
584   }
585
586   @Override
587   public Configuration getConf() {
588     return conf;
589   }
590
591   @Override
592   public void setConf(Configuration conf) {
593     this.conf = conf;
594   }
595
596   private int parseArgs(String[] args) {
597     int index = -1;
598     // Process command line args
599     for (int i = 0; i < args.length; i++) {
600       String cmd = args[i];
601
602       if (cmd.startsWith("-")) {
603         if (index >= 0) {
604           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
605           System.err.println("Invalid command line options");
606           printUsageAndExit();
607         }
608
609         if (cmd.equals("-help")) {
610           // user asked for help, print the help and quit.
611           printUsageAndExit();
612         } else if (cmd.equals("-daemon") && interval == 0) {
613           // user asked for daemon mode, set a default interval between checks
614           interval = DEFAULT_INTERVAL;
615         } else if (cmd.equals("-interval")) {
616           // user has specified an interval for canary breaths (-interval N)
617           i++;
618
619           if (i == args.length) {
620             System.err.println("-interval needs a numeric value argument.");
621             printUsageAndExit();
622           }
623
624           try {
625             interval = Long.parseLong(args[i]) * 1000;
626           } catch (NumberFormatException e) {
627             System.err.println("-interval needs a numeric value argument.");
628             printUsageAndExit();
629           }
630         } else if (cmd.equals("-zookeeper")) {
631           this.zookeeperMode = true;
632         } else if(cmd.equals("-regionserver")) {
633           this.regionServerMode = true;
634         } else if(cmd.equals("-allRegions")) {
635           this.regionServerAllRegions = true;
636         } else if(cmd.equals("-writeSniffing")) {
637           this.writeSniffing = true;
638         } else if(cmd.equals("-treatFailureAsError")) {
639           this.treatFailureAsError = true;
640         } else if (cmd.equals("-e")) {
641           this.useRegExp = true;
642         } else if (cmd.equals("-t")) {
643           i++;
644
645           if (i == args.length) {
646             System.err.println("-t needs a numeric value argument.");
647             printUsageAndExit();
648           }
649
650           try {
651             this.timeout = Long.parseLong(args[i]);
652           } catch (NumberFormatException e) {
653             System.err.println("-t needs a numeric value argument.");
654             printUsageAndExit();
655           }
656         } else if (cmd.equals("-writeTable")) {
657           i++;
658
659           if (i == args.length) {
660             System.err.println("-writeTable needs a string value argument.");
661             printUsageAndExit();
662           }
663           this.writeTableName = TableName.valueOf(args[i]);
664         } else if (cmd.equals("-f")) {
665           i++;
666
667           if (i == args.length) {
668             System.err
669                 .println("-f needs a boolean value argument (true|false).");
670             printUsageAndExit();
671           }
672
673           this.failOnError = Boolean.parseBoolean(args[i]);
674         } else {
675           // no options match
676           System.err.println(cmd + " option is invalid.");
677           printUsageAndExit();
678         }
679       } else if (index < 0) {
680         // keep track of first table name specified by the user
681         index = i;
682       }
683     }
684     if (this.regionServerAllRegions && !this.regionServerMode) {
685       System.err.println("-allRegions can only be specified in regionserver mode.");
686       printUsageAndExit();
687     }
688     if (this.zookeeperMode) {
689       if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
690         System.err.println("-zookeeper is exclusive and cannot be combined with "
691             + "other modes.");
692         printUsageAndExit();
693       }
694     }
695     return index;
696   }
697
698   @Override
699   public int run(String[] args) throws Exception {
700     int index = parseArgs(args);
701     ChoreService choreService = null;
702
703     // Launches chore for refreshing kerberos credentials if security is enabled.
704     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
705     // for more details.
706     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
707     if (authChore != null) {
708       choreService = new ChoreService("CANARY_TOOL");
709       choreService.scheduleChore(authChore);
710     }
711
712     // Start preparing the monitor
713     Monitor monitor = null;
714     Thread monitorThread = null;
715     long startTime = 0;
716     long currentTimeLength = 0;
717     // Get a connection to use in below.
718     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
719       do {
720         // Do monitor !!
721         try {
722           monitor = this.newMonitor(connection, index, args);
723           monitorThread = new Thread(monitor);
724           startTime = System.currentTimeMillis();
725           monitorThread.start();
726           while (!monitor.isDone()) {
727             // wait for 1 sec
728             Thread.sleep(1000);
729             // exit if any error occurs
730             if (this.failOnError && monitor.hasError()) {
731               monitorThread.interrupt();
732               if (monitor.initialized) {
733                 return monitor.errorCode;
734               } else {
735                 return INIT_ERROR_EXIT_CODE;
736               }
737             }
738             currentTimeLength = System.currentTimeMillis() - startTime;
739             if (currentTimeLength > this.timeout) {
740               LOG.error("The monitor is running too long (" + currentTimeLength
741                   + ") after timeout limit:" + this.timeout
742                   + " will be killed itself !!");
743               if (monitor.initialized) {
744                 return TIMEOUT_ERROR_EXIT_CODE;
745               } else {
746                 return INIT_ERROR_EXIT_CODE;
747               }
748             }
749           }
750
751           if (this.failOnError && monitor.finalCheckForErrors()) {
752             monitorThread.interrupt();
753             return monitor.errorCode;
754           }
755         } finally {
756           if (monitor != null) monitor.close();
757         }
758
759         Thread.sleep(interval);
760       } while (interval > 0);
761     } // try-with-resources close
762
763     if (choreService != null) {
764       choreService.shutdown();
765     }
766     return monitor.errorCode;
767   }
768
769   public Map<String, String> getReadFailures()  {
770     return sink.getReadFailures();
771   }
772
773   public Map<String, String> getWriteFailures()  {
774     return sink.getWriteFailures();
775   }
776
777   private void printUsageAndExit() {
778     System.err.printf(
779       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
780         getClass().getName());
781     System.err.println(" where [opts] are:");
782     System.err.println("   -help          Show this help and exit.");
783     System.err.println("   -regionserver  replace the table argument to regionserver,");
784     System.err.println("      which means to enable regionserver mode");
785     System.err.println("   -allRegions    Tries all regions on a regionserver,");
786     System.err.println("      only works in regionserver mode.");
787     System.err.println("   -zookeeper    Tries to grab zookeeper.znode.parent ");
788     System.err.println("      on each zookeeper instance");
789     System.err.println("   -daemon        Continuous check at defined intervals.");
790     System.err.println("   -interval <N>  Interval between checks (sec)");
791     System.err.println("   -e             Use table/regionserver as regular expression");
792     System.err.println("      which means the table/regionserver is regular expression pattern");
793     System.err.println("   -f <B>         stop whole program if first error occurs," +
794         " default is true");
795     System.err.println("   -t <N>         timeout for a check, default is 600000 (milisecs)");
796     System.err.println("   -writeSniffing enable the write sniffing in canary");
797     System.err.println("   -treatFailureAsError treats read / write failure as error");
798     System.err.println("   -writeTable    The table used for write sniffing."
799         + " Default is hbase:canary");
800     System.err.println("   -Dhbase.canary.read.raw.enabled=<true/false> Use this flag to enable or disable raw scan during read canary test"
801         + " Default is false and raw is not enabled during scan");
802     System.err
803         .println("   -D<configProperty>=<value> assigning or override the configuration params");
804     System.exit(USAGE_EXIT_CODE);
805   }
806
807   /**
808    * A factory method for {@link Monitor}.
809    * Can be overridden by the user.
810    * @param index a start index for monitor target
811    * @param args args passed from user
812    * @return a Monitor instance
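       * <p>
       * A minimal sketch of an override (MyMonitor is a hypothetical {@link Monitor} subclass,
       * not part of this tool):
       * <pre>
       *   Canary canary = new Canary() {
       *     &#64;Override
       *     public Monitor newMonitor(Connection connection, int index, String[] args) {
       *       return new MyMonitor(connection, args);
       *     }
       *   };
       * </pre>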
813    */
814   public Monitor newMonitor(final Connection connection, int index, String[] args) {
815     Monitor monitor = null;
816     String[] monitorTargets = null;
817
818     if(index >= 0) {
819       int length = args.length - index;
820       monitorTargets = new String[length];
821       System.arraycopy(args, index, monitorTargets, 0, length);
822     }
823
824     if (this.regionServerMode) {
825       monitor =
826           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
827               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
828               this.treatFailureAsError);
829     } else if (this.zookeeperMode) {
830       monitor =
831           new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
832               (ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
833     } else {
834       monitor =
835           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
836               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
837     }
838     return monitor;
839   }
840
841   // a Monitor super-class that can be extended by users
842   public static abstract class Monitor implements Runnable, Closeable {
843
844     protected Connection connection;
845     protected Admin admin;
846     protected String[] targets;
847     protected boolean useRegExp;
848     protected boolean treatFailureAsError;
849     protected boolean initialized = false;
850
851     protected boolean done = false;
852     protected int errorCode = 0;
853     protected Sink sink;
854     protected ExecutorService executor;
855
856     public boolean isDone() {
857       return done;
858     }
859
860     public boolean hasError() {
861       return errorCode != 0;
862     }
863
864     public boolean finalCheckForErrors() {
865       if (errorCode != 0) {
866         return true;
867       }
868       if (treatFailureAsError &&
869           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
870         errorCode = FAILURE_EXIT_CODE;
871         return true;
872       }
873       return false;
874     }
875
876     @Override
877     public void close() throws IOException {
878       if (this.admin != null) this.admin.close();
879     }
880
881     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
882         ExecutorService executor, boolean treatFailureAsError) {
883       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
884
885       this.connection = connection;
886       this.targets = monitorTargets;
887       this.useRegExp = useRegExp;
888       this.treatFailureAsError = treatFailureAsError;
889       this.sink = sink;
890       this.executor = executor;
891     }
892
893     @Override
894     public abstract void run();
895
896     protected boolean initAdmin() {
897       if (null == this.admin) {
898         try {
899           this.admin = this.connection.getAdmin();
900         } catch (Exception e) {
901           LOG.error("Initial HBaseAdmin failed...", e);
902           this.errorCode = INIT_ERROR_EXIT_CODE;
903         }
904       } else if (admin.isAborted()) {
905         LOG.error("HBaseAdmin aborted");
906         this.errorCode = INIT_ERROR_EXIT_CODE;
907       }
908       return !this.hasError();
909     }
910   }
911
912   // a monitor for region mode
913   private static class RegionMonitor extends Monitor {
914     // 10 minutes
915     private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
916     // 1 day, in seconds
917     private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
918
919     private long lastCheckTime = -1;
920     private boolean writeSniffing;
921     private TableName writeTableName;
922     private int writeDataTTL;
923     private float regionsLowerLimit;
924     private float regionsUpperLimit;
925     private int checkPeriod;
926     private boolean rawScanEnabled;
927
928     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
929         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
930         boolean treatFailureAsError) {
931       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
932       Configuration conf = connection.getConfiguration();
933       this.writeSniffing = writeSniffing;
934       this.writeTableName = writeTableName;
935       this.writeDataTTL =
936           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
937       this.regionsLowerLimit =
938           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
939       this.regionsUpperLimit =
940           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
941       this.checkPeriod =
942           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
943             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
944       this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
945     }
946
947     @Override
948     public void run() {
949       if (this.initAdmin()) {
950         try {
951           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
952           if (this.targets != null && this.targets.length > 0) {
953             String[] tables = generateMonitorTables(this.targets);
954             this.initialized = true;
955             for (String table : tables) {
956               taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ,
957                 this.rawScanEnabled));
958             }
959           } else {
960             taskFutures.addAll(sniff(TaskType.READ));
961           }
962
963           if (writeSniffing) {
964             if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
965               try {
966                 checkWriteTableDistribution();
967               } catch (IOException e) {
968                 LOG.error("Check canary table distribution failed!", e);
969               }
970               lastCheckTime = EnvironmentEdgeManager.currentTime();
971             }
972             // sniff canary table with write operation
973             taskFutures.addAll(Canary.sniff(admin, sink, admin.getTableDescriptor(writeTableName),
974               executor, TaskType.WRITE, this.rawScanEnabled));
975           }
976
977           for (Future<Void> future : taskFutures) {
978             try {
979               future.get();
980             } catch (ExecutionException e) {
981               LOG.error("Sniff region failed!", e);
982             }
983           }
984         } catch (Exception e) {
985           LOG.error("Run regionMonitor failed", e);
986           this.errorCode = ERROR_EXIT_CODE;
987         }
988       }
989       this.done = true;
990     }
991
992     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
993       String[] returnTables = null;
994
995       if (this.useRegExp) {
996         Pattern pattern = null;
997         HTableDescriptor[] tds = null;
998         Set<String> tmpTables = new TreeSet<String>();
999         try {
1000           if (LOG.isDebugEnabled()) {
1001             LOG.debug(String.format("reading list of tables"));
1002           }
1003           tds = this.admin.listTables(pattern);
1004           if (tds == null) {
1005             tds = new HTableDescriptor[0];
1006           }
1007           for (String monitorTarget : monitorTargets) {
1008             pattern = Pattern.compile(monitorTarget);
1009             for (HTableDescriptor td : tds) {
1010               if (pattern.matcher(td.getNameAsString()).matches()) {
1011                 tmpTables.add(td.getNameAsString());
1012               }
1013             }
1014           }
1015         } catch (IOException e) {
1016           LOG.error("Communicate with admin failed", e);
1017           throw e;
1018         }
1019
1020         if (tmpTables.size() > 0) {
1021           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1022         } else {
1023           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1024           LOG.error(msg);
1025           this.errorCode = INIT_ERROR_EXIT_CODE;
1026           throw new TableNotFoundException(msg);
1027         }
1028       } else {
1029         returnTables = monitorTargets;
1030       }
1031
1032       return returnTables;
1033     }
1034
1035     /*
1036      * canary entry point to monitor all the tables.
1037      */
1038     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
1039       if (LOG.isDebugEnabled()) {
1040         LOG.debug(String.format("reading list of tables"));
1041       }
1042       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
1043       for (HTableDescriptor table : admin.listTables()) {
1044         if (admin.isTableEnabled(table.getTableName())
1045             && (!table.getTableName().equals(writeTableName))) {
1046           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled));
1047         }
1048       }
1049       return taskFutures;
1050     }
1051
1052     private void checkWriteTableDistribution() throws IOException {
1053       if (!admin.tableExists(writeTableName)) {
1054         int numberOfServers = admin.getClusterStatus().getServers().size();
1055         if (numberOfServers == 0) {
1056           throw new IllegalStateException("No live regionservers");
1057         }
1058         createWriteTable(numberOfServers);
1059       }
1060
1061       if (!admin.isTableEnabled(writeTableName)) {
1062         admin.enableTable(writeTableName);
1063       }
1064
1065       ClusterStatus status = admin.getClusterStatus();
1066       int numberOfServers = status.getServersSize();
1067       if (status.getServers().contains(status.getMaster())) {
1068         numberOfServers -= 1;
1069       }
1070
1071       List<Pair<HRegionInfo, ServerName>> pairs =
1072           MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1073       int numberOfRegions = pairs.size();
1074       if (numberOfRegions < numberOfServers * regionsLowerLimit
1075           || numberOfRegions > numberOfServers * regionsUpperLimit) {
1076         admin.disableTable(writeTableName);
1077         admin.deleteTable(writeTableName);
1078         createWriteTable(numberOfServers);
1079       }
1080       HashSet<ServerName> serverSet = new HashSet<ServerName>();
1081       for (Pair<HRegionInfo, ServerName> pair : pairs) {
1082         serverSet.add(pair.getSecond());
1083       }
1084       int numberOfCoveredServers = serverSet.size();
1085       if (numberOfCoveredServers < numberOfServers) {
1086         admin.balancer();
1087       }
1088     }
1089
1090     private void createWriteTable(int numberOfServers) throws IOException {
1091       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
1092       LOG.info("Number of live regionservers: " + numberOfServers + ", "
1093           + "pre-splitting the canary table into " + numberOfRegions + " regions "
1094           + "(current lower limit of regions per server is " + regionsLowerLimit
1095           + " and you can change it by config: "
1096           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
1097       HTableDescriptor desc = new HTableDescriptor(writeTableName);
1098       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
1099       family.setMaxVersions(1);
1100       family.setTimeToLive(writeDataTTL);
1101
1102       desc.addFamily(family);
1103       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1104       admin.createTable(desc, splits);
1105     }
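        // For example, with 5 counted regionservers and the default limits above (lower 1.0f,
        // upper 1.5f), createWriteTable() pre-splits the canary table into 5 regions and
        // checkWriteTableDistribution() re-creates it whenever the region count falls below 5
        // (5 * 1.0) or rises above 7 (the largest integer not exceeding 5 * 1.5).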
1106   }
1107
1108   /**
1109    * Canary entry point for specified table.
1110    * @throws Exception
1111    */
1112   public static void sniff(final Admin admin, TableName tableName, boolean rawScanEnabled)
1113       throws Exception {
1114     sniff(admin, tableName, TaskType.READ, rawScanEnabled);
1115   }
1116
1117   /**
1118    * Canary entry point for specified table.
1119    * Kept for backward compatibility.
1120    * @throws Exception
1121    */
1122   public static void sniff(final Admin admin, TableName tableName)
1123       throws Exception {
1124     sniff(admin, tableName, TaskType.READ, false);
1125   }
1126
1127   /**
1128    * Canary entry point for specified table with task type (read/write).
1129    * @throws Exception
1130    */
1131   public static void sniff(final Admin admin, TableName tableName, TaskType taskType,
1132       boolean rawScanEnabled)   throws Exception {
1133     List<Future<Void>> taskFutures =
1134         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
1135           new ScheduledThreadPoolExecutor(1), taskType, rawScanEnabled);
1136     for (Future<Void> future : taskFutures) {
1137       future.get();
1138     }
1139   }
1140
1141   /**
1142    * Canary entry point for specified table with task type (read/write).
1143    * Kept for backward compatibility.
1144    * @throws Exception
1145    */
1146   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
1147       throws Exception {
1148     Canary.sniff(admin, tableName, taskType, false);
1149   }
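      // Programmatic use of the public sniff() entry points, as a minimal sketch ("usertable"
      // and the surrounding setup are assumptions, not part of this tool):
      //
      //   try (Connection conn = ConnectionFactory.createConnection(conf);
      //        Admin admin = conn.getAdmin()) {
      //     Canary.sniff(admin, TableName.valueOf("usertable"));                        // read
      //     Canary.sniff(admin, TableName.valueOf("usertable"), TaskType.WRITE, false); // write
      //   }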
1150
1151   /**
1152    * Canary entry point for specified table.
1153    * @throws Exception
1154    */
1155   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1156       ExecutorService executor, TaskType taskType, boolean rawScanEnabled) throws Exception {
1157     if (LOG.isDebugEnabled()) {
1158       LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
1159         tableName));
1160     }
1161     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1162       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
1163         executor, taskType, rawScanEnabled);
1164     } else {
1165       LOG.warn(String.format("Table %s is not enabled", tableName));
1166     }
1167     return new LinkedList<Future<Void>>();
1168   }
1169
1170   /*
1171    * Loops over the regions that make up this table and outputs some information about their state.
1172    */
1173   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1174       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1175       boolean rawScanEnabled) throws Exception {
1176
1177     if (LOG.isDebugEnabled()) {
1178       LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
1179     }
1180
1181     Table table = null;
1182     try {
1183       table = admin.getConnection().getTable(tableDesc.getTableName());
1184     } catch (TableNotFoundException e) {
1185       return new ArrayList<Future<Void>>();
1186     }
1187     finally {
1188       if (table !=null) {
1189         table.close();
1190       }
1191     }
1192
1193     List<RegionTask> tasks = new ArrayList<RegionTask>();
1194     RegionLocator regionLocator = null;
1195     try {
1196       regionLocator = admin.getConnection().getRegionLocator(tableDesc.getTableName());
1197       for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1198         ServerName rs = location.getServerName();
1199         HRegionInfo region = location.getRegionInfo();
1200         tasks.add(new RegionTask(admin.getConnection(), region, rs, sink, taskType, rawScanEnabled));
1201       }
1202     } finally {
1203       if (regionLocator != null) {
1204         regionLocator.close();
1205       }
1206     }
1207     return executor.invokeAll(tasks);
1208   }
1209
1210   // a monitor for zookeeper mode
1211   private static class ZookeeperMonitor extends Monitor {
1212     private List<String> hosts;
1213     private final String znode;
1214     private final int timeout;
1215
1216     protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1217         ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError)  {
1218       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1219       Configuration configuration = connection.getConfiguration();
1220       znode =
1221           configuration.get(ZOOKEEPER_ZNODE_PARENT,
1222               DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1223       timeout = configuration
1224           .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1225       ConnectStringParser parser =
1226           new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1227       hosts = Lists.newArrayList();
1228       for (InetSocketAddress server : parser.getServerAddresses()) {
1229         hosts.add(server.toString());
1230       }
1231     }
1232
1233     @Override public void run() {
1234       List<ZookeeperTask> tasks = Lists.newArrayList();
1235       for (final String host : hosts) {
1236         tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
1237       }
1238       try {
1239         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1240           try {
1241             future.get();
1242           } catch (ExecutionException e) {
1243             LOG.error("Sniff zookeeper failed!", e);
1244             this.errorCode = ERROR_EXIT_CODE;
1245           }
1246         }
1247       } catch (InterruptedException e) {
1248         this.errorCode = ERROR_EXIT_CODE;
1249         Thread.currentThread().interrupt();
1250         LOG.error("Sniff zookeeper interrupted!", e);
1251       }
1252       this.done = true;
1253     }
1254
1255
1256     private ZookeeperStdOutSink getSink() {
1257       if (!(sink instanceof ZookeeperStdOutSink)) {
1258         throw new RuntimeException("Can only write to zookeeper sink");
1259       }
1260       return ((ZookeeperStdOutSink) sink);
1261     }
1262   }
1263
1264
1265   // a monitor for regionserver mode
1266   private static class RegionServerMonitor extends Monitor {
1267
1268     private boolean allRegions;
1269
1270     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1271         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1272         boolean treatFailureAsError) {
1273       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1274       this.allRegions = allRegions;
1275     }
1276
1277     private ExtendedSink getSink() {
1278       return (ExtendedSink) this.sink;
1279     }
1280
1281     @Override
1282     public void run() {
1283       if (this.initAdmin() && this.checkNoTableNames()) {
1284         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1285         this.initialized = true;
1286         this.monitorRegionServers(rsAndRMap);
1287       }
1288       this.done = true;
1289     }
1290
1291     private boolean checkNoTableNames() {
1292       List<String> foundTableNames = new ArrayList<String>();
1293       TableName[] tableNames = null;
1294
1295       if (LOG.isDebugEnabled()) {
1296         LOG.debug(String.format("reading list of tables"));
1297       }
1298       try {
1299         tableNames = this.admin.listTableNames();
1300       } catch (IOException e) {
1301         LOG.error("Get listTableNames failed", e);
1302         this.errorCode = INIT_ERROR_EXIT_CODE;
1303         return false;
1304       }
1305
1306       if (this.targets == null || this.targets.length == 0) return true;
1307
1308       for (String target : this.targets) {
1309         for (TableName tableName : tableNames) {
1310           if (target.equals(tableName.getNameAsString())) {
1311             foundTableNames.add(target);
1312           }
1313         }
1314       }
1315
1316       if (foundTableNames.size() > 0) {
1317         System.err.println("Cannot pass a tablename when using the -regionserver " +
1318             "option, tablenames:" + foundTableNames.toString());
1319         this.errorCode = USAGE_EXIT_CODE;
1320       }
1321       return foundTableNames.size() == 0;
1322     }
1323
1324     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1325       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1326       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1327       Random rand = new Random();
1328       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1329         String serverName = entry.getKey();
1330         AtomicLong successes = new AtomicLong(0);
1331         successMap.put(serverName, successes);
1332         if (entry.getValue().isEmpty()) {
1333           LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
1334         } else if (this.allRegions) {
1335           for (HRegionInfo region : entry.getValue()) {
1336             tasks.add(new RegionServerTask(this.connection,
1337                 serverName,
1338                 region,
1339                 getSink(),
1340                 successes));
1341           }
1342         } else {
1343           // randomly select a region if the flag is not set
1344           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1345           tasks.add(new RegionServerTask(this.connection,
1346               serverName,
1347               region,
1348               getSink(),
1349               successes));
1350         }
1351       }
1352       try {
1353         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1354           try {
1355             future.get();
1356           } catch (ExecutionException e) {
1357             LOG.error("Sniff regionserver failed!", e);
1358             this.errorCode = ERROR_EXIT_CODE;
1359           }
1360         }
1361         if (this.allRegions) {
1362           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1363             String serverName = entry.getKey();
1364             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1365                     + entry.getValue().size() + " on regionserver:" + serverName);
1366           }
1367         }
1368       } catch (InterruptedException e) {
1369         this.errorCode = ERROR_EXIT_CODE;
1370         LOG.error("Sniff regionserver interrupted!", e);
1371       }
1372     }
1373
1374     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1375       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1376       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1377       return regionServerAndRegionsMap;
1378     }
1379
1380     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1381       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1382       Table table = null;
1383       RegionLocator regionLocator = null;
1384       try {
1385         if (LOG.isDebugEnabled()) {
1386           LOG.debug(String.format("reading list of tables and locations"));
1387         }
1388         HTableDescriptor[] tableDescs = this.admin.listTables();
1389         List<HRegionInfo> regions = null;
1390         for (HTableDescriptor tableDesc : tableDescs) {
1391           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1392           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1393
1394           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1395             ServerName rs = location.getServerName();
1396             String rsName = rs.getHostname();
1397             HRegionInfo r = location.getRegionInfo();
1398
1399             if (rsAndRMap.containsKey(rsName)) {
1400               regions = rsAndRMap.get(rsName);
1401             } else {
1402               regions = new ArrayList<HRegionInfo>();
1403               rsAndRMap.put(rsName, regions);
1404             }
1405             regions.add(r);
1406           }
1407           table.close();
1408         }
1409
1410         // get any live regionservers not serving any regions
1411         for (ServerName rs : this.admin.getClusterStatus().getServers()) {
1412           String rsName = rs.getHostname();
1413           if (!rsAndRMap.containsKey(rsName)) {
1414             rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
1415           }
1416         }
1417       } catch (IOException e) {
1418         String msg = "Get HTables info failed";
1419         LOG.error(msg, e);
1420         this.errorCode = INIT_ERROR_EXIT_CODE;
1421       } finally {
1422         if (table != null) {
1423           try {
1424             table.close();
1425           } catch (IOException e) {
1426             LOG.warn("Close table failed", e);
1427           }
1428         }
1429       }
1430
1431       return rsAndRMap;
1432     }
1433
1434     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1435         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1436
1437       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1438
1439       if (this.targets != null && this.targets.length > 0) {
1440         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1441         Pattern pattern = null;
1442         Matcher matcher = null;
1443         boolean regExpFound = false;
1444         for (String rsName : this.targets) {
1445           if (this.useRegExp) {
1446             regExpFound = false;
1447             pattern = Pattern.compile(rsName);
1448             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1449               matcher = pattern.matcher(entry.getKey());
1450               if (matcher.matches()) {
1451                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1452                 regExpFound = true;
1453               }
1454             }
1455             if (!regExpFound) {
1456               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1457             }
1458           } else {
1459             if (fullRsAndRMap.containsKey(rsName)) {
1460               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1461             } else {
1462               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1463             }
1464           }
1465         }
1466       } else {
1467         filteredRsAndRMap = fullRsAndRMap;
1468       }
1469       return filteredRsAndRMap;
1470     }
1471   }
1472
1473   public static void main(String[] args) throws Exception {
1474     final Configuration conf = HBaseConfiguration.create();
1475
1476     // load the generic options into conf
1477     new GenericOptionsParser(conf, args);
1478
1479     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1480     LOG.info("Number of execution threads " + numThreads);
1481
1482     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1483
1484     Class<? extends Sink> sinkClass =
1485         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1486     Sink sink = ReflectionUtils.newInstance(sinkClass);
1487
1488     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1489     executor.shutdown();
1490     System.exit(exitCode);
1491   }
1492 }