001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.tool;
021
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
023import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
024
025import java.io.Closeable;
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Random;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.concurrent.atomic.LongAdder;
048import java.util.regex.Matcher;
049import java.util.regex.Pattern;
050import org.apache.commons.lang3.time.StopWatch;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.hbase.AuthUtil;
053import org.apache.hadoop.hbase.ChoreService;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.HBaseConfiguration;
058import org.apache.hadoop.hbase.HColumnDescriptor;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.HRegionLocation;
061import org.apache.hadoop.hbase.HTableDescriptor;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.Connection;
072import org.apache.hadoop.hbase.client.ConnectionFactory;
073import org.apache.hadoop.hbase.client.Get;
074import org.apache.hadoop.hbase.client.Put;
075import org.apache.hadoop.hbase.client.RegionInfo;
076import org.apache.hadoop.hbase.client.RegionLocator;
077import org.apache.hadoop.hbase.client.ResultScanner;
078import org.apache.hadoop.hbase.client.Scan;
079import org.apache.hadoop.hbase.client.Table;
080import org.apache.hadoop.hbase.client.TableDescriptor;
081import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
082import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.Pair;
086import org.apache.hadoop.hbase.util.ReflectionUtils;
087import org.apache.hadoop.hbase.util.RegionSplitter;
088import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
089import org.apache.hadoop.hbase.zookeeper.ZKConfig;
090import org.apache.hadoop.util.GenericOptionsParser;
091import org.apache.hadoop.util.Tool;
092import org.apache.hadoop.util.ToolRunner;
093import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
094import org.apache.yetus.audience.InterfaceAudience;
095import org.apache.zookeeper.KeeperException;
096import org.apache.zookeeper.ZooKeeper;
097import org.apache.zookeeper.client.ConnectStringParser;
098import org.apache.zookeeper.data.Stat;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
103
104/**
105 * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
106 *
107 * There are three modes:
108 * <ol>
109 * <li>region mode (Default): For each region, try to get one row per column family outputting
110 * information on failure (ERROR) or else the latency.
111 * </li>
112 *
113 * <li>regionserver mode: For each regionserver try to get one row from one table selected
114 * randomly outputting information on failure (ERROR) or else the latency.
115 * </li>
116 *
117 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
118 * failure (ERROR) or else the latency.
119 * </li>
120 * </ol>
121 */
122@InterfaceAudience.Private
123public final class Canary implements Tool {
124  /**
125   * Sink interface used by the canary to output information
126   */
127  public interface Sink {
128    long getReadFailureCount();
129    long incReadFailureCount();
130    Map<String,String> getReadFailures();
131    void updateReadFailures(String regionName, String serverName);
132    long getWriteFailureCount();
133    long incWriteFailureCount();
134    Map<String,String> getWriteFailures();
135    void updateWriteFailures(String regionName, String serverName);
136  }
137
138  /**
139   * Simple implementation of canary sink that allows plotting to a file or standard output.
140   */
141  public static class StdOutSink implements Sink {
142    private AtomicLong readFailureCount = new AtomicLong(0),
143        writeFailureCount = new AtomicLong(0);
144    private Map<String, String> readFailures = new ConcurrentHashMap<>();
145    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
146
147    @Override
148    public long getReadFailureCount() {
149      return readFailureCount.get();
150    }
151
152    @Override
153    public long incReadFailureCount() {
154      return readFailureCount.incrementAndGet();
155    }
156
157    @Override
158    public Map<String, String> getReadFailures() {
159      return readFailures;
160    }
161
162    @Override
163    public void updateReadFailures(String regionName, String serverName) {
164      readFailures.put(regionName, serverName);
165    }
166
167    @Override
168    public long getWriteFailureCount() {
169      return writeFailureCount.get();
170    }
171
172    @Override
173    public long incWriteFailureCount() {
174      return writeFailureCount.incrementAndGet();
175    }
176
177    @Override
178    public Map<String, String> getWriteFailures() {
179      return writeFailures;
180    }
181
182    @Override
183    public void updateWriteFailures(String regionName, String serverName) {
184      writeFailures.put(regionName, serverName);
185    }
186  }
187
188  /**
189   * By RegionServer, for 'regionserver' mode.
190   */
191  public static class RegionServerStdOutSink extends StdOutSink {
192    public void publishReadFailure(String table, String server) {
193      incReadFailureCount();
194      LOG.error("Read from {} on {}", table, server);
195    }
196
197    public void publishReadTiming(String table, String server, long msTime) {
198      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
199    }
200  }
201
202  /**
203   * Output for 'zookeeper' mode.
204   */
205  public static class ZookeeperStdOutSink extends StdOutSink {
206    public void publishReadFailure(String znode, String server) {
207      incReadFailureCount();
208      LOG.error("Read from {} on {}", znode, server);
209    }
210
211    public void publishReadTiming(String znode, String server, long msTime) {
212      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
213    }
214  }
215
216  /**
217   * By Region, for 'region'  mode.
218   */
219  public static class RegionStdOutSink extends StdOutSink {
220    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
221    private LongAdder writeLatency = new LongAdder();
222
223    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
224      incReadFailureCount();
225      LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e);
226    }
227
228    public void publishReadFailure(ServerName serverName, RegionInfo region,
229        ColumnFamilyDescriptor column, Exception e) {
230      incReadFailureCount();
231      LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName,
232          column.getNameAsString(), e);
233    }
234
235    public void publishReadTiming(ServerName serverName, RegionInfo region,
236        ColumnFamilyDescriptor column, long msTime) {
237      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
238          column.getNameAsString(), msTime);
239    }
240
241    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
242      incWriteFailureCount();
243      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
244    }
245
246    public void publishWriteFailure(ServerName serverName, RegionInfo region,
247        ColumnFamilyDescriptor column, Exception e) {
248      incWriteFailureCount();
249      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
250          column.getNameAsString(), e);
251    }
252
253    public void publishWriteTiming(ServerName serverName, RegionInfo region,
254        ColumnFamilyDescriptor column, long msTime) {
255      LOG.info("Write to {} on {} {} in {}ms",
256        region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
257    }
258
259    public Map<String, LongAdder> getReadLatencyMap() {
260      return this.perTableReadLatency;
261    }
262
263    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
264      LongAdder initLatency = new LongAdder();
265      this.perTableReadLatency.put(tableName, initLatency);
266      return initLatency;
267    }
268
269    public void initializeWriteLatency() {
270      this.writeLatency.reset();
271    }
272
273    public LongAdder getWriteLatency() {
274      return this.writeLatency;
275    }
276  }
277
278  /**
279   * Run a single zookeeper Task and then exit.
280   */
281  static class ZookeeperTask implements Callable<Void> {
282    private final Connection connection;
283    private final String host;
284    private String znode;
285    private final int timeout;
286    private ZookeeperStdOutSink sink;
287
288    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
289        ZookeeperStdOutSink sink) {
290      this.connection = connection;
291      this.host = host;
292      this.znode = znode;
293      this.timeout = timeout;
294      this.sink = sink;
295    }
296
297    @Override public Void call() throws Exception {
298      ZooKeeper zooKeeper = null;
299      try {
300        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
301        Stat exists = zooKeeper.exists(znode, false);
302        StopWatch stopwatch = new StopWatch();
303        stopwatch.start();
304        zooKeeper.getData(znode, false, exists);
305        stopwatch.stop();
306        sink.publishReadTiming(znode, host, stopwatch.getTime());
307      } catch (KeeperException | InterruptedException e) {
308        sink.publishReadFailure(znode, host);
309      } finally {
310        if (zooKeeper != null) {
311          zooKeeper.close();
312        }
313      }
314      return null;
315    }
316  }
317
318  /**
319   * Run a single Region Task and then exit. For each column family of the Region, get one row and
320   * output latency or failure.
321   */
322  static class RegionTask implements Callable<Void> {
323    public enum TaskType{
324      READ, WRITE
325    }
326    private Connection connection;
327    private RegionInfo region;
328    private RegionStdOutSink sink;
329    private TaskType taskType;
330    private boolean rawScanEnabled;
331    private ServerName serverName;
332    private LongAdder readWriteLatency;
333
334    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
335        RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
336      this.connection = connection;
337      this.region = region;
338      this.serverName = serverName;
339      this.sink = sink;
340      this.taskType = taskType;
341      this.rawScanEnabled = rawScanEnabled;
342      this.readWriteLatency = rwLatency;
343    }
344
345    @Override
346    public Void call() {
347      switch (taskType) {
348      case READ:
349        return read();
350      case WRITE:
351        return write();
352      default:
353        return read();
354      }
355    }
356
357    public Void read() {
358      Table table = null;
359      TableDescriptor tableDesc = null;
360      try {
361        LOG.debug("Reading table descriptor for table {}", region.getTable());
362        table = connection.getTable(region.getTable());
363        tableDesc = table.getDescriptor();
364      } catch (IOException e) {
365        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
366        sink.publishReadFailure(serverName, region, e);
367        if (table != null) {
368          try {
369            table.close();
370          } catch (IOException ioe) {
371            LOG.error("Close table failed", e);
372          }
373        }
374        return null;
375      }
376
377      byte[] startKey = null;
378      Get get = null;
379      Scan scan = null;
380      ResultScanner rs = null;
381      StopWatch stopWatch = new StopWatch();
382      for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
383        stopWatch.reset();
384        startKey = region.getStartKey();
385        // Can't do a get on empty start row so do a Scan of first element if any instead.
386        if (startKey.length > 0) {
387          get = new Get(startKey);
388          get.setCacheBlocks(false);
389          get.setFilter(new FirstKeyOnlyFilter());
390          get.addFamily(column.getName());
391        } else {
392          scan = new Scan();
393          LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName());
394          scan.setRaw(rawScanEnabled);
395          scan.setCaching(1);
396          scan.setCacheBlocks(false);
397          scan.setFilter(new FirstKeyOnlyFilter());
398          scan.addFamily(column.getName());
399          scan.setMaxResultSize(1L);
400          scan.setOneRowLimit();
401        }
402        LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(),
403            region.getRegionNameAsString(), column.getNameAsString(),
404            Bytes.toStringBinary(startKey));
405        try {
406          stopWatch.start();
407          if (startKey.length > 0) {
408            table.get(get);
409          } else {
410            rs = table.getScanner(scan);
411            rs.next();
412          }
413          stopWatch.stop();
414          this.readWriteLatency.add(stopWatch.getTime());
415          sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
416        } catch (Exception e) {
417          sink.publishReadFailure(serverName, region, column, e);
418          sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname());
419        } finally {
420          if (rs != null) {
421            rs.close();
422          }
423          scan = null;
424          get = null;
425        }
426      }
427      try {
428        table.close();
429      } catch (IOException e) {
430        LOG.error("Close table failed", e);
431      }
432      return null;
433    }
434
435    /**
436     * Check writes for the canary table
437     */
438    private Void write() {
439      Table table = null;
440      TableDescriptor tableDesc = null;
441      try {
442        table = connection.getTable(region.getTable());
443        tableDesc = table.getDescriptor();
444        byte[] rowToCheck = region.getStartKey();
445        if (rowToCheck.length == 0) {
446          rowToCheck = new byte[]{0x0};
447        }
448        int writeValueSize =
449            connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
450        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
451          Put put = new Put(rowToCheck);
452          byte[] value = new byte[writeValueSize];
453          Bytes.random(value);
454          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
455
456          LOG.debug("Writing to {} {} {} {}",
457            tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
458            Bytes.toStringBinary(rowToCheck));
459          try {
460            long startTime = System.currentTimeMillis();
461            table.put(put);
462            long time = System.currentTimeMillis() - startTime;
463            this.readWriteLatency.add(time);
464            sink.publishWriteTiming(serverName, region, column, time);
465          } catch (Exception e) {
466            sink.publishWriteFailure(serverName, region, column, e);
467          }
468        }
469        table.close();
470      } catch (IOException e) {
471        sink.publishWriteFailure(serverName, region, e);
472        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() );
473      }
474      return null;
475    }
476  }
477
478  /**
479   * Run a single RegionServer Task and then exit.
480   * Get one row from a region on the regionserver and output latency or the failure.
481   */
482  static class RegionServerTask implements Callable<Void> {
483    private Connection connection;
484    private String serverName;
485    private RegionInfo region;
486    private RegionServerStdOutSink sink;
487    private AtomicLong successes;
488
489    RegionServerTask(Connection connection, String serverName, RegionInfo region,
490        RegionServerStdOutSink sink, AtomicLong successes) {
491      this.connection = connection;
492      this.serverName = serverName;
493      this.region = region;
494      this.sink = sink;
495      this.successes = successes;
496    }
497
498    @Override
499    public Void call() {
500      TableName tableName = null;
501      Table table = null;
502      Get get = null;
503      byte[] startKey = null;
504      Scan scan = null;
505      StopWatch stopWatch = new StopWatch();
506      // monitor one region on every region server
507      stopWatch.reset();
508      try {
509        tableName = region.getTable();
510        table = connection.getTable(tableName);
511        startKey = region.getStartKey();
512        // Can't do a get on empty start row so do a Scan of first element if any instead.
513        LOG.debug("Reading from {} {} {} {}",
514          serverName, region.getTable(), region.getRegionNameAsString(),
515          Bytes.toStringBinary(startKey));
516        if (startKey.length > 0) {
517          get = new Get(startKey);
518          get.setCacheBlocks(false);
519          get.setFilter(new FirstKeyOnlyFilter());
520          stopWatch.start();
521          table.get(get);
522          stopWatch.stop();
523        } else {
524          scan = new Scan();
525          scan.setCacheBlocks(false);
526          scan.setFilter(new FirstKeyOnlyFilter());
527          scan.setCaching(1);
528          scan.setMaxResultSize(1L);
529          scan.setOneRowLimit();
530          stopWatch.start();
531          ResultScanner s = table.getScanner(scan);
532          s.next();
533          s.close();
534          stopWatch.stop();
535        }
536        successes.incrementAndGet();
537        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
538      } catch (TableNotFoundException tnfe) {
539        LOG.error("Table may be deleted", tnfe);
540        // This is ignored because it doesn't imply that the regionserver is dead
541      } catch (TableNotEnabledException tnee) {
542        // This is considered a success since we got a response.
543        successes.incrementAndGet();
544        LOG.debug("The targeted table was disabled.  Assuming success.");
545      } catch (DoNotRetryIOException dnrioe) {
546        sink.publishReadFailure(tableName.getNameAsString(), serverName);
547        LOG.error(dnrioe.toString(), dnrioe);
548      } catch (IOException e) {
549        sink.publishReadFailure(tableName.getNameAsString(), serverName);
550        LOG.error(e.toString(), e);
551      } finally {
552        if (table != null) {
553          try {
554            table.close();
555          } catch (IOException e) {/* DO NOTHING */
556            LOG.error("Close table failed", e);
557          }
558        }
559        scan = null;
560        get = null;
561        startKey = null;
562      }
563      return null;
564    }
565  }
566
567  private static final int USAGE_EXIT_CODE = 1;
568  private static final int INIT_ERROR_EXIT_CODE = 2;
569  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
570  private static final int ERROR_EXIT_CODE = 4;
571  private static final int FAILURE_EXIT_CODE = 5;
572
573  private static final long DEFAULT_INTERVAL = 60000;
574
575  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
576  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
577
578  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
579
580  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
581    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
582
583  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
584
585  private Configuration conf = null;
586  private long interval = 0;
587  private Sink sink = null;
588
589  private boolean useRegExp;
590  private long timeout = DEFAULT_TIMEOUT;
591  private boolean failOnError = true;
592
593  /**
594   * True if we are to run in 'regionServer' mode.
595   */
596  private boolean regionServerMode = false;
597
598  /**
599   * True if we are to run in zookeeper 'mode'.
600   */
601  private boolean zookeeperMode = false;
602  private long permittedFailures = 0;
603  private boolean regionServerAllRegions = false;
604  private boolean writeSniffing = false;
605  private long configuredWriteTableTimeout = DEFAULT_TIMEOUT;
606  private boolean treatFailureAsError = false;
607  private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
608
609  /**
610   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
611   * we aggregate time to fetch each region and it needs to be less than this value else we
612   * log an ERROR.
613   */
614  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
615
616  private ExecutorService executor; // threads to retrieve data from regionservers
617
618  public Canary() {
619    this(new ScheduledThreadPoolExecutor(1));
620  }
621
622  public Canary(ExecutorService executor) {
623    this(executor, null);
624  }
625
626  @VisibleForTesting
627  Canary(ExecutorService executor, Sink sink) {
628    this.executor = executor;
629    this.sink = sink;
630  }
631
632  @Override
633  public Configuration getConf() {
634    return conf;
635  }
636
637  @Override
638  public void setConf(Configuration conf) {
639    this.conf = conf;
640  }
641
642  private int parseArgs(String[] args) {
643    int index = -1;
644    // Process command line args
645    for (int i = 0; i < args.length; i++) {
646      String cmd = args[i];
647
648      if (cmd.startsWith("-")) {
649        if (index >= 0) {
650          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
651          System.err.println("Invalid command line options");
652          printUsageAndExit();
653        }
654
655        if (cmd.equals("-help") || cmd.equals("-h")) {
656          // user asked for help, print the help and quit.
657          printUsageAndExit();
658        } else if (cmd.equals("-daemon") && interval == 0) {
659          // user asked for daemon mode, set a default interval between checks
660          interval = DEFAULT_INTERVAL;
661        } else if (cmd.equals("-interval")) {
662          // user has specified an interval for canary breaths (-interval N)
663          i++;
664
665          if (i == args.length) {
666            System.err.println("-interval takes a numeric seconds value argument.");
667            printUsageAndExit();
668          }
669
670          try {
671            interval = Long.parseLong(args[i]) * 1000;
672          } catch (NumberFormatException e) {
673            System.err.println("-interval needs a numeric value argument.");
674            printUsageAndExit();
675          }
676        } else if (cmd.equals("-zookeeper")) {
677          this.zookeeperMode = true;
678        } else if(cmd.equals("-regionserver")) {
679          this.regionServerMode = true;
680        } else if(cmd.equals("-allRegions")) {
681          this.regionServerAllRegions = true;
682        } else if(cmd.equals("-writeSniffing")) {
683          this.writeSniffing = true;
684        } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
685          this.treatFailureAsError = true;
686        } else if (cmd.equals("-e")) {
687          this.useRegExp = true;
688        } else if (cmd.equals("-t")) {
689          i++;
690
691          if (i == args.length) {
692            System.err.println("-t takes a numeric milliseconds value argument.");
693            printUsageAndExit();
694          }
695
696          try {
697            this.timeout = Long.parseLong(args[i]);
698          } catch (NumberFormatException e) {
699            System.err.println("-t takes a numeric milliseconds value argument.");
700            printUsageAndExit();
701          }
702        } else if(cmd.equals("-writeTableTimeout")) {
703          i++;
704
705          if (i == args.length) {
706            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
707            printUsageAndExit();
708          }
709
710          try {
711            this.configuredWriteTableTimeout = Long.parseLong(args[i]);
712          } catch (NumberFormatException e) {
713            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
714            printUsageAndExit();
715          }
716        } else if (cmd.equals("-writeTable")) {
717          i++;
718
719          if (i == args.length) {
720            System.err.println("-writeTable takes a string tablename value argument.");
721            printUsageAndExit();
722          }
723          this.writeTableName = TableName.valueOf(args[i]);
724        } else if (cmd.equals("-f")) {
725          i++;
726
727          if (i == args.length) {
728            System.err
729                .println("-f needs a boolean value argument (true|false).");
730            printUsageAndExit();
731          }
732
733          this.failOnError = Boolean.parseBoolean(args[i]);
734        } else if (cmd.equals("-readTableTimeouts")) {
735          i++;
736
737          if (i == args.length) {
738            System.err.println("-readTableTimeouts needs a comma-separated list of read " +
739                "millisecond timeouts per table (without spaces).");
740            printUsageAndExit();
741          }
742          String [] tableTimeouts = args[i].split(",");
743          for (String tT: tableTimeouts) {
744            String [] nameTimeout = tT.split("=");
745            if (nameTimeout.length < 2) {
746              System.err.println("Each -readTableTimeouts argument must be of the form " +
747                  "<tableName>=<read timeout> (without spaces).");
748              printUsageAndExit();
749            }
750            long timeoutVal = 0L;
751            try {
752              timeoutVal = Long.parseLong(nameTimeout[1]);
753            } catch (NumberFormatException e) {
754              System.err.println("-readTableTimeouts read timeout for each table must be a numeric value argument.");
755              printUsageAndExit();
756            }
757            this.configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
758          }
759        } else if (cmd.equals("-permittedZookeeperFailures")) {
760          i++;
761
762          if (i == args.length) {
763            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
764            printUsageAndExit();
765          }
766          try {
767            this.permittedFailures = Long.parseLong(args[i]);
768          } catch (NumberFormatException e) {
769            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
770            printUsageAndExit();
771          }
772        } else {
773          // no options match
774          System.err.println(cmd + " options is invalid.");
775          printUsageAndExit();
776        }
777      } else if (index < 0) {
778        // keep track of first table name specified by the user
779        index = i;
780      }
781    }
782    if (this.regionServerAllRegions && !this.regionServerMode) {
783      System.err.println("-allRegions can only be specified in regionserver mode.");
784      printUsageAndExit();
785    }
786    if (this.zookeeperMode) {
787      if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
788        System.err.println("-zookeeper is exclusive and cannot be combined with "
789            + "other modes.");
790        printUsageAndExit();
791      }
792    }
793    if (this.permittedFailures != 0 && !this.zookeeperMode) {
794      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
795      printUsageAndExit();
796    }
797    if (!this.configuredReadTableTimeouts.isEmpty() && (this.regionServerMode || this.zookeeperMode)) {
798      System.err.println("-readTableTimeouts can only be configured in region mode.");
799      printUsageAndExit();
800    }
801    return index;
802  }
803
804  @Override
805  public int run(String[] args) throws Exception {
806    int index = parseArgs(args);
807    ChoreService choreService = null;
808
809    // Launches chore for refreshing kerberos credentials if security is enabled.
810    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
811    // for more details.
812    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
813    if (authChore != null) {
814      choreService = new ChoreService("CANARY_TOOL");
815      choreService.scheduleChore(authChore);
816    }
817
818    // Start to prepare the stuffs
819    Monitor monitor = null;
820    Thread monitorThread = null;
821    long startTime = 0;
822    long currentTimeLength = 0;
823    // Get a connection to use in below.
824    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
825      do {
826        // Do monitor !!
827        try {
828          monitor = this.newMonitor(connection, index, args);
829          monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
830          startTime = System.currentTimeMillis();
831          monitorThread.start();
832          while (!monitor.isDone()) {
833            // wait for 1 sec
834            Thread.sleep(1000);
835            // exit if any error occurs
836            if (this.failOnError && monitor.hasError()) {
837              monitorThread.interrupt();
838              if (monitor.initialized) {
839                return monitor.errorCode;
840              } else {
841                return INIT_ERROR_EXIT_CODE;
842              }
843            }
844            currentTimeLength = System.currentTimeMillis() - startTime;
845            if (currentTimeLength > this.timeout) {
846              LOG.error("The monitor is running too long (" + currentTimeLength
847                  + ") after timeout limit:" + this.timeout
848                  + " will be killed itself !!");
849              if (monitor.initialized) {
850                return TIMEOUT_ERROR_EXIT_CODE;
851              } else {
852                return INIT_ERROR_EXIT_CODE;
853              }
854            }
855          }
856
857          if (this.failOnError && monitor.finalCheckForErrors()) {
858            monitorThread.interrupt();
859            return monitor.errorCode;
860          }
861        } finally {
862          if (monitor != null) monitor.close();
863        }
864
865        Thread.sleep(interval);
866      } while (interval > 0);
867    } // try-with-resources close
868
869    if (choreService != null) {
870      choreService.shutdown();
871    }
872    return monitor.errorCode;
873  }
874
875  public Map<String, String> getReadFailures()  {
876    return sink.getReadFailures();
877  }
878
879  public Map<String, String> getWriteFailures()  {
880    return sink.getWriteFailures();
881  }
882
883  private void printUsageAndExit() {
884    System.err.println(
885      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
886    System.err.println("Where [OPTIONS] are:");
887    System.err.println(" -h,-help        show this help and exit.");
888    System.err.println(" -regionserver   set 'regionserver mode'; gets row from random region on " +
889        "server");
890    System.err.println(" -allRegions     get from ALL regions when 'regionserver mode', not just " +
891        "random one.");
892    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on " +
893        "each ensemble member");
894    System.err.println(" -permittedZookeeperFailures <N>     Ignore first N failures when attempting to " +
895        "connect to individual zookeeper nodes in the ensemble");
896    System.err.println(" -daemon         continuous check at defined intervals.");
897    System.err.println(" -interval <N>   interval between checks in seconds");
898    System.err.println(" -e              consider table/regionserver argument as regular " +
899        "expression");
900    System.err.println(" -f <B>          exit on first error; default=true");
901    System.err.println(" -failureAsError treat read/write failure as error");
902    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
903    System.err.println(" -writeSniffing  enable write sniffing");
904    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
905    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
906    System.err.println(" -readTableTimeouts <tableName>=<read timeout>," +
907        "<tableName>=<read timeout>,...");
908    System.err.println("                comma-separated list of table read timeouts " +
909        "(no spaces);");
910    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
911    System.err.println("");
912    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
913    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " +
914        "raw scan; default=false");
915    System.err.println("");
916    System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
917        "zookeeper.");
918    System.err.println("To sniff/probe all regions, pass no arguments.");
919    System.err.println("To sniff/probe all regions of a table, pass tablename.");
920    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
921    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
922    System.exit(USAGE_EXIT_CODE);
923  }
924
925  Sink getSink(Configuration configuration, Class clazz) {
926    // In test context, this.sink might be set. Use it if non-null. For testing.
927    return this.sink != null? this.sink:
928        (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class",
929            clazz, Sink.class));
930  }
931
932  /**
933   * A Factory method for {@link Monitor}.
934   * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
935   * @param index a start index for monitor target
936   * @param args args passed from user
937   * @return a Monitor instance
938   */
939  public Monitor newMonitor(final Connection connection, int index, String[] args) {
940    Monitor monitor = null;
941    String[] monitorTargets = null;
942
943    if (index >= 0) {
944      int length = args.length - index;
945      monitorTargets = new String[length];
946      System.arraycopy(args, index, monitorTargets, 0, length);
947    }
948
949    if (this.regionServerMode) {
950      monitor =
951          new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
952              getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
953              this.executor, this.regionServerAllRegions,
954              this.treatFailureAsError, this.permittedFailures);
955    } else if (this.zookeeperMode) {
956      monitor =
957          new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
958              getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
959              this.executor, this.treatFailureAsError, this.permittedFailures);
960    } else {
961      monitor =
962          new RegionMonitor(connection, monitorTargets, this.useRegExp,
963              getSink(connection.getConfiguration(), RegionStdOutSink.class),
964              this.executor, this.writeSniffing,
965              this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts,
966              this.configuredWriteTableTimeout, this.permittedFailures);
967    }
968    return monitor;
969  }
970
971  /**
972   * A Monitor super-class can be extended by users
973   */
974  public static abstract class Monitor implements Runnable, Closeable {
975    protected Connection connection;
976    protected Admin admin;
977    /**
978     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
979     * Passed on the command-line as arguments.
980     */
981    protected String[] targets;
982    protected boolean useRegExp;
983    protected boolean treatFailureAsError;
984    protected boolean initialized = false;
985
986    protected boolean done = false;
987    protected int errorCode = 0;
988    protected long allowedFailures = 0;
989    protected Sink sink;
990    protected ExecutorService executor;
991
992    public boolean isDone() {
993      return done;
994    }
995
996    public boolean hasError() {
997      return errorCode != 0;
998    }
999
1000    public boolean finalCheckForErrors() {
1001      if (errorCode != 0) {
1002        return true;
1003      }
1004      if (treatFailureAsError &&
1005          (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) {
1006        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1007        errorCode = FAILURE_EXIT_CODE;
1008        return true;
1009      }
1010      return false;
1011    }
1012
1013    @Override
1014    public void close() throws IOException {
1015      if (this.admin != null) this.admin.close();
1016    }
1017
1018    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1019        ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1020      if (null == connection) throw new IllegalArgumentException("connection shall not be null");
1021
1022      this.connection = connection;
1023      this.targets = monitorTargets;
1024      this.useRegExp = useRegExp;
1025      this.treatFailureAsError = treatFailureAsError;
1026      this.sink = sink;
1027      this.executor = executor;
1028      this.allowedFailures = allowedFailures;
1029    }
1030
1031    @Override
1032    public abstract void run();
1033
1034    protected boolean initAdmin() {
1035      if (null == this.admin) {
1036        try {
1037          this.admin = this.connection.getAdmin();
1038        } catch (Exception e) {
1039          LOG.error("Initial HBaseAdmin failed...", e);
1040          this.errorCode = INIT_ERROR_EXIT_CODE;
1041        }
1042      } else if (admin.isAborted()) {
1043        LOG.error("HBaseAdmin aborted");
1044        this.errorCode = INIT_ERROR_EXIT_CODE;
1045      }
1046      return !this.hasError();
1047    }
1048  }
1049
1050  /**
1051   * A monitor for region mode.
1052   */
1053  private static class RegionMonitor extends Monitor {
1054    // 10 minutes
1055    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1056    // 1 days
1057    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1058
1059    private long lastCheckTime = -1;
1060    private boolean writeSniffing;
1061    private TableName writeTableName;
1062    private int writeDataTTL;
1063    private float regionsLowerLimit;
1064    private float regionsUpperLimit;
1065    private int checkPeriod;
1066    private boolean rawScanEnabled;
1067
1068    /**
1069     * This is a timeout per table. If read of each region in the table aggregated takes longer
1070     * than what is configured here, we log an ERROR rather than just an INFO.
1071     */
1072    private HashMap<String, Long> configuredReadTableTimeouts;
1073
1074    private long configuredWriteTableTimeout;
1075
1076    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1077        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1078        boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1079        long configuredWriteTableTimeout, long allowedFailures) {
1080      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, allowedFailures);
1081      Configuration conf = connection.getConfiguration();
1082      this.writeSniffing = writeSniffing;
1083      this.writeTableName = writeTableName;
1084      this.writeDataTTL =
1085          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1086      this.regionsLowerLimit =
1087          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1088      this.regionsUpperLimit =
1089          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1090      this.checkPeriod =
1091          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1092            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1093      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1094      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1095      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1096    }
1097
1098    private RegionStdOutSink getSink() {
1099      if (!(sink instanceof RegionStdOutSink)) {
1100        throw new RuntimeException("Can only write to Region sink");
1101      }
1102      return ((RegionStdOutSink) sink);
1103    }
1104
1105    @Override
1106    public void run() {
1107      if (this.initAdmin()) {
1108        try {
1109          List<Future<Void>> taskFutures = new LinkedList<>();
1110          RegionStdOutSink regionSink = this.getSink();
1111          if (this.targets != null && this.targets.length > 0) {
1112            String[] tables = generateMonitorTables(this.targets);
1113            // Check to see that each table name passed in the -readTableTimeouts argument is also
1114            // passed as a monitor target.
1115            if (!new HashSet<>(Arrays.asList(tables)).
1116                containsAll(this.configuredReadTableTimeouts.keySet())) {
1117              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
1118                  "passed via command line.");
1119              this.errorCode = USAGE_EXIT_CODE;
1120              return;
1121            }
1122            this.initialized = true;
1123            for (String table : tables) {
1124              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1125              taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ,
1126                this.rawScanEnabled, readLatency));
1127            }
1128          } else {
1129            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1130          }
1131
1132          if (writeSniffing) {
1133            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1134              try {
1135                checkWriteTableDistribution();
1136              } catch (IOException e) {
1137                LOG.error("Check canary table distribution failed!", e);
1138              }
1139              lastCheckTime = EnvironmentEdgeManager.currentTime();
1140            }
1141            // sniff canary table with write operation
1142            regionSink.initializeWriteLatency();
1143            LongAdder writeTableLatency = regionSink.getWriteLatency();
1144            taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1145              executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
1146          }
1147
1148          for (Future<Void> future : taskFutures) {
1149            try {
1150              future.get();
1151            } catch (ExecutionException e) {
1152              LOG.error("Sniff region failed!", e);
1153            }
1154          }
1155          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1156          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1157            String tableName = entry.getKey();
1158            if (actualReadTableLatency.containsKey(tableName)) {
1159              Long actual = actualReadTableLatency.get(tableName).longValue();
1160              Long configured = entry.getValue();
1161              if (actual > configured) {
1162                LOG.error("Read operation for {} took {}ms (Configured read timeout {}ms.",
1163                    tableName, actual, configured);
1164              } else {
1165                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1166                    tableName, actual, configured);
1167              }
1168            } else {
1169              LOG.error("Read operation for {} failed!", tableName);
1170            }
1171          }
1172          if (this.writeSniffing) {
1173            String writeTableStringName = this.writeTableName.getNameAsString();
1174            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1175            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1176                writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1177            // Check that the writeTable write operation latency does not exceed the configured timeout.
1178            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1179              LOG.error("Write operation for {} exceeded the configured write timeout.",
1180                  writeTableStringName);
1181            }
1182          }
1183        } catch (Exception e) {
1184          LOG.error("Run regionMonitor failed", e);
1185          this.errorCode = ERROR_EXIT_CODE;
1186        } finally {
1187          this.done = true;
1188        }
1189      }
1190      this.done = true;
1191    }
1192
1193    /**
1194     * @return List of tables to use in test.
1195     */
1196    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1197      String[] returnTables = null;
1198
1199      if (this.useRegExp) {
1200        Pattern pattern = null;
1201        TableDescriptor[] tds = null;
1202        Set<String> tmpTables = new TreeSet<>();
1203        try {
1204          LOG.debug(String.format("reading list of tables"));
1205          tds = this.admin.listTables(pattern);
1206          if (tds == null) {
1207            tds = new TableDescriptor[0];
1208          }
1209          for (String monitorTarget : monitorTargets) {
1210            pattern = Pattern.compile(monitorTarget);
1211            for (TableDescriptor td : tds) {
1212              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1213                tmpTables.add(td.getTableName().getNameAsString());
1214              }
1215            }
1216          }
1217        } catch (IOException e) {
1218          LOG.error("Communicate with admin failed", e);
1219          throw e;
1220        }
1221
1222        if (tmpTables.size() > 0) {
1223          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1224        } else {
1225          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1226          LOG.error(msg);
1227          this.errorCode = INIT_ERROR_EXIT_CODE;
1228          throw new TableNotFoundException(msg);
1229        }
1230      } else {
1231        returnTables = monitorTargets;
1232      }
1233
1234      return returnTables;
1235    }
1236
1237    /*
1238     * Canary entry point to monitor all the tables.
1239     */
1240    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1241        throws Exception {
1242      LOG.debug("Reading list of tables");
1243      List<Future<Void>> taskFutures = new LinkedList<>();
1244      for (TableDescriptor td: admin.listTableDescriptors()) {
1245        if (admin.isTableEnabled(td.getTableName()) &&
1246            (!td.getTableName().equals(writeTableName))) {
1247          LongAdder readLatency =
1248              regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1249          taskFutures.addAll(Canary.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled,
1250              readLatency));
1251        }
1252      }
1253      return taskFutures;
1254    }
1255
1256    private void checkWriteTableDistribution() throws IOException {
1257      if (!admin.tableExists(writeTableName)) {
1258        int numberOfServers =
1259            admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size();
1260        if (numberOfServers == 0) {
1261          throw new IllegalStateException("No live regionservers");
1262        }
1263        createWriteTable(numberOfServers);
1264      }
1265
1266      if (!admin.isTableEnabled(writeTableName)) {
1267        admin.enableTable(writeTableName);
1268      }
1269
1270      ClusterMetrics status =
1271          admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER));
1272      int numberOfServers = status.getLiveServerMetrics().size();
1273      if (status.getLiveServerMetrics().containsKey(status.getMasterName())) {
1274        numberOfServers -= 1;
1275      }
1276
1277      List<Pair<RegionInfo, ServerName>> pairs =
1278          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1279      int numberOfRegions = pairs.size();
1280      if (numberOfRegions < numberOfServers * regionsLowerLimit
1281          || numberOfRegions > numberOfServers * regionsUpperLimit) {
1282        admin.disableTable(writeTableName);
1283        admin.deleteTable(writeTableName);
1284        createWriteTable(numberOfServers);
1285      }
1286      HashSet<ServerName> serverSet = new HashSet<>();
1287      for (Pair<RegionInfo, ServerName> pair : pairs) {
1288        serverSet.add(pair.getSecond());
1289      }
1290      int numberOfCoveredServers = serverSet.size();
1291      if (numberOfCoveredServers < numberOfServers) {
1292        admin.balancer();
1293      }
1294    }
1295
1296    private void createWriteTable(int numberOfServers) throws IOException {
1297      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
1298      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
1299        "(current lower limit of regions per server is {} and you can change it with config {}).",
1300          numberOfServers, numberOfRegions, regionsLowerLimit,
1301          HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1302      HTableDescriptor desc = new HTableDescriptor(writeTableName);
1303      HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
1304      family.setMaxVersions(1);
1305      family.setTimeToLive(writeDataTTL);
1306
1307      desc.addFamily(family);
1308      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1309      admin.createTable(desc, splits);
1310    }
1311  }
1312
1313  /**
1314   * Canary entry point for specified table.
1315   * @throws Exception
1316   */
1317  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1318      ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency)
1319      throws Exception {
1320    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1321    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1322      return Canary.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1323        executor, taskType, rawScanEnabled, readLatency);
1324    } else {
1325      LOG.warn("Table {} is not enabled", tableName);
1326    }
1327    return new LinkedList<>();
1328  }
1329
1330  /*
1331   * Loops over regions of this table, and outputs information about the state.
1332   */
1333  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1334      TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1335      boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
1336    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1337    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1338      List<RegionTask> tasks = new ArrayList<>();
1339      try (RegionLocator regionLocator =
1340               admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1341        for (HRegionLocation location: regionLocator.getAllRegionLocations()) {
1342          ServerName rs = location.getServerName();
1343          RegionInfo region = location.getRegion();
1344          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
1345              taskType, rawScanEnabled, rwLatency));
1346        }
1347        return executor.invokeAll(tasks);
1348      }
1349    } catch (TableNotFoundException e) {
1350      return Collections.EMPTY_LIST;
1351    }
1352  }
1353
1354  //  monitor for zookeeper mode
1355  private static class ZookeeperMonitor extends Monitor {
1356    private List<String> hosts;
1357    private final String znode;
1358    private final int timeout;
1359
1360    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1361        Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures)  {
1362      super(connection, monitorTargets, useRegExp,
1363          sink, executor, treatFailureAsError, allowedFailures);
1364      Configuration configuration = connection.getConfiguration();
1365      znode =
1366          configuration.get(ZOOKEEPER_ZNODE_PARENT,
1367              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1368      timeout = configuration
1369          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1370      ConnectStringParser parser =
1371          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1372      hosts = Lists.newArrayList();
1373      for (InetSocketAddress server : parser.getServerAddresses()) {
1374        hosts.add(server.toString());
1375      }
1376      if (allowedFailures > (hosts.size() - 1) / 2) {
        LOG.warn("Allowing {} failed ZooKeeper nodes in an ensemble of {} means quorum would "
            + "already be lost; confirm the configured failure allowance.",
            allowedFailures, hosts.size());
1380      }
1381    }
1382
    @Override
    public void run() {
1384      List<ZookeeperTask> tasks = Lists.newArrayList();
1385      ZookeeperStdOutSink zkSink = null;
1386      try {
1387        zkSink = this.getSink();
1388      } catch (RuntimeException e) {
1389        LOG.error("Run ZooKeeperMonitor failed!", e);
1390        this.errorCode = ERROR_EXIT_CODE;
1391      }
1392      this.initialized = true;
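      // One probe task per quorum member, so every ZooKeeper host is checked and reported
      // individually.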
1393      for (final String host : hosts) {
1394        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1395      }
1396      try {
1397        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1398          try {
1399            future.get();
1400          } catch (ExecutionException e) {
1401            LOG.error("Sniff zookeeper failed!", e);
1402            this.errorCode = ERROR_EXIT_CODE;
1403          }
1404        }
1405      } catch (InterruptedException e) {
1406        this.errorCode = ERROR_EXIT_CODE;
1407        Thread.currentThread().interrupt();
1408        LOG.error("Sniff zookeeper interrupted!", e);
1409      }
1410      this.done = true;
1411    }
1412
1413    private ZookeeperStdOutSink getSink() {
1414      if (!(sink instanceof ZookeeperStdOutSink)) {
1415        throw new RuntimeException("Can only write to zookeeper sink");
1416      }
1417      return ((ZookeeperStdOutSink) sink);
1418    }
1419  }
1420
1421
1422  /**
1423   * A monitor for regionserver mode
1424   */
1425  private static class RegionServerMonitor extends Monitor {
1426    private boolean allRegions;
1427
1428    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1429        Sink sink, ExecutorService executor, boolean allRegions,
1430        boolean treatFailureAsError, long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
          allowedFailures);
1432      this.allRegions = allRegions;
1433    }
1434
1435    private RegionServerStdOutSink getSink() {
1436      if (!(sink instanceof RegionServerStdOutSink)) {
1437        throw new RuntimeException("Can only write to regionserver sink");
1438      }
1439      return ((RegionServerStdOutSink) sink);
1440    }
1441
1442    @Override
1443    public void run() {
1444      if (this.initAdmin() && this.checkNoTableNames()) {
1445        RegionServerStdOutSink regionServerSink = null;
1446        try {
1447          regionServerSink = this.getSink();
1448        } catch (RuntimeException e) {
1449          LOG.error("Run RegionServerMonitor failed!", e);
1450          this.errorCode = ERROR_EXIT_CODE;
1451        }
1452        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1453        this.initialized = true;
1454        this.monitorRegionServers(rsAndRMap, regionServerSink);
1455      }
1456      this.done = true;
1457    }
1458
1459    private boolean checkNoTableNames() {
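      // In -regionserver mode the targets are server names; any target that names an existing
      // table is treated as a usage error.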
1460      List<String> foundTableNames = new ArrayList<>();
1461      TableName[] tableNames = null;
1462      LOG.debug("Reading list of tables");
1463      try {
1464        tableNames = this.admin.listTableNames();
1465      } catch (IOException e) {
1466        LOG.error("Get listTableNames failed", e);
1467        this.errorCode = INIT_ERROR_EXIT_CODE;
1468        return false;
1469      }
1470
      if (this.targets == null || this.targets.length == 0) {
        return true;
      }
1472
1473      for (String target : this.targets) {
1474        for (TableName tableName : tableNames) {
1475          if (target.equals(tableName.getNameAsString())) {
1476            foundTableNames.add(target);
1477          }
1478        }
1479      }
1480
      if (!foundTableNames.isEmpty()) {
        System.err.println("Cannot pass a table name when using the -regionserver option; "
            + "table names: " + foundTableNames);
1484        this.errorCode = USAGE_EXIT_CODE;
1485      }
1486      return foundTableNames.isEmpty();
1487    }
1488
    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
        RegionServerStdOutSink regionServerSink) {
1490      List<RegionServerTask> tasks = new ArrayList<>();
1491      Map<String, AtomicLong> successMap = new HashMap<>();
1492      Random rand = new Random();
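      // Build one probe task per region when allRegions is set, otherwise one task for a single
      // random region per region server; successes are tallied per server in successMap.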
1493      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1494        String serverName = entry.getKey();
1495        AtomicLong successes = new AtomicLong(0);
1496        successMap.put(serverName, successes);
1497        if (entry.getValue().isEmpty()) {
1498          LOG.error("Regionserver not serving any regions - {}", serverName);
1499        } else if (this.allRegions) {
1500          for (RegionInfo region : entry.getValue()) {
1501            tasks.add(new RegionServerTask(this.connection,
1502                serverName,
1503                region,
1504                regionServerSink,
1505                successes));
1506          }
1507        } else {
          // randomly pick a single region to probe when allRegions is not set
1509          RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1510          tasks.add(new RegionServerTask(this.connection,
1511              serverName,
1512              region,
1513              regionServerSink,
1514              successes));
1515        }
1516      }
1517      try {
1518        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1519          try {
1520            future.get();
1521          } catch (ExecutionException e) {
1522            LOG.error("Sniff regionserver failed!", e);
1523            this.errorCode = ERROR_EXIT_CODE;
1524          }
1525        }
1526        if (this.allRegions) {
1527          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1528            String serverName = entry.getKey();
1529            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1530                successMap.get(serverName), entry.getValue().size(), serverName);
1531          }
1532        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        Thread.currentThread().interrupt();
        LOG.error("Sniff regionserver interrupted!", e);
      }
1537    }
1538
1539    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1540      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1541      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1542      return regionServerAndRegionsMap;
1543    }
1544
1545    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1546      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
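      // Group every region by the hostname of the region server currently hosting it.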
1547      try {
1548        LOG.debug("Reading list of tables and locations");
1549        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
        for (TableDescriptor tableDesc: tableDescs) {
          try (RegionLocator regionLocator =
                   this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
              ServerName rs = location.getServerName();
              String rsName = rs.getHostname();
              RegionInfo r = location.getRegion();
              rsAndRMap.computeIfAbsent(rsName, k -> new ArrayList<>()).add(r);
1565            }
1566          }
1567        }
1568
1569        // get any live regionservers not serving any regions
1570        for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
1571          .getLiveServerMetrics().keySet()) {
1572          String rsName = rs.getHostname();
1573          if (!rsAndRMap.containsKey(rsName)) {
1574            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1575          }
1576        }
1577      } catch (IOException e) {
        LOG.error("Get tables info failed", e);
1579        this.errorCode = INIT_ERROR_EXIT_CODE;
1580      }
1581      return rsAndRMap;
1582    }
1583
1584    private Map<String, List<RegionInfo>> doFilterRegionServerByName(
1585        Map<String, List<RegionInfo>> fullRsAndRMap) {
1586
1587      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
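      // When explicit targets were supplied, keep only the servers that match them (as regular
      // expressions when useRegExp is set, by exact hostname otherwise); with no targets the
      // full map is monitored.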
1588
1589      if (this.targets != null && this.targets.length > 0) {
1590        filteredRsAndRMap = new HashMap<>();
1591        Pattern pattern = null;
1592        Matcher matcher = null;
1593        boolean regExpFound = false;
1594        for (String rsName : this.targets) {
1595          if (this.useRegExp) {
1596            regExpFound = false;
1597            pattern = Pattern.compile(rsName);
1598            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1599              matcher = pattern.matcher(entry.getKey());
1600              if (matcher.matches()) {
1601                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1602                regExpFound = true;
1603              }
1604            }
1605            if (!regExpFound) {
              LOG.info("No region server matched the given pattern {}", rsName);
1607            }
1608          } else {
1609            if (fullRsAndRMap.containsKey(rsName)) {
1610              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1611            } else {
              LOG.info("No region server found with the given name {}", rsName);
1613            }
1614          }
1615        }
1616      } else {
1617        filteredRsAndRMap = fullRsAndRMap;
1618      }
1619      return filteredRsAndRMap;
1620    }
1621  }
1622
1623  public static void main(String[] args) throws Exception {
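    // Illustrative invocations only (a sketch; the usage text printed by the tool is the
    // authoritative reference for its flags):
    //   hbase canary                  probe every region of every table (or of given tables)
    //   hbase canary -regionserver    probe regions grouped by region server
    //   hbase canary -zookeeper      probe each member of the ZooKeeper ensemble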
1624    final Configuration conf = HBaseConfiguration.create();
1625
    // Load generic Hadoop command-line options into the configuration
1627    new GenericOptionsParser(conf, args);
1628
1629    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1630    LOG.info("Execution thread count={}", numThreads);
1631
1632    int exitCode = 0;
1633    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1634    try {
1635      exitCode = ToolRunner.run(conf, new Canary(executor), args);
1636    } finally {
1637      executor.shutdown();
1638    }
1639    System.exit(exitCode);
1640  }
1641}