001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.tool;
021
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
023import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
024
025import java.io.Closeable;
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Random;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.concurrent.atomic.LongAdder;
048import java.util.regex.Matcher;
049import java.util.regex.Pattern;
050
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HColumnDescriptor;
061import org.apache.hadoop.hbase.HConstants;
062import org.apache.hadoop.hbase.HRegionLocation;
063import org.apache.hadoop.hbase.HTableDescriptor;
064import org.apache.hadoop.hbase.MetaTableAccessor;
065import org.apache.hadoop.hbase.NamespaceDescriptor;
066import org.apache.hadoop.hbase.ScheduledChore;
067import org.apache.hadoop.hbase.ServerName;
068import org.apache.hadoop.hbase.TableName;
069import org.apache.hadoop.hbase.TableNotEnabledException;
070import org.apache.hadoop.hbase.TableNotFoundException;
071import org.apache.hadoop.hbase.client.Admin;
072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
073import org.apache.hadoop.hbase.client.Connection;
074import org.apache.hadoop.hbase.client.ConnectionFactory;
075import org.apache.hadoop.hbase.client.Get;
076import org.apache.hadoop.hbase.client.Put;
077import org.apache.hadoop.hbase.client.RegionInfo;
078import org.apache.hadoop.hbase.client.RegionLocator;
079import org.apache.hadoop.hbase.client.ResultScanner;
080import org.apache.hadoop.hbase.client.Scan;
081import org.apache.hadoop.hbase.client.Table;
082import org.apache.hadoop.hbase.client.TableDescriptor;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.Pair;
088import org.apache.hadoop.hbase.util.ReflectionUtils;
089import org.apache.hadoop.hbase.util.RegionSplitter;
090import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
091import org.apache.hadoop.hbase.zookeeper.ZKConfig;
092import org.apache.hadoop.util.Tool;
093import org.apache.hadoop.util.ToolRunner;
094import org.apache.yetus.audience.InterfaceAudience;
095import org.apache.zookeeper.KeeperException;
096import org.apache.zookeeper.ZooKeeper;
097import org.apache.zookeeper.client.ConnectStringParser;
098import org.apache.zookeeper.data.Stat;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
107 *
108 * There are three modes:
109 * <ol>
110 * <li>region mode (Default): For each region, try to get one row per column family outputting
111 * information on failure (ERROR) or else the latency.
112 * </li>
113 *
114 * <li>regionserver mode: For each regionserver try to get one row from one table selected
115 * randomly outputting information on failure (ERROR) or else the latency.
116 * </li>
117 *
118 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
119 * failure (ERROR) or else the latency.
120 * </li>
121 * </ol>
122 */
123@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
124public class CanaryTool implements Tool, Canary {
125
126  @Override
127  public int checkRegions(String[] targets) throws Exception {
128    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
129    try {
130      if (configuredReadTableTimeoutsStr != null) {
131        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
132      }
133    } catch (IllegalArgumentException e) {
134      LOG.error("Constructing read table timeouts map failed ", e);
135      return USAGE_EXIT_CODE;
136    }
137    return runMonitor(targets);
138  }
139
140  @Override
141  public int checkRegionServers(String[] targets) throws Exception {
142    regionServerMode = true;
143    return runMonitor(targets);
144  }
145
146  @Override
147  public int checkZooKeeper() throws Exception {
148    zookeeperMode = true;
149    return runMonitor(null);
150  }
151
152  /**
153   * Sink interface used by the canary to output information
154   */
155  public interface Sink {
156    long getReadFailureCount();
157    long incReadFailureCount();
158    Map<String,String> getReadFailures();
159    void updateReadFailures(String regionName, String serverName);
160    long getWriteFailureCount();
161    long incWriteFailureCount();
162    Map<String,String> getWriteFailures();
163    void updateWriteFailures(String regionName, String serverName);
164    long getReadSuccessCount();
165    long incReadSuccessCount();
166    long getWriteSuccessCount();
167    long incWriteSuccessCount();
168  }
169
170  /**
171   * Simple implementation of canary sink that allows plotting to a file or standard output.
172   */
173  public static class StdOutSink implements Sink {
174    private AtomicLong readFailureCount = new AtomicLong(0),
175        writeFailureCount = new AtomicLong(0),
176        readSuccessCount = new AtomicLong(0),
177        writeSuccessCount = new AtomicLong(0);
178    private Map<String, String> readFailures = new ConcurrentHashMap<>();
179    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
180
181    @Override
182    public long getReadFailureCount() {
183      return readFailureCount.get();
184    }
185
186    @Override
187    public long incReadFailureCount() {
188      return readFailureCount.incrementAndGet();
189    }
190
191    @Override
192    public Map<String, String> getReadFailures() {
193      return readFailures;
194    }
195
196    @Override
197    public void updateReadFailures(String regionName, String serverName) {
198      readFailures.put(regionName, serverName);
199    }
200
201    @Override
202    public long getWriteFailureCount() {
203      return writeFailureCount.get();
204    }
205
206    @Override
207    public long incWriteFailureCount() {
208      return writeFailureCount.incrementAndGet();
209    }
210
211    @Override
212    public Map<String, String> getWriteFailures() {
213      return writeFailures;
214    }
215
216    @Override
217    public void updateWriteFailures(String regionName, String serverName) {
218      writeFailures.put(regionName, serverName);
219    }
220
221    @Override
222    public long getReadSuccessCount() {
223      return readSuccessCount.get();
224    }
225
226    @Override
227    public long incReadSuccessCount() {
228      return readSuccessCount.incrementAndGet();
229    }
230
231    @Override
232    public long getWriteSuccessCount() {
233      return writeSuccessCount.get();
234    }
235
236    @Override
237    public long incWriteSuccessCount() {
238      return writeSuccessCount.incrementAndGet();
239    }
240  }
241
242  /**
243   * By RegionServer, for 'regionserver' mode.
244   */
245  public static class RegionServerStdOutSink extends StdOutSink {
246    public void publishReadFailure(String table, String server) {
247      incReadFailureCount();
248      LOG.error("Read from {} on {}", table, server);
249    }
250
251    public void publishReadTiming(String table, String server, long msTime) {
252      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
253    }
254  }
255
256  /**
257   * Output for 'zookeeper' mode.
258   */
259  public static class ZookeeperStdOutSink extends StdOutSink {
260    public void publishReadFailure(String znode, String server) {
261      incReadFailureCount();
262      LOG.error("Read from {} on {}", znode, server);
263    }
264
265    public void publishReadTiming(String znode, String server, long msTime) {
266      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
267    }
268  }
269
270  /**
271   * By Region, for 'region'  mode.
272   */
273  public static class RegionStdOutSink extends StdOutSink {
274    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
275    private LongAdder writeLatency = new LongAdder();
276    private final Map<String, List<RegionTaskResult>> regionMap = new ConcurrentHashMap<>();
277
278    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
279      incReadFailureCount();
280      LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e);
281    }
282
283    public void publishReadFailure(ServerName serverName, RegionInfo region,
284        ColumnFamilyDescriptor column, Exception e) {
285      incReadFailureCount();
286      LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName,
287          column.getNameAsString(), e);
288    }
289
290    public void publishReadTiming(ServerName serverName, RegionInfo region,
291        ColumnFamilyDescriptor column, long msTime) {
292      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
293      rtr.setReadSuccess();
294      rtr.setReadLatency(msTime);
295      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
296      rtrs.add(rtr);
297      // Note that read success count will be equal to total column family read successes.
298      incReadSuccessCount();
299      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
300          column.getNameAsString(), msTime);
301    }
302
303    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
304      incWriteFailureCount();
305      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
306    }
307
308    public void publishWriteFailure(ServerName serverName, RegionInfo region,
309        ColumnFamilyDescriptor column, Exception e) {
310      incWriteFailureCount();
311      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
312          column.getNameAsString(), e);
313    }
314
315    public void publishWriteTiming(ServerName serverName, RegionInfo region,
316        ColumnFamilyDescriptor column, long msTime) {
317      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
318      rtr.setWriteSuccess();
319      rtr.setWriteLatency(msTime);
320      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
321      rtrs.add(rtr);
322      // Note that write success count will be equal to total column family write successes.
323      incWriteSuccessCount();
324      LOG.info("Write to {} on {} {} in {}ms",
325        region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
326    }
327
328    public Map<String, LongAdder> getReadLatencyMap() {
329      return this.perTableReadLatency;
330    }
331
332    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
333      LongAdder initLatency = new LongAdder();
334      this.perTableReadLatency.put(tableName, initLatency);
335      return initLatency;
336    }
337
338    public void initializeWriteLatency() {
339      this.writeLatency.reset();
340    }
341
342    public LongAdder getWriteLatency() {
343      return this.writeLatency;
344    }
345
346    public Map<String, List<RegionTaskResult>> getRegionMap() {
347      return this.regionMap;
348    }
349
350    public int getTotalExpectedRegions() {
351      return this.regionMap.size();
352    }
353  }
354
355  /**
356   * Run a single zookeeper Task and then exit.
357   */
358  static class ZookeeperTask implements Callable<Void> {
359    private final Connection connection;
360    private final String host;
361    private String znode;
362    private final int timeout;
363    private ZookeeperStdOutSink sink;
364
365    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
366        ZookeeperStdOutSink sink) {
367      this.connection = connection;
368      this.host = host;
369      this.znode = znode;
370      this.timeout = timeout;
371      this.sink = sink;
372    }
373
374    @Override public Void call() throws Exception {
375      ZooKeeper zooKeeper = null;
376      try {
377        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
378        Stat exists = zooKeeper.exists(znode, false);
379        StopWatch stopwatch = new StopWatch();
380        stopwatch.start();
381        zooKeeper.getData(znode, false, exists);
382        stopwatch.stop();
383        sink.publishReadTiming(znode, host, stopwatch.getTime());
384      } catch (KeeperException | InterruptedException e) {
385        sink.publishReadFailure(znode, host);
386      } finally {
387        if (zooKeeper != null) {
388          zooKeeper.close();
389        }
390      }
391      return null;
392    }
393  }
394
395  /**
396   * Run a single Region Task and then exit. For each column family of the Region, get one row and
397   * output latency or failure.
398   */
399  static class RegionTask implements Callable<Void> {
400    public enum TaskType{
401      READ, WRITE
402    }
403    private Connection connection;
404    private RegionInfo region;
405    private RegionStdOutSink sink;
406    private TaskType taskType;
407    private boolean rawScanEnabled;
408    private ServerName serverName;
409    private LongAdder readWriteLatency;
410
411    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
412        RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
413      this.connection = connection;
414      this.region = region;
415      this.serverName = serverName;
416      this.sink = sink;
417      this.taskType = taskType;
418      this.rawScanEnabled = rawScanEnabled;
419      this.readWriteLatency = rwLatency;
420    }
421
422    @Override
423    public Void call() {
424      switch (taskType) {
425      case READ:
426        return read();
427      case WRITE:
428        return write();
429      default:
430        return read();
431      }
432    }
433
434    public Void read() {
435      Table table = null;
436      TableDescriptor tableDesc = null;
437      try {
438        LOG.debug("Reading table descriptor for table {}", region.getTable());
439        table = connection.getTable(region.getTable());
440        tableDesc = table.getDescriptor();
441      } catch (IOException e) {
442        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
443        sink.publishReadFailure(serverName, region, e);
444        if (table != null) {
445          try {
446            table.close();
447          } catch (IOException ioe) {
448            LOG.error("Close table failed", e);
449          }
450        }
451        return null;
452      }
453
454      byte[] startKey = null;
455      Get get = null;
456      Scan scan = null;
457      ResultScanner rs = null;
458      StopWatch stopWatch = new StopWatch();
459      for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
460        stopWatch.reset();
461        startKey = region.getStartKey();
462        // Can't do a get on empty start row so do a Scan of first element if any instead.
463        if (startKey.length > 0) {
464          get = new Get(startKey);
465          get.setCacheBlocks(false);
466          get.setFilter(new FirstKeyOnlyFilter());
467          get.addFamily(column.getName());
468        } else {
469          scan = new Scan();
470          LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName());
471          scan.setRaw(rawScanEnabled);
472          scan.setCaching(1);
473          scan.setCacheBlocks(false);
474          scan.setFilter(new FirstKeyOnlyFilter());
475          scan.addFamily(column.getName());
476          scan.setMaxResultSize(1L);
477          scan.setOneRowLimit();
478        }
479        LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(),
480            region.getRegionNameAsString(), column.getNameAsString(),
481            Bytes.toStringBinary(startKey));
482        try {
483          stopWatch.start();
484          if (startKey.length > 0) {
485            table.get(get);
486          } else {
487            rs = table.getScanner(scan);
488            rs.next();
489          }
490          stopWatch.stop();
491          this.readWriteLatency.add(stopWatch.getTime());
492          sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
493        } catch (Exception e) {
494          sink.publishReadFailure(serverName, region, column, e);
495          sink.updateReadFailures(region.getRegionNameAsString(),
496              serverName == null? "NULL": serverName.getHostname());
497        } finally {
498          if (rs != null) {
499            rs.close();
500          }
501          scan = null;
502          get = null;
503        }
504      }
505      try {
506        table.close();
507      } catch (IOException e) {
508        LOG.error("Close table failed", e);
509      }
510      return null;
511    }
512
513    /**
514     * Check writes for the canary table
515     */
516    private Void write() {
517      Table table = null;
518      TableDescriptor tableDesc = null;
519      try {
520        table = connection.getTable(region.getTable());
521        tableDesc = table.getDescriptor();
522        byte[] rowToCheck = region.getStartKey();
523        if (rowToCheck.length == 0) {
524          rowToCheck = new byte[]{0x0};
525        }
526        int writeValueSize =
527            connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
528        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
529          Put put = new Put(rowToCheck);
530          byte[] value = new byte[writeValueSize];
531          Bytes.random(value);
532          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
533
534          LOG.debug("Writing to {} {} {} {}",
535            tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
536            Bytes.toStringBinary(rowToCheck));
537          try {
538            long startTime = System.currentTimeMillis();
539            table.put(put);
540            long time = System.currentTimeMillis() - startTime;
541            this.readWriteLatency.add(time);
542            sink.publishWriteTiming(serverName, region, column, time);
543          } catch (Exception e) {
544            sink.publishWriteFailure(serverName, region, column, e);
545          }
546        }
547        table.close();
548      } catch (IOException e) {
549        sink.publishWriteFailure(serverName, region, e);
550        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() );
551      }
552      return null;
553    }
554  }
555
556  /**
557   * Run a single RegionServer Task and then exit.
558   * Get one row from a region on the regionserver and output latency or the failure.
559   */
560  static class RegionServerTask implements Callable<Void> {
561    private Connection connection;
562    private String serverName;
563    private RegionInfo region;
564    private RegionServerStdOutSink sink;
565    private AtomicLong successes;
566
567    RegionServerTask(Connection connection, String serverName, RegionInfo region,
568        RegionServerStdOutSink sink, AtomicLong successes) {
569      this.connection = connection;
570      this.serverName = serverName;
571      this.region = region;
572      this.sink = sink;
573      this.successes = successes;
574    }
575
576    @Override
577    public Void call() {
578      TableName tableName = null;
579      Table table = null;
580      Get get = null;
581      byte[] startKey = null;
582      Scan scan = null;
583      StopWatch stopWatch = new StopWatch();
584      // monitor one region on every region server
585      stopWatch.reset();
586      try {
587        tableName = region.getTable();
588        table = connection.getTable(tableName);
589        startKey = region.getStartKey();
590        // Can't do a get on empty start row so do a Scan of first element if any instead.
591        LOG.debug("Reading from {} {} {} {}",
592          serverName, region.getTable(), region.getRegionNameAsString(),
593          Bytes.toStringBinary(startKey));
594        if (startKey.length > 0) {
595          get = new Get(startKey);
596          get.setCacheBlocks(false);
597          get.setFilter(new FirstKeyOnlyFilter());
598          stopWatch.start();
599          table.get(get);
600          stopWatch.stop();
601        } else {
602          scan = new Scan();
603          scan.setCacheBlocks(false);
604          scan.setFilter(new FirstKeyOnlyFilter());
605          scan.setCaching(1);
606          scan.setMaxResultSize(1L);
607          scan.setOneRowLimit();
608          stopWatch.start();
609          ResultScanner s = table.getScanner(scan);
610          s.next();
611          s.close();
612          stopWatch.stop();
613        }
614        successes.incrementAndGet();
615        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
616      } catch (TableNotFoundException tnfe) {
617        LOG.error("Table may be deleted", tnfe);
618        // This is ignored because it doesn't imply that the regionserver is dead
619      } catch (TableNotEnabledException tnee) {
620        // This is considered a success since we got a response.
621        successes.incrementAndGet();
622        LOG.debug("The targeted table was disabled.  Assuming success.");
623      } catch (DoNotRetryIOException dnrioe) {
624        sink.publishReadFailure(tableName.getNameAsString(), serverName);
625        LOG.error(dnrioe.toString(), dnrioe);
626      } catch (IOException e) {
627        sink.publishReadFailure(tableName.getNameAsString(), serverName);
628        LOG.error(e.toString(), e);
629      } finally {
630        if (table != null) {
631          try {
632            table.close();
633          } catch (IOException e) {/* DO NOTHING */
634            LOG.error("Close table failed", e);
635          }
636        }
637        scan = null;
638        get = null;
639        startKey = null;
640      }
641      return null;
642    }
643  }
644
645  private static final int USAGE_EXIT_CODE = 1;
646  private static final int INIT_ERROR_EXIT_CODE = 2;
647  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
648  private static final int ERROR_EXIT_CODE = 4;
649  private static final int FAILURE_EXIT_CODE = 5;
650
651  private static final long DEFAULT_INTERVAL = 60000;
652
653  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
654  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
655
656  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
657
658  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
659    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
660
661  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
662
663  private Configuration conf = null;
664  private long interval = 0;
665  private Sink sink = null;
666
667  /**
668   * True if we are to run in 'regionServer' mode.
669   */
670  private boolean regionServerMode = false;
671
672  /**
673   * True if we are to run in zookeeper 'mode'.
674   */
675  private boolean zookeeperMode = false;
676
677  /**
678   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
679   * we aggregate time to fetch each region and it needs to be less than this value else we
680   * log an ERROR.
681   */
682  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
683
684  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS
685          = "hbase.canary.regionserver_all_regions";
686
687  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING
688          = "hbase.canary.region.write.sniffing";
689  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT
690          = "hbase.canary.region.write.table.timeout";
691  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME
692          = "hbase.canary.region.write.table.name";
693  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT
694          = "hbase.canary.region.read.table.timeout";
695
696  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES
697          = "hbase.canary.zookeeper.permitted.failures";
698
699  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
700  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
701  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";
702
703
704  private ExecutorService executor; // threads to retrieve data from regionservers
705
706  public CanaryTool() {
707    this(new ScheduledThreadPoolExecutor(1));
708  }
709
710  public CanaryTool(ExecutorService executor) {
711    this(executor, null);
712  }
713
714  @VisibleForTesting
715  CanaryTool(ExecutorService executor, Sink sink) {
716    this.executor = executor;
717    this.sink = sink;
718  }
719
720  CanaryTool(Configuration conf, ExecutorService executor) {
721    this(conf, executor, null);
722  }
723
724  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
725    this(executor, sink);
726    setConf(conf);
727  }
728
729  @Override
730  public Configuration getConf() {
731    return conf;
732  }
733
734  @Override
735  public void setConf(Configuration conf) {
736    if (conf == null) {
737      conf = HBaseConfiguration.create();
738    }
739    this.conf = conf;
740  }
741
742  private int parseArgs(String[] args) {
743    int index = -1;
744    long permittedFailures = 0;
745    boolean regionServerAllRegions = false, writeSniffing = false;
746    String readTableTimeoutsStr = null;
747
748    // Process command line args
749    for (int i = 0; i < args.length; i++) {
750      String cmd = args[i];
751
752      if (cmd.startsWith("-")) {
753        if (index >= 0) {
754          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
755          System.err.println("Invalid command line options");
756          printUsageAndExit();
757        }
758
759        if (cmd.equals("-help") || cmd.equals("-h")) {
760          // user asked for help, print the help and quit.
761          printUsageAndExit();
762        } else if (cmd.equals("-daemon") && interval == 0) {
763          // user asked for daemon mode, set a default interval between checks
764          interval = DEFAULT_INTERVAL;
765        } else if (cmd.equals("-interval")) {
766          // user has specified an interval for canary breaths (-interval N)
767          i++;
768
769          if (i == args.length) {
770            System.err.println("-interval takes a numeric seconds value argument.");
771            printUsageAndExit();
772          }
773
774          try {
775            interval = Long.parseLong(args[i]) * 1000;
776          } catch (NumberFormatException e) {
777            System.err.println("-interval needs a numeric value argument.");
778            printUsageAndExit();
779          }
780        } else if (cmd.equals("-zookeeper")) {
781          this.zookeeperMode = true;
782        } else if(cmd.equals("-regionserver")) {
783          this.regionServerMode = true;
784        } else if(cmd.equals("-allRegions")) {
785          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
786          regionServerAllRegions = true;
787        } else if(cmd.equals("-writeSniffing")) {
788          writeSniffing = true;
789          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
790        } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
791          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
792        } else if (cmd.equals("-e")) {
793          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
794        } else if (cmd.equals("-t")) {
795          i++;
796
797          if (i == args.length) {
798            System.err.println("-t takes a numeric milliseconds value argument.");
799            printUsageAndExit();
800          }
801          long timeout = 0;
802          try {
803            timeout = Long.parseLong(args[i]);
804          } catch (NumberFormatException e) {
805            System.err.println("-t takes a numeric milliseconds value argument.");
806            printUsageAndExit();
807          }
808          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
809        } else if(cmd.equals("-writeTableTimeout")) {
810          i++;
811
812          if (i == args.length) {
813            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
814            printUsageAndExit();
815          }
816          long configuredWriteTableTimeout = 0;
817          try {
818            configuredWriteTableTimeout = Long.parseLong(args[i]);
819          } catch (NumberFormatException e) {
820            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
821            printUsageAndExit();
822          }
823          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
824        } else if (cmd.equals("-writeTable")) {
825          i++;
826
827          if (i == args.length) {
828            System.err.println("-writeTable takes a string tablename value argument.");
829            printUsageAndExit();
830          }
831          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
832        } else if (cmd.equals("-f")) {
833          i++;
834
835          if (i == args.length) {
836            System.err
837                .println("-f needs a boolean value argument (true|false).");
838            printUsageAndExit();
839          }
840
841          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
842        } else if (cmd.equals("-readTableTimeouts")) {
843          i++;
844
845          if (i == args.length) {
846            System.err.println("-readTableTimeouts needs a comma-separated list of read " +
847                "millisecond timeouts per table (without spaces).");
848            printUsageAndExit();
849          }
850          readTableTimeoutsStr = args[i];
851          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
852        } else if (cmd.equals("-permittedZookeeperFailures")) {
853          i++;
854
855          if (i == args.length) {
856            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
857            printUsageAndExit();
858          }
859          try {
860            permittedFailures = Long.parseLong(args[i]);
861          } catch (NumberFormatException e) {
862            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
863            printUsageAndExit();
864          }
865          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
866        } else {
867          // no options match
868          System.err.println(cmd + " options is invalid.");
869          printUsageAndExit();
870        }
871      } else if (index < 0) {
872        // keep track of first table name specified by the user
873        index = i;
874      }
875    }
876    if (regionServerAllRegions && !this.regionServerMode) {
877      System.err.println("-allRegions can only be specified in regionserver mode.");
878      printUsageAndExit();
879    }
880    if (this.zookeeperMode) {
881      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
882        System.err.println("-zookeeper is exclusive and cannot be combined with "
883            + "other modes.");
884        printUsageAndExit();
885      }
886    }
887    if (permittedFailures != 0 && !this.zookeeperMode) {
888      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
889      printUsageAndExit();
890    }
891    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
892      System.err.println("-readTableTimeouts can only be configured in region mode.");
893      printUsageAndExit();
894    }
895    return index;
896  }
897
898  @Override
899  public int run(String[] args) throws Exception {
900    int index = parseArgs(args);
901    String[] monitorTargets = null;
902
903    if (index >= 0) {
904      int length = args.length - index;
905      monitorTargets = new String[length];
906      System.arraycopy(args, index, monitorTargets, 0, length);
907    }
908
909    if (zookeeperMode) {
910      return checkZooKeeper();
911    } else if (regionServerMode) {
912      return checkRegionServers(monitorTargets);
913    } else {
914      return checkRegions(monitorTargets);
915    }
916  }
917
918  private int runMonitor(String[] monitorTargets) throws Exception {
919    ChoreService choreService = null;
920
921    // Launches chore for refreshing kerberos credentials if security is enabled.
922    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
923    // for more details.
924    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
925    if (authChore != null) {
926      choreService = new ChoreService("CANARY_TOOL");
927      choreService.scheduleChore(authChore);
928    }
929
930    // Start to prepare the stuffs
931    Monitor monitor = null;
932    Thread monitorThread;
933    long startTime = 0;
934    long currentTimeLength = 0;
935    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
936    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
937    // Get a connection to use in below.
938    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
939      do {
940        // Do monitor !!
941        try {
942          monitor = this.newMonitor(connection, monitorTargets);
943          monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
944          startTime = System.currentTimeMillis();
945          monitorThread.start();
946          while (!monitor.isDone()) {
947            // wait for 1 sec
948            Thread.sleep(1000);
949            // exit if any error occurs
950            if (failOnError && monitor.hasError()) {
951              monitorThread.interrupt();
952              if (monitor.initialized) {
953                return monitor.errorCode;
954              } else {
955                return INIT_ERROR_EXIT_CODE;
956              }
957            }
958            currentTimeLength = System.currentTimeMillis() - startTime;
959            if (currentTimeLength > timeout) {
960              LOG.error("The monitor is running too long (" + currentTimeLength
961                  + ") after timeout limit:" + timeout
962                  + " will be killed itself !!");
963              if (monitor.initialized) {
964                return TIMEOUT_ERROR_EXIT_CODE;
965              } else {
966                return INIT_ERROR_EXIT_CODE;
967              }
968            }
969          }
970
971          if (failOnError && monitor.finalCheckForErrors()) {
972            monitorThread.interrupt();
973            return monitor.errorCode;
974          }
975        } finally {
976          if (monitor != null) monitor.close();
977        }
978
979        Thread.sleep(interval);
980      } while (interval > 0);
981    } // try-with-resources close
982
983    if (choreService != null) {
984      choreService.shutdown();
985    }
986    return monitor.errorCode;
987  }
988
989  @Override
990  public Map<String, String> getReadFailures()  {
991    return sink.getReadFailures();
992  }
993
994  @Override
995  public Map<String, String> getWriteFailures()  {
996    return sink.getWriteFailures();
997  }
998
999  private void printUsageAndExit() {
1000    System.err.println(
1001      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1002    System.err.println("Where [OPTIONS] are:");
1003    System.err.println(" -h,-help        show this help and exit.");
1004    System.err.println(" -regionserver   set 'regionserver mode'; gets row from random region on " +
1005        "server");
1006    System.err.println(" -allRegions     get from ALL regions when 'regionserver mode', not just " +
1007        "random one.");
1008    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on " +
1009        "each ensemble member");
1010    System.err.println(" -daemon         continuous check at defined intervals.");
1011    System.err.println(" -interval <N>   interval between checks in seconds");
1012    System.err.println(" -e              consider table/regionserver argument as regular " +
1013        "expression");
1014    System.err.println(" -f <B>          exit on first error; default=true");
1015    System.err.println(" -failureAsError treat read/write failure as error");
1016    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1017    System.err.println(" -writeSniffing  enable write sniffing");
1018    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1019    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1020    System.err.println(" -readTableTimeouts <tableName>=<read timeout>," +
1021        "<tableName>=<read timeout>,...");
1022    System.err.println("                comma-separated list of table read timeouts " +
1023        "(no spaces);");
1024    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1025    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1026    System.err.println("                connect to individual zookeeper nodes in ensemble");
1027    System.err.println("");
1028    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1029    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " +
1030        "raw scan; default=false");
1031    System.err.println("");
1032    System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
1033        "zookeeper.");
1034    System.err.println("To sniff/probe all regions, pass no arguments.");
1035    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1036    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1037    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1038    System.exit(USAGE_EXIT_CODE);
1039  }
1040
1041  Sink getSink(Configuration configuration, Class clazz) {
1042    // In test context, this.sink might be set. Use it if non-null. For testing.
1043    return this.sink != null? this.sink:
1044        (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class",
1045            clazz, Sink.class));
1046  }
1047
1048  /**
1049   * Canary region mode-specific data structure which stores information about each region
1050   * to be scanned
1051   */
1052  public static class RegionTaskResult {
1053    private RegionInfo region;
1054    private TableName tableName;
1055    private ServerName serverName;
1056    private ColumnFamilyDescriptor column;
1057    private AtomicLong readLatency = null;
1058    private AtomicLong writeLatency = null;
1059    private boolean readSuccess = false;
1060    private boolean writeSuccess = false;
1061
1062    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1063        ColumnFamilyDescriptor column) {
1064      this.region = region;
1065      this.tableName = tableName;
1066      this.serverName = serverName;
1067      this.column = column;
1068    }
1069
1070    public RegionInfo getRegionInfo() {
1071      return this.region;
1072    }
1073
1074    public String getRegionNameAsString() {
1075      return this.region.getRegionNameAsString();
1076    }
1077
1078    public TableName getTableName() {
1079      return this.tableName;
1080    }
1081
1082    public String getTableNameAsString() {
1083      return this.tableName.getNameAsString();
1084    }
1085
1086    public ServerName getServerName() {
1087      return this.serverName;
1088    }
1089
1090    public String getServerNameAsString() {
1091      return this.serverName.getServerName();
1092    }
1093
1094    public ColumnFamilyDescriptor getColumnFamily() {
1095      return this.column;
1096    }
1097
1098    public String getColumnFamilyNameAsString() {
1099      return this.column.getNameAsString();
1100    }
1101
1102    public long getReadLatency() {
1103      if (this.readLatency == null) {
1104        return -1;
1105      }
1106      return this.readLatency.get();
1107    }
1108
1109    public void setReadLatency(long readLatency) {
1110      if (this.readLatency != null) {
1111        this.readLatency.set(readLatency);
1112      } else {
1113        this.readLatency = new AtomicLong(readLatency);
1114      }
1115    }
1116
1117    public long getWriteLatency() {
1118      if (this.writeLatency == null) {
1119        return -1;
1120      }
1121      return this.writeLatency.get();
1122    }
1123
1124    public void setWriteLatency(long writeLatency) {
1125      if (this.writeLatency != null) {
1126        this.writeLatency.set(writeLatency);
1127      } else {
1128        this.writeLatency = new AtomicLong(writeLatency);
1129      }
1130    }
1131
1132    public boolean isReadSuccess() {
1133      return this.readSuccess;
1134    }
1135
1136    public void setReadSuccess() {
1137      this.readSuccess = true;
1138    }
1139
1140    public boolean isWriteSuccess() {
1141      return this.writeSuccess;
1142    }
1143
1144    public void setWriteSuccess() {
1145      this.writeSuccess = true;
1146    }
1147  }
1148
1149  /**
1150   * A Factory method for {@link Monitor}.
1151   * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
1152   * @return a Monitor instance
1153   */
1154  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1155    Monitor monitor;
1156    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1157    boolean regionServerAllRegions
1158            = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1159    boolean failOnError
1160            = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1161    int permittedFailures
1162            = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1163    boolean writeSniffing
1164            = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1165    String writeTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME,
1166            DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1167    long configuredWriteTableTimeout
1168            = conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1169
1170    if (this.regionServerMode) {
1171      monitor =
1172          new RegionServerMonitor(connection, monitorTargets, useRegExp,
1173              getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
1174              this.executor, regionServerAllRegions,
1175              failOnError, permittedFailures);
1176
1177    } else if (this.zookeeperMode) {
1178      monitor =
1179          new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1180              getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
1181              this.executor, failOnError, permittedFailures);
1182    } else {
1183      monitor =
1184          new RegionMonitor(connection, monitorTargets, useRegExp,
1185              getSink(connection.getConfiguration(), RegionStdOutSink.class),
1186              this.executor, writeSniffing,
1187              TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1188              configuredWriteTableTimeout, permittedFailures);
1189    }
1190    return monitor;
1191  }
1192
1193  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1194    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1195    for (String tT : tableTimeouts) {
1196      String[] nameTimeout = tT.split("=");
1197      if (nameTimeout.length < 2) {
1198        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " +
1199            "<tableName>=<read timeout> (without spaces).");
1200      }
1201      long timeoutVal;
1202      try {
1203        timeoutVal = Long.parseLong(nameTimeout[1]);
1204      } catch (NumberFormatException e) {
1205        throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" +
1206            " must be a numeric value argument.");
1207      }
1208      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1209    }
1210  }
1211  /**
1212   * A Monitor super-class can be extended by users
1213   */
1214  public static abstract class Monitor implements Runnable, Closeable {
1215    protected Connection connection;
1216    protected Admin admin;
1217    /**
1218     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
1219     * Passed on the command-line as arguments.
1220     */
1221    protected String[] targets;
1222    protected boolean useRegExp;
1223    protected boolean treatFailureAsError;
1224    protected boolean initialized = false;
1225
1226    protected boolean done = false;
1227    protected int errorCode = 0;
1228    protected long allowedFailures = 0;
1229    protected Sink sink;
1230    protected ExecutorService executor;
1231
1232    public boolean isDone() {
1233      return done;
1234    }
1235
1236    public boolean hasError() {
1237      return errorCode != 0;
1238    }
1239
1240    public boolean finalCheckForErrors() {
1241      if (errorCode != 0) {
1242        return true;
1243      }
1244      if (treatFailureAsError &&
1245          (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) {
1246        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1247        errorCode = FAILURE_EXIT_CODE;
1248        return true;
1249      }
1250      return false;
1251    }
1252
1253    @Override
1254    public void close() throws IOException {
1255      if (this.admin != null) this.admin.close();
1256    }
1257
1258    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1259        ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1260      if (null == connection) throw new IllegalArgumentException("connection shall not be null");
1261
1262      this.connection = connection;
1263      this.targets = monitorTargets;
1264      this.useRegExp = useRegExp;
1265      this.treatFailureAsError = treatFailureAsError;
1266      this.sink = sink;
1267      this.executor = executor;
1268      this.allowedFailures = allowedFailures;
1269    }
1270
1271    @Override
1272    public abstract void run();
1273
1274    protected boolean initAdmin() {
1275      if (null == this.admin) {
1276        try {
1277          this.admin = this.connection.getAdmin();
1278        } catch (Exception e) {
1279          LOG.error("Initial HBaseAdmin failed...", e);
1280          this.errorCode = INIT_ERROR_EXIT_CODE;
1281        }
1282      } else if (admin.isAborted()) {
1283        LOG.error("HBaseAdmin aborted");
1284        this.errorCode = INIT_ERROR_EXIT_CODE;
1285      }
1286      return !this.hasError();
1287    }
1288  }
1289
1290  /**
1291   * A monitor for region mode.
1292   */
1293  private static class RegionMonitor extends Monitor {
1294    // 10 minutes
1295    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1296    // 1 days
1297    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1298
1299    private long lastCheckTime = -1;
1300    private boolean writeSniffing;
1301    private TableName writeTableName;
1302    private int writeDataTTL;
1303    private float regionsLowerLimit;
1304    private float regionsUpperLimit;
1305    private int checkPeriod;
1306    private boolean rawScanEnabled;
1307
1308    /**
1309     * This is a timeout per table. If read of each region in the table aggregated takes longer
1310     * than what is configured here, we log an ERROR rather than just an INFO.
1311     */
1312    private HashMap<String, Long> configuredReadTableTimeouts;
1313
1314    private long configuredWriteTableTimeout;
1315
1316    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1317        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1318        boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1319        long configuredWriteTableTimeout,
1320        long allowedFailures) {
1321      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1322          allowedFailures);
1323      Configuration conf = connection.getConfiguration();
1324      this.writeSniffing = writeSniffing;
1325      this.writeTableName = writeTableName;
1326      this.writeDataTTL =
1327          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1328      this.regionsLowerLimit =
1329          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1330      this.regionsUpperLimit =
1331          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1332      this.checkPeriod =
1333          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1334            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1335      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1336      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1337      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1338    }
1339
1340    private RegionStdOutSink getSink() {
1341      if (!(sink instanceof RegionStdOutSink)) {
1342        throw new RuntimeException("Can only write to Region sink");
1343      }
1344      return ((RegionStdOutSink) sink);
1345    }
1346
1347    @Override
1348    public void run() {
1349      if (this.initAdmin()) {
1350        try {
1351          List<Future<Void>> taskFutures = new LinkedList<>();
1352          RegionStdOutSink regionSink = this.getSink();
1353          if (this.targets != null && this.targets.length > 0) {
1354            String[] tables = generateMonitorTables(this.targets);
1355            // Check to see that each table name passed in the -readTableTimeouts argument is also
1356            // passed as a monitor target.
1357            if (!new HashSet<>(Arrays.asList(tables)).
1358                containsAll(this.configuredReadTableTimeouts.keySet())) {
1359              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
1360                  "passed via command line.");
1361              this.errorCode = USAGE_EXIT_CODE;
1362              return;
1363            }
1364            this.initialized = true;
1365            for (String table : tables) {
1366              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1367              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1368                this.rawScanEnabled, readLatency));
1369            }
1370          } else {
1371            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1372          }
1373
1374          if (writeSniffing) {
1375            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1376              try {
1377                checkWriteTableDistribution();
1378              } catch (IOException e) {
1379                LOG.error("Check canary table distribution failed!", e);
1380              }
1381              lastCheckTime = EnvironmentEdgeManager.currentTime();
1382            }
1383            // sniff canary table with write operation
1384            regionSink.initializeWriteLatency();
1385            LongAdder writeTableLatency = regionSink.getWriteLatency();
1386            taskFutures.addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1387              executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
1388          }
1389
1390          for (Future<Void> future : taskFutures) {
1391            try {
1392              future.get();
1393            } catch (ExecutionException e) {
1394              LOG.error("Sniff region failed!", e);
1395            }
1396          }
1397          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1398          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1399            String tableName = entry.getKey();
1400            if (actualReadTableLatency.containsKey(tableName)) {
1401              Long actual = actualReadTableLatency.get(tableName).longValue();
1402              Long configured = entry.getValue();
1403              if (actual > configured) {
1404                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout." +
1405                    "(Configured read timeout {}ms.", tableName, actual, configured);
1406              } else {
1407                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1408                    tableName, actual, configured);
1409              }
1410            } else {
1411              LOG.error("Read operation for {} failed!", tableName);
1412            }
1413          }
1414          if (this.writeSniffing) {
1415            String writeTableStringName = this.writeTableName.getNameAsString();
1416            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1417            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1418                writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1419            // Check that the writeTable write operation latency does not exceed the configured timeout.
1420            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1421              LOG.error("Write operation for {} exceeded the configured write timeout.",
1422                  writeTableStringName);
1423            }
1424          }
1425        } catch (Exception e) {
1426          LOG.error("Run regionMonitor failed", e);
1427          this.errorCode = ERROR_EXIT_CODE;
1428        } finally {
1429          this.done = true;
1430        }
1431      }
1432      this.done = true;
1433    }
1434
1435    /**
1436     * @return List of tables to use in test.
1437     */
1438    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1439      String[] returnTables = null;
1440
1441      if (this.useRegExp) {
1442        Pattern pattern = null;
1443        List<TableDescriptor> tds = null;
1444        Set<String> tmpTables = new TreeSet<>();
1445        try {
1446          LOG.debug(String.format("reading list of tables"));
1447          tds = this.admin.listTableDescriptors(pattern);
1448          if (tds == null) {
1449            tds = Collections.emptyList();
1450          }
1451          for (String monitorTarget : monitorTargets) {
1452            pattern = Pattern.compile(monitorTarget);
1453            for (TableDescriptor td : tds) {
1454              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1455                tmpTables.add(td.getTableName().getNameAsString());
1456              }
1457            }
1458          }
1459        } catch (IOException e) {
1460          LOG.error("Communicate with admin failed", e);
1461          throw e;
1462        }
1463
1464        if (tmpTables.size() > 0) {
1465          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1466        } else {
1467          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1468          LOG.error(msg);
1469          this.errorCode = INIT_ERROR_EXIT_CODE;
1470          throw new TableNotFoundException(msg);
1471        }
1472      } else {
1473        returnTables = monitorTargets;
1474      }
1475
1476      return returnTables;
1477    }
1478
1479    /*
1480     * Canary entry point to monitor all the tables.
1481     */
1482    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1483        throws Exception {
1484      LOG.debug("Reading list of tables");
1485      List<Future<Void>> taskFutures = new LinkedList<>();
1486      for (TableDescriptor td: admin.listTableDescriptors()) {
1487        if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) &&
1488            (!td.getTableName().equals(writeTableName))) {
1489          LongAdder readLatency =
1490              regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1491          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled,
1492              readLatency));
1493        }
1494      }
1495      return taskFutures;
1496    }
1497
1498    private void checkWriteTableDistribution() throws IOException {
1499      if (!admin.tableExists(writeTableName)) {
1500        int numberOfServers =
1501            admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size();
1502        if (numberOfServers == 0) {
1503          throw new IllegalStateException("No live regionservers");
1504        }
1505        createWriteTable(numberOfServers);
1506      }
1507
1508      if (!admin.isTableEnabled(writeTableName)) {
1509        admin.enableTable(writeTableName);
1510      }
1511
1512      ClusterMetrics status =
1513          admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER));
1514      int numberOfServers = status.getLiveServerMetrics().size();
1515      if (status.getLiveServerMetrics().containsKey(status.getMasterName())) {
1516        numberOfServers -= 1;
1517      }
1518
1519      List<Pair<RegionInfo, ServerName>> pairs =
1520          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1521      int numberOfRegions = pairs.size();
1522      if (numberOfRegions < numberOfServers * regionsLowerLimit
1523          || numberOfRegions > numberOfServers * regionsUpperLimit) {
1524        admin.disableTable(writeTableName);
1525        admin.deleteTable(writeTableName);
1526        createWriteTable(numberOfServers);
1527      }
1528      HashSet<ServerName> serverSet = new HashSet<>();
1529      for (Pair<RegionInfo, ServerName> pair : pairs) {
1530        serverSet.add(pair.getSecond());
1531      }
1532      int numberOfCoveredServers = serverSet.size();
1533      if (numberOfCoveredServers < numberOfServers) {
1534        admin.balance();
1535      }
1536    }
1537
1538    private void createWriteTable(int numberOfServers) throws IOException {
1539      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
1540      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
1541        "(current lower limit of regions per server is {} and you can change it with config {}).",
1542          numberOfServers, numberOfRegions, regionsLowerLimit,
1543          HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1544      HTableDescriptor desc = new HTableDescriptor(writeTableName);
1545      HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
1546      family.setMaxVersions(1);
1547      family.setTimeToLive(writeDataTTL);
1548
1549      desc.addFamily(family);
1550      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1551      admin.createTable(desc, splits);
1552    }
1553  }
1554
1555  /**
1556   * Canary entry point for specified table.
1557   * @throws Exception
1558   */
1559  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1560      ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency)
1561      throws Exception {
1562    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1563    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1564      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1565        executor, taskType, rawScanEnabled, readLatency);
1566    } else {
1567      LOG.warn("Table {} is not enabled", tableName);
1568    }
1569    return new LinkedList<>();
1570  }
1571
1572  /*
1573   * Loops over regions of this table, and outputs information about the state.
1574   */
1575  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1576      TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1577      boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
1578    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1579    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1580      List<RegionTask> tasks = new ArrayList<>();
1581      try (RegionLocator regionLocator =
1582               admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1583        for (HRegionLocation location: regionLocator.getAllRegionLocations()) {
1584          if (location == null) {
1585            LOG.warn("Null location");
1586            continue;
1587          }
1588          ServerName rs = location.getServerName();
1589          RegionInfo region = location.getRegion();
1590          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
1591              taskType, rawScanEnabled, rwLatency));
1592          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1593          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1594        }
1595        return executor.invokeAll(tasks);
1596      }
1597    } catch (TableNotFoundException e) {
1598      return Collections.EMPTY_LIST;
1599    }
1600  }
1601
1602  //  monitor for zookeeper mode
1603  private static class ZookeeperMonitor extends Monitor {
1604    private List<String> hosts;
1605    private final String znode;
1606    private final int timeout;
1607
1608    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1609        Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures)  {
1610      super(connection, monitorTargets, useRegExp,
1611          sink, executor, treatFailureAsError, allowedFailures);
1612      Configuration configuration = connection.getConfiguration();
1613      znode =
1614          configuration.get(ZOOKEEPER_ZNODE_PARENT,
1615              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1616      timeout = configuration
1617          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1618      ConnectStringParser parser =
1619          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1620      hosts = Lists.newArrayList();
1621      for (InetSocketAddress server : parser.getServerAddresses()) {
1622        hosts.add(server.toString());
1623      }
1624      if (allowedFailures > (hosts.size() - 1) / 2) {
1625        LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " +
1626                        "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1627                allowedFailures, hosts.size());
1628      }
1629    }
1630
1631    @Override public void run() {
1632      List<ZookeeperTask> tasks = Lists.newArrayList();
1633      ZookeeperStdOutSink zkSink = null;
1634      try {
1635        zkSink = this.getSink();
1636      } catch (RuntimeException e) {
1637        LOG.error("Run ZooKeeperMonitor failed!", e);
1638        this.errorCode = ERROR_EXIT_CODE;
1639      }
1640      this.initialized = true;
1641      for (final String host : hosts) {
1642        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1643      }
1644      try {
1645        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1646          try {
1647            future.get();
1648          } catch (ExecutionException e) {
1649            LOG.error("Sniff zookeeper failed!", e);
1650            this.errorCode = ERROR_EXIT_CODE;
1651          }
1652        }
1653      } catch (InterruptedException e) {
1654        this.errorCode = ERROR_EXIT_CODE;
1655        Thread.currentThread().interrupt();
1656        LOG.error("Sniff zookeeper interrupted!", e);
1657      }
1658      this.done = true;
1659    }
1660
1661    private ZookeeperStdOutSink getSink() {
1662      if (!(sink instanceof ZookeeperStdOutSink)) {
1663        throw new RuntimeException("Can only write to zookeeper sink");
1664      }
1665      return ((ZookeeperStdOutSink) sink);
1666    }
1667  }
1668
1669
1670  /**
1671   * A monitor for regionserver mode
1672   */
1673  private static class RegionServerMonitor extends Monitor {
1674    private boolean allRegions;
1675
1676    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1677        Sink sink, ExecutorService executor, boolean allRegions,
1678        boolean treatFailureAsError, long allowedFailures) {
1679      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1680          allowedFailures);
1681      this.allRegions = allRegions;
1682    }
1683
1684    private RegionServerStdOutSink getSink() {
1685      if (!(sink instanceof RegionServerStdOutSink)) {
1686        throw new RuntimeException("Can only write to regionserver sink");
1687      }
1688      return ((RegionServerStdOutSink) sink);
1689    }
1690
1691    @Override
1692    public void run() {
1693      if (this.initAdmin() && this.checkNoTableNames()) {
1694        RegionServerStdOutSink regionServerSink = null;
1695        try {
1696          regionServerSink = this.getSink();
1697        } catch (RuntimeException e) {
1698          LOG.error("Run RegionServerMonitor failed!", e);
1699          this.errorCode = ERROR_EXIT_CODE;
1700        }
1701        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1702        this.initialized = true;
1703        this.monitorRegionServers(rsAndRMap, regionServerSink);
1704      }
1705      this.done = true;
1706    }
1707
1708    private boolean checkNoTableNames() {
1709      List<String> foundTableNames = new ArrayList<>();
1710      TableName[] tableNames = null;
1711      LOG.debug("Reading list of tables");
1712      try {
1713        tableNames = this.admin.listTableNames();
1714      } catch (IOException e) {
1715        LOG.error("Get listTableNames failed", e);
1716        this.errorCode = INIT_ERROR_EXIT_CODE;
1717        return false;
1718      }
1719
1720      if (this.targets == null || this.targets.length == 0) return true;
1721
1722      for (String target : this.targets) {
1723        for (TableName tableName : tableNames) {
1724          if (target.equals(tableName.getNameAsString())) {
1725            foundTableNames.add(target);
1726          }
1727        }
1728      }
1729
1730      if (foundTableNames.size() > 0) {
1731        System.err.println("Cannot pass a tablename when using the -regionserver " +
1732            "option, tablenames:" + foundTableNames.toString());
1733        this.errorCode = USAGE_EXIT_CODE;
1734      }
1735      return foundTableNames.isEmpty();
1736    }
1737
1738    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, RegionServerStdOutSink regionServerSink) {
1739      List<RegionServerTask> tasks = new ArrayList<>();
1740      Map<String, AtomicLong> successMap = new HashMap<>();
1741      Random rand = new Random();
1742      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1743        String serverName = entry.getKey();
1744        AtomicLong successes = new AtomicLong(0);
1745        successMap.put(serverName, successes);
1746        if (entry.getValue().isEmpty()) {
1747          LOG.error("Regionserver not serving any regions - {}", serverName);
1748        } else if (this.allRegions) {
1749          for (RegionInfo region : entry.getValue()) {
1750            tasks.add(new RegionServerTask(this.connection,
1751                serverName,
1752                region,
1753                regionServerSink,
1754                successes));
1755          }
1756        } else {
1757          // random select a region if flag not set
1758          RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1759          tasks.add(new RegionServerTask(this.connection,
1760              serverName,
1761              region,
1762              regionServerSink,
1763              successes));
1764        }
1765      }
1766      try {
1767        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1768          try {
1769            future.get();
1770          } catch (ExecutionException e) {
1771            LOG.error("Sniff regionserver failed!", e);
1772            this.errorCode = ERROR_EXIT_CODE;
1773          }
1774        }
1775        if (this.allRegions) {
1776          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1777            String serverName = entry.getKey();
1778            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1779                successMap.get(serverName), entry.getValue().size(), serverName);
1780          }
1781        }
1782      } catch (InterruptedException e) {
1783        this.errorCode = ERROR_EXIT_CODE;
1784        LOG.error("Sniff regionserver interrupted!", e);
1785      }
1786    }
1787
1788    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1789      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1790      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1791      return regionServerAndRegionsMap;
1792    }
1793
1794    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1795      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1796      try {
1797        LOG.debug("Reading list of tables and locations");
1798        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1799        List<RegionInfo> regions = null;
1800        for (TableDescriptor tableDesc: tableDescs) {
1801          try (RegionLocator regionLocator =
1802                   this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1803            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1804              if (location == null) {
1805                LOG.warn("Null location");
1806                continue;
1807              }
1808              ServerName rs = location.getServerName();
1809              String rsName = rs.getHostname();
1810              RegionInfo r = location.getRegion();
1811              if (rsAndRMap.containsKey(rsName)) {
1812                regions = rsAndRMap.get(rsName);
1813              } else {
1814                regions = new ArrayList<>();
1815                rsAndRMap.put(rsName, regions);
1816              }
1817              regions.add(r);
1818            }
1819          }
1820        }
1821
1822        // get any live regionservers not serving any regions
1823        for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
1824          .getLiveServerMetrics().keySet()) {
1825          String rsName = rs.getHostname();
1826          if (!rsAndRMap.containsKey(rsName)) {
1827            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1828          }
1829        }
1830      } catch (IOException e) {
1831        LOG.error("Get HTables info failed", e);
1832        this.errorCode = INIT_ERROR_EXIT_CODE;
1833      }
1834      return rsAndRMap;
1835    }
1836
1837    private Map<String, List<RegionInfo>> doFilterRegionServerByName(
1838        Map<String, List<RegionInfo>> fullRsAndRMap) {
1839
1840      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1841
1842      if (this.targets != null && this.targets.length > 0) {
1843        filteredRsAndRMap = new HashMap<>();
1844        Pattern pattern = null;
1845        Matcher matcher = null;
1846        boolean regExpFound = false;
1847        for (String rsName : this.targets) {
1848          if (this.useRegExp) {
1849            regExpFound = false;
1850            pattern = Pattern.compile(rsName);
1851            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1852              matcher = pattern.matcher(entry.getKey());
1853              if (matcher.matches()) {
1854                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1855                regExpFound = true;
1856              }
1857            }
1858            if (!regExpFound) {
1859              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
1860            }
1861          } else {
1862            if (fullRsAndRMap.containsKey(rsName)) {
1863              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1864            } else {
1865              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
1866            }
1867          }
1868        }
1869      } else {
1870        filteredRsAndRMap = fullRsAndRMap;
1871      }
1872      return filteredRsAndRMap;
1873    }
1874  }
1875
1876  public static void main(String[] args) throws Exception {
1877    final Configuration conf = HBaseConfiguration.create();
1878
1879    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1880    LOG.info("Execution thread count={}", numThreads);
1881
1882    int exitCode;
1883    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1884    try {
1885      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
1886    } finally {
1887      executor.shutdown();
1888    }
1889    System.exit(exitCode);
1890  }
1891}