001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.tool;
021
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
023import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
024
025import java.io.Closeable;
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Random;
038import java.util.Set;
039import java.util.TreeSet;
040import java.util.concurrent.Callable;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.atomic.AtomicLong;
047import java.util.concurrent.atomic.LongAdder;
048import java.util.regex.Matcher;
049import java.util.regex.Pattern;
050
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.ResultScanner;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
085import org.apache.hadoop.hbase.util.Bytes;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.Pair;
088import org.apache.hadoop.hbase.util.ReflectionUtils;
089import org.apache.hadoop.hbase.util.RegionSplitter;
090import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
091import org.apache.hadoop.hbase.zookeeper.ZKConfig;
092import org.apache.hadoop.util.Tool;
093import org.apache.hadoop.util.ToolRunner;
094import org.apache.yetus.audience.InterfaceAudience;
095import org.apache.zookeeper.KeeperException;
096import org.apache.zookeeper.ZooKeeper;
097import org.apache.zookeeper.client.ConnectStringParser;
098import org.apache.zookeeper.data.Stat;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
107 *
108 * There are three modes:
109 * <ol>
110 * <li>region mode (Default): For each region, try to get one row per column family outputting
111 * information on failure (ERROR) or else the latency.
112 * </li>
113 *
114 * <li>regionserver mode: For each regionserver try to get one row from one table selected
115 * randomly outputting information on failure (ERROR) or else the latency.
116 * </li>
117 *
118 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
119 * failure (ERROR) or else the latency.
120 * </li>
121 * </ol>
122 */
123@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
124public class CanaryTool implements Tool, Canary {
125
126  @Override
127  public int checkRegions(String[] targets) throws Exception {
128    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
129    try {
130      if (configuredReadTableTimeoutsStr != null) {
131        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
132      }
133    } catch (IllegalArgumentException e) {
134      LOG.error("Constructing read table timeouts map failed ", e);
135      return USAGE_EXIT_CODE;
136    }
137    return runMonitor(targets);
138  }
139
140  @Override
141  public int checkRegionServers(String[] targets) throws Exception {
142    regionServerMode = true;
143    return runMonitor(targets);
144  }
145
146  @Override
147  public int checkZooKeeper() throws Exception {
148    zookeeperMode = true;
149    return runMonitor(null);
150  }
151
152  /**
153   * Sink interface used by the canary to output information
154   */
155  public interface Sink {
156    long getReadFailureCount();
157    long incReadFailureCount();
158    Map<String,String> getReadFailures();
159    void updateReadFailures(String regionName, String serverName);
160    long getWriteFailureCount();
161    long incWriteFailureCount();
162    Map<String,String> getWriteFailures();
163    void updateWriteFailures(String regionName, String serverName);
164    long getReadSuccessCount();
165    long incReadSuccessCount();
166    long getWriteSuccessCount();
167    long incWriteSuccessCount();
168  }
169
170  /**
171   * Simple implementation of canary sink that allows plotting to a file or standard output.
172   */
173  public static class StdOutSink implements Sink {
174    private AtomicLong readFailureCount = new AtomicLong(0),
175        writeFailureCount = new AtomicLong(0),
176        readSuccessCount = new AtomicLong(0),
177        writeSuccessCount = new AtomicLong(0);
178    private Map<String, String> readFailures = new ConcurrentHashMap<>();
179    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
180
181    @Override
182    public long getReadFailureCount() {
183      return readFailureCount.get();
184    }
185
186    @Override
187    public long incReadFailureCount() {
188      return readFailureCount.incrementAndGet();
189    }
190
191    @Override
192    public Map<String, String> getReadFailures() {
193      return readFailures;
194    }
195
196    @Override
197    public void updateReadFailures(String regionName, String serverName) {
198      readFailures.put(regionName, serverName);
199    }
200
201    @Override
202    public long getWriteFailureCount() {
203      return writeFailureCount.get();
204    }
205
206    @Override
207    public long incWriteFailureCount() {
208      return writeFailureCount.incrementAndGet();
209    }
210
211    @Override
212    public Map<String, String> getWriteFailures() {
213      return writeFailures;
214    }
215
216    @Override
217    public void updateWriteFailures(String regionName, String serverName) {
218      writeFailures.put(regionName, serverName);
219    }
220
221    @Override
222    public long getReadSuccessCount() {
223      return readSuccessCount.get();
224    }
225
226    @Override
227    public long incReadSuccessCount() {
228      return readSuccessCount.incrementAndGet();
229    }
230
231    @Override
232    public long getWriteSuccessCount() {
233      return writeSuccessCount.get();
234    }
235
236    @Override
237    public long incWriteSuccessCount() {
238      return writeSuccessCount.incrementAndGet();
239    }
240  }
241
242  /**
243   * By RegionServer, for 'regionserver' mode.
244   */
245  public static class RegionServerStdOutSink extends StdOutSink {
246    public void publishReadFailure(String table, String server) {
247      incReadFailureCount();
248      LOG.error("Read from {} on {}", table, server);
249    }
250
251    public void publishReadTiming(String table, String server, long msTime) {
252      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
253    }
254  }
255
256  /**
257   * Output for 'zookeeper' mode.
258   */
259  public static class ZookeeperStdOutSink extends StdOutSink {
260    public void publishReadFailure(String znode, String server) {
261      incReadFailureCount();
262      LOG.error("Read from {} on {}", znode, server);
263    }
264
265    public void publishReadTiming(String znode, String server, long msTime) {
266      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
267    }
268  }
269
270  /**
271   * By Region, for 'region'  mode.
272   */
273  public static class RegionStdOutSink extends StdOutSink {
274    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
275    private LongAdder writeLatency = new LongAdder();
276    private final Map<String, List<RegionTaskResult>> regionMap = new ConcurrentHashMap<>();
277
278    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
279      incReadFailureCount();
280      LOG.error("Read from {} on serverName={} failed",
281          region.getRegionNameAsString(), serverName, e);
282    }
283
284    public void publishReadFailure(ServerName serverName, RegionInfo region,
285        ColumnFamilyDescriptor column, Exception e) {
286      incReadFailureCount();
287      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
288          region.getRegionNameAsString(), serverName,
289          column.getNameAsString(), e);
290    }
291
292    public void publishReadTiming(ServerName serverName, RegionInfo region,
293        ColumnFamilyDescriptor column, long msTime) {
294      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
295      rtr.setReadSuccess();
296      rtr.setReadLatency(msTime);
297      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
298      rtrs.add(rtr);
299      // Note that read success count will be equal to total column family read successes.
300      incReadSuccessCount();
301      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
302          column.getNameAsString(), msTime);
303    }
304
305    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
306      incWriteFailureCount();
307      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
308    }
309
310    public void publishWriteFailure(ServerName serverName, RegionInfo region,
311        ColumnFamilyDescriptor column, Exception e) {
312      incWriteFailureCount();
313      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
314          column.getNameAsString(), e);
315    }
316
317    public void publishWriteTiming(ServerName serverName, RegionInfo region,
318        ColumnFamilyDescriptor column, long msTime) {
319      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
320      rtr.setWriteSuccess();
321      rtr.setWriteLatency(msTime);
322      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
323      rtrs.add(rtr);
324      // Note that write success count will be equal to total column family write successes.
325      incWriteSuccessCount();
326      LOG.info("Write to {} on {} {} in {}ms",
327        region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
328    }
329
330    public Map<String, LongAdder> getReadLatencyMap() {
331      return this.perTableReadLatency;
332    }
333
334    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
335      LongAdder initLatency = new LongAdder();
336      this.perTableReadLatency.put(tableName, initLatency);
337      return initLatency;
338    }
339
340    public void initializeWriteLatency() {
341      this.writeLatency.reset();
342    }
343
344    public LongAdder getWriteLatency() {
345      return this.writeLatency;
346    }
347
348    public Map<String, List<RegionTaskResult>> getRegionMap() {
349      return this.regionMap;
350    }
351
352    public int getTotalExpectedRegions() {
353      return this.regionMap.size();
354    }
355  }
356
357  /**
358   * Run a single zookeeper Task and then exit.
359   */
360  static class ZookeeperTask implements Callable<Void> {
361    private final Connection connection;
362    private final String host;
363    private String znode;
364    private final int timeout;
365    private ZookeeperStdOutSink sink;
366
367    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
368        ZookeeperStdOutSink sink) {
369      this.connection = connection;
370      this.host = host;
371      this.znode = znode;
372      this.timeout = timeout;
373      this.sink = sink;
374    }
375
376    @Override public Void call() throws Exception {
377      ZooKeeper zooKeeper = null;
378      try {
379        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
380        Stat exists = zooKeeper.exists(znode, false);
381        StopWatch stopwatch = new StopWatch();
382        stopwatch.start();
383        zooKeeper.getData(znode, false, exists);
384        stopwatch.stop();
385        sink.publishReadTiming(znode, host, stopwatch.getTime());
386      } catch (KeeperException | InterruptedException e) {
387        sink.publishReadFailure(znode, host);
388      } finally {
389        if (zooKeeper != null) {
390          zooKeeper.close();
391        }
392      }
393      return null;
394    }
395  }
396
397  /**
398   * Run a single Region Task and then exit. For each column family of the Region, get one row and
399   * output latency or failure.
400   */
401  static class RegionTask implements Callable<Void> {
402    public enum TaskType{
403      READ, WRITE
404    }
405    private Connection connection;
406    private RegionInfo region;
407    private RegionStdOutSink sink;
408    private TaskType taskType;
409    private boolean rawScanEnabled;
410    private ServerName serverName;
411    private LongAdder readWriteLatency;
412
413    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
414        RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
415      this.connection = connection;
416      this.region = region;
417      this.serverName = serverName;
418      this.sink = sink;
419      this.taskType = taskType;
420      this.rawScanEnabled = rawScanEnabled;
421      this.readWriteLatency = rwLatency;
422    }
423
424    @Override
425    public Void call() {
426      switch (taskType) {
427      case READ:
428        return read();
429      case WRITE:
430        return write();
431      default:
432        return read();
433      }
434    }
435
436    public Void read() {
437      Table table = null;
438      TableDescriptor tableDesc = null;
439      try {
440        LOG.debug("Reading table descriptor for table {}", region.getTable());
441        table = connection.getTable(region.getTable());
442        tableDesc = table.getDescriptor();
443      } catch (IOException e) {
444        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
445        sink.publishReadFailure(serverName, region, e);
446        if (table != null) {
447          try {
448            table.close();
449          } catch (IOException ioe) {
450            LOG.error("Close table failed", e);
451          }
452        }
453        return null;
454      }
455
456      byte[] startKey = null;
457      Get get = null;
458      Scan scan = null;
459      ResultScanner rs = null;
460      StopWatch stopWatch = new StopWatch();
461      for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
462        stopWatch.reset();
463        startKey = region.getStartKey();
464        // Can't do a get on empty start row so do a Scan of first element if any instead.
465        if (startKey.length > 0) {
466          get = new Get(startKey);
467          get.setCacheBlocks(false);
468          get.setFilter(new FirstKeyOnlyFilter());
469          get.addFamily(column.getName());
470        } else {
471          scan = new Scan();
472          LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName());
473          scan.setRaw(rawScanEnabled);
474          scan.setCaching(1);
475          scan.setCacheBlocks(false);
476          scan.setFilter(new FirstKeyOnlyFilter());
477          scan.addFamily(column.getName());
478          scan.setMaxResultSize(1L);
479          scan.setOneRowLimit();
480        }
481        LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(),
482            region.getRegionNameAsString(), column.getNameAsString(),
483            Bytes.toStringBinary(startKey));
484        try {
485          stopWatch.start();
486          if (startKey.length > 0) {
487            table.get(get);
488          } else {
489            rs = table.getScanner(scan);
490            rs.next();
491          }
492          stopWatch.stop();
493          this.readWriteLatency.add(stopWatch.getTime());
494          sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
495        } catch (Exception e) {
496          sink.publishReadFailure(serverName, region, column, e);
497          sink.updateReadFailures(region.getRegionNameAsString(),
498              serverName == null? "NULL": serverName.getHostname());
499        } finally {
500          if (rs != null) {
501            rs.close();
502          }
503          scan = null;
504          get = null;
505        }
506      }
507      try {
508        table.close();
509      } catch (IOException e) {
510        LOG.error("Close table failed", e);
511      }
512      return null;
513    }
514
515    /**
516     * Check writes for the canary table
517     */
518    private Void write() {
519      Table table = null;
520      TableDescriptor tableDesc = null;
521      try {
522        table = connection.getTable(region.getTable());
523        tableDesc = table.getDescriptor();
524        byte[] rowToCheck = region.getStartKey();
525        if (rowToCheck.length == 0) {
526          rowToCheck = new byte[]{0x0};
527        }
528        int writeValueSize =
529            connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
530        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
531          Put put = new Put(rowToCheck);
532          byte[] value = new byte[writeValueSize];
533          Bytes.random(value);
534          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
535
536          LOG.debug("Writing to {} {} {} {}",
537            tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
538            Bytes.toStringBinary(rowToCheck));
539          try {
540            long startTime = System.currentTimeMillis();
541            table.put(put);
542            long time = System.currentTimeMillis() - startTime;
543            this.readWriteLatency.add(time);
544            sink.publishWriteTiming(serverName, region, column, time);
545          } catch (Exception e) {
546            sink.publishWriteFailure(serverName, region, column, e);
547          }
548        }
549        table.close();
550      } catch (IOException e) {
551        sink.publishWriteFailure(serverName, region, e);
552        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() );
553      }
554      return null;
555    }
556  }
557
558  /**
559   * Run a single RegionServer Task and then exit.
560   * Get one row from a region on the regionserver and output latency or the failure.
561   */
562  static class RegionServerTask implements Callable<Void> {
563    private Connection connection;
564    private String serverName;
565    private RegionInfo region;
566    private RegionServerStdOutSink sink;
567    private AtomicLong successes;
568
569    RegionServerTask(Connection connection, String serverName, RegionInfo region,
570        RegionServerStdOutSink sink, AtomicLong successes) {
571      this.connection = connection;
572      this.serverName = serverName;
573      this.region = region;
574      this.sink = sink;
575      this.successes = successes;
576    }
577
578    @Override
579    public Void call() {
580      TableName tableName = null;
581      Table table = null;
582      Get get = null;
583      byte[] startKey = null;
584      Scan scan = null;
585      StopWatch stopWatch = new StopWatch();
586      // monitor one region on every region server
587      stopWatch.reset();
588      try {
589        tableName = region.getTable();
590        table = connection.getTable(tableName);
591        startKey = region.getStartKey();
592        // Can't do a get on empty start row so do a Scan of first element if any instead.
593        LOG.debug("Reading from {} {} {} {}",
594          serverName, region.getTable(), region.getRegionNameAsString(),
595          Bytes.toStringBinary(startKey));
596        if (startKey.length > 0) {
597          get = new Get(startKey);
598          get.setCacheBlocks(false);
599          get.setFilter(new FirstKeyOnlyFilter());
600          stopWatch.start();
601          table.get(get);
602          stopWatch.stop();
603        } else {
604          scan = new Scan();
605          scan.setCacheBlocks(false);
606          scan.setFilter(new FirstKeyOnlyFilter());
607          scan.setCaching(1);
608          scan.setMaxResultSize(1L);
609          scan.setOneRowLimit();
610          stopWatch.start();
611          ResultScanner s = table.getScanner(scan);
612          s.next();
613          s.close();
614          stopWatch.stop();
615        }
616        successes.incrementAndGet();
617        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
618      } catch (TableNotFoundException tnfe) {
619        LOG.error("Table may be deleted", tnfe);
620        // This is ignored because it doesn't imply that the regionserver is dead
621      } catch (TableNotEnabledException tnee) {
622        // This is considered a success since we got a response.
623        successes.incrementAndGet();
624        LOG.debug("The targeted table was disabled.  Assuming success.");
625      } catch (DoNotRetryIOException dnrioe) {
626        sink.publishReadFailure(tableName.getNameAsString(), serverName);
627        LOG.error(dnrioe.toString(), dnrioe);
628      } catch (IOException e) {
629        sink.publishReadFailure(tableName.getNameAsString(), serverName);
630        LOG.error(e.toString(), e);
631      } finally {
632        if (table != null) {
633          try {
634            table.close();
635          } catch (IOException e) {/* DO NOTHING */
636            LOG.error("Close table failed", e);
637          }
638        }
639        scan = null;
640        get = null;
641        startKey = null;
642      }
643      return null;
644    }
645  }
646
647  private static final int USAGE_EXIT_CODE = 1;
648  private static final int INIT_ERROR_EXIT_CODE = 2;
649  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
650  private static final int ERROR_EXIT_CODE = 4;
651  private static final int FAILURE_EXIT_CODE = 5;
652
653  private static final long DEFAULT_INTERVAL = 60000;
654
655  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
656  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
657
658  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
659
660  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
661    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
662
663  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
664
665  private Configuration conf = null;
666  private long interval = 0;
667  private Sink sink = null;
668
669  /**
670   * True if we are to run in 'regionServer' mode.
671   */
672  private boolean regionServerMode = false;
673
674  /**
675   * True if we are to run in zookeeper 'mode'.
676   */
677  private boolean zookeeperMode = false;
678
679  /**
680   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
681   * we aggregate time to fetch each region and it needs to be less than this value else we
682   * log an ERROR.
683   */
684  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
685
686  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS
687          = "hbase.canary.regionserver_all_regions";
688
689  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING
690          = "hbase.canary.region.write.sniffing";
691  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT
692          = "hbase.canary.region.write.table.timeout";
693  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME
694          = "hbase.canary.region.write.table.name";
695  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT
696          = "hbase.canary.region.read.table.timeout";
697
698  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES
699          = "hbase.canary.zookeeper.permitted.failures";
700
701  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
702  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
703  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";
704
705
706  private ExecutorService executor; // threads to retrieve data from regionservers
707
708  public CanaryTool() {
709    this(new ScheduledThreadPoolExecutor(1));
710  }
711
712  public CanaryTool(ExecutorService executor) {
713    this(executor, null);
714  }
715
716  @VisibleForTesting
717  CanaryTool(ExecutorService executor, Sink sink) {
718    this.executor = executor;
719    this.sink = sink;
720  }
721
722  CanaryTool(Configuration conf, ExecutorService executor) {
723    this(conf, executor, null);
724  }
725
726  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
727    this(executor, sink);
728    setConf(conf);
729  }
730
731  @Override
732  public Configuration getConf() {
733    return conf;
734  }
735
736  @Override
737  public void setConf(Configuration conf) {
738    if (conf == null) {
739      conf = HBaseConfiguration.create();
740    }
741    this.conf = conf;
742  }
743
744  private int parseArgs(String[] args) {
745    int index = -1;
746    long permittedFailures = 0;
747    boolean regionServerAllRegions = false, writeSniffing = false;
748    String readTableTimeoutsStr = null;
749
750    // Process command line args
751    for (int i = 0; i < args.length; i++) {
752      String cmd = args[i];
753
754      if (cmd.startsWith("-")) {
755        if (index >= 0) {
756          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
757          System.err.println("Invalid command line options");
758          printUsageAndExit();
759        }
760
761        if (cmd.equals("-help") || cmd.equals("-h")) {
762          // user asked for help, print the help and quit.
763          printUsageAndExit();
764        } else if (cmd.equals("-daemon") && interval == 0) {
765          // user asked for daemon mode, set a default interval between checks
766          interval = DEFAULT_INTERVAL;
767        } else if (cmd.equals("-interval")) {
768          // user has specified an interval for canary breaths (-interval N)
769          i++;
770
771          if (i == args.length) {
772            System.err.println("-interval takes a numeric seconds value argument.");
773            printUsageAndExit();
774          }
775
776          try {
777            interval = Long.parseLong(args[i]) * 1000;
778          } catch (NumberFormatException e) {
779            System.err.println("-interval needs a numeric value argument.");
780            printUsageAndExit();
781          }
782        } else if (cmd.equals("-zookeeper")) {
783          this.zookeeperMode = true;
784        } else if(cmd.equals("-regionserver")) {
785          this.regionServerMode = true;
786        } else if(cmd.equals("-allRegions")) {
787          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
788          regionServerAllRegions = true;
789        } else if(cmd.equals("-writeSniffing")) {
790          writeSniffing = true;
791          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
792        } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
793          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
794        } else if (cmd.equals("-e")) {
795          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
796        } else if (cmd.equals("-t")) {
797          i++;
798
799          if (i == args.length) {
800            System.err.println("-t takes a numeric milliseconds value argument.");
801            printUsageAndExit();
802          }
803          long timeout = 0;
804          try {
805            timeout = Long.parseLong(args[i]);
806          } catch (NumberFormatException e) {
807            System.err.println("-t takes a numeric milliseconds value argument.");
808            printUsageAndExit();
809          }
810          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
811        } else if(cmd.equals("-writeTableTimeout")) {
812          i++;
813
814          if (i == args.length) {
815            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
816            printUsageAndExit();
817          }
818          long configuredWriteTableTimeout = 0;
819          try {
820            configuredWriteTableTimeout = Long.parseLong(args[i]);
821          } catch (NumberFormatException e) {
822            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
823            printUsageAndExit();
824          }
825          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
826        } else if (cmd.equals("-writeTable")) {
827          i++;
828
829          if (i == args.length) {
830            System.err.println("-writeTable takes a string tablename value argument.");
831            printUsageAndExit();
832          }
833          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
834        } else if (cmd.equals("-f")) {
835          i++;
836
837          if (i == args.length) {
838            System.err
839                .println("-f needs a boolean value argument (true|false).");
840            printUsageAndExit();
841          }
842
843          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
844        } else if (cmd.equals("-readTableTimeouts")) {
845          i++;
846
847          if (i == args.length) {
848            System.err.println("-readTableTimeouts needs a comma-separated list of read " +
849                "millisecond timeouts per table (without spaces).");
850            printUsageAndExit();
851          }
852          readTableTimeoutsStr = args[i];
853          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
854        } else if (cmd.equals("-permittedZookeeperFailures")) {
855          i++;
856
857          if (i == args.length) {
858            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
859            printUsageAndExit();
860          }
861          try {
862            permittedFailures = Long.parseLong(args[i]);
863          } catch (NumberFormatException e) {
864            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
865            printUsageAndExit();
866          }
867          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
868        } else {
869          // no options match
870          System.err.println(cmd + " options is invalid.");
871          printUsageAndExit();
872        }
873      } else if (index < 0) {
874        // keep track of first table name specified by the user
875        index = i;
876      }
877    }
878    if (regionServerAllRegions && !this.regionServerMode) {
879      System.err.println("-allRegions can only be specified in regionserver mode.");
880      printUsageAndExit();
881    }
882    if (this.zookeeperMode) {
883      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
884        System.err.println("-zookeeper is exclusive and cannot be combined with "
885            + "other modes.");
886        printUsageAndExit();
887      }
888    }
889    if (permittedFailures != 0 && !this.zookeeperMode) {
890      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
891      printUsageAndExit();
892    }
893    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
894      System.err.println("-readTableTimeouts can only be configured in region mode.");
895      printUsageAndExit();
896    }
897    return index;
898  }
899
900  @Override
901  public int run(String[] args) throws Exception {
902    int index = parseArgs(args);
903    String[] monitorTargets = null;
904
905    if (index >= 0) {
906      int length = args.length - index;
907      monitorTargets = new String[length];
908      System.arraycopy(args, index, monitorTargets, 0, length);
909    }
910
911    if (zookeeperMode) {
912      return checkZooKeeper();
913    } else if (regionServerMode) {
914      return checkRegionServers(monitorTargets);
915    } else {
916      return checkRegions(monitorTargets);
917    }
918  }
919
920  private int runMonitor(String[] monitorTargets) throws Exception {
921    ChoreService choreService = null;
922
923    // Launches chore for refreshing kerberos credentials if security is enabled.
924    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
925    // for more details.
926    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
927    if (authChore != null) {
928      choreService = new ChoreService("CANARY_TOOL");
929      choreService.scheduleChore(authChore);
930    }
931
932    // Start to prepare the stuffs
933    Monitor monitor = null;
934    Thread monitorThread;
935    long startTime = 0;
936    long currentTimeLength = 0;
937    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
938    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
939    // Get a connection to use in below.
940    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
941      do {
942        // Do monitor !!
943        try {
944          monitor = this.newMonitor(connection, monitorTargets);
945          monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
946          startTime = System.currentTimeMillis();
947          monitorThread.start();
948          while (!monitor.isDone()) {
949            // wait for 1 sec
950            Thread.sleep(1000);
951            // exit if any error occurs
952            if (failOnError && monitor.hasError()) {
953              monitorThread.interrupt();
954              if (monitor.initialized) {
955                return monitor.errorCode;
956              } else {
957                return INIT_ERROR_EXIT_CODE;
958              }
959            }
960            currentTimeLength = System.currentTimeMillis() - startTime;
961            if (currentTimeLength > timeout) {
962              LOG.error("The monitor is running too long (" + currentTimeLength
963                  + ") after timeout limit:" + timeout
964                  + " will be killed itself !!");
965              if (monitor.initialized) {
966                return TIMEOUT_ERROR_EXIT_CODE;
967              } else {
968                return INIT_ERROR_EXIT_CODE;
969              }
970            }
971          }
972
973          if (failOnError && monitor.finalCheckForErrors()) {
974            monitorThread.interrupt();
975            return monitor.errorCode;
976          }
977        } finally {
978          if (monitor != null) monitor.close();
979        }
980
981        Thread.sleep(interval);
982      } while (interval > 0);
983    } // try-with-resources close
984
985    if (choreService != null) {
986      choreService.shutdown();
987    }
988    return monitor.errorCode;
989  }
990
991  @Override
992  public Map<String, String> getReadFailures()  {
993    return sink.getReadFailures();
994  }
995
996  @Override
997  public Map<String, String> getWriteFailures()  {
998    return sink.getWriteFailures();
999  }
1000
1001  private void printUsageAndExit() {
1002    System.err.println(
1003      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1004    System.err.println("Where [OPTIONS] are:");
1005    System.err.println(" -h,-help        show this help and exit.");
1006    System.err.println(" -regionserver   set 'regionserver mode'; gets row from random region on " +
1007        "server");
1008    System.err.println(" -allRegions     get from ALL regions when 'regionserver mode', not just " +
1009        "random one.");
1010    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on " +
1011        "each ensemble member");
1012    System.err.println(" -daemon         continuous check at defined intervals.");
1013    System.err.println(" -interval <N>   interval between checks in seconds");
1014    System.err.println(" -e              consider table/regionserver argument as regular " +
1015        "expression");
1016    System.err.println(" -f <B>          exit on first error; default=true");
1017    System.err.println(" -failureAsError treat read/write failure as error");
1018    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1019    System.err.println(" -writeSniffing  enable write sniffing");
1020    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1021    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1022    System.err.println(" -readTableTimeouts <tableName>=<read timeout>," +
1023        "<tableName>=<read timeout>,...");
1024    System.err.println("                comma-separated list of table read timeouts " +
1025        "(no spaces);");
1026    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1027    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1028    System.err.println("                connect to individual zookeeper nodes in ensemble");
1029    System.err.println("");
1030    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1031    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " +
1032        "raw scan; default=false");
1033    System.err.println("");
1034    System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
1035        "zookeeper.");
1036    System.err.println("To sniff/probe all regions, pass no arguments.");
1037    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1038    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1039    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1040    System.exit(USAGE_EXIT_CODE);
1041  }
1042
1043  Sink getSink(Configuration configuration, Class clazz) {
1044    // In test context, this.sink might be set. Use it if non-null. For testing.
1045    return this.sink != null? this.sink:
1046        (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class",
1047            clazz, Sink.class));
1048  }
1049
1050  /**
1051   * Canary region mode-specific data structure which stores information about each region
1052   * to be scanned
1053   */
1054  public static class RegionTaskResult {
1055    private RegionInfo region;
1056    private TableName tableName;
1057    private ServerName serverName;
1058    private ColumnFamilyDescriptor column;
1059    private AtomicLong readLatency = null;
1060    private AtomicLong writeLatency = null;
1061    private boolean readSuccess = false;
1062    private boolean writeSuccess = false;
1063
1064    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1065        ColumnFamilyDescriptor column) {
1066      this.region = region;
1067      this.tableName = tableName;
1068      this.serverName = serverName;
1069      this.column = column;
1070    }
1071
1072    public RegionInfo getRegionInfo() {
1073      return this.region;
1074    }
1075
1076    public String getRegionNameAsString() {
1077      return this.region.getRegionNameAsString();
1078    }
1079
1080    public TableName getTableName() {
1081      return this.tableName;
1082    }
1083
1084    public String getTableNameAsString() {
1085      return this.tableName.getNameAsString();
1086    }
1087
1088    public ServerName getServerName() {
1089      return this.serverName;
1090    }
1091
1092    public String getServerNameAsString() {
1093      return this.serverName.getServerName();
1094    }
1095
1096    public ColumnFamilyDescriptor getColumnFamily() {
1097      return this.column;
1098    }
1099
1100    public String getColumnFamilyNameAsString() {
1101      return this.column.getNameAsString();
1102    }
1103
1104    public long getReadLatency() {
1105      if (this.readLatency == null) {
1106        return -1;
1107      }
1108      return this.readLatency.get();
1109    }
1110
1111    public void setReadLatency(long readLatency) {
1112      if (this.readLatency != null) {
1113        this.readLatency.set(readLatency);
1114      } else {
1115        this.readLatency = new AtomicLong(readLatency);
1116      }
1117    }
1118
1119    public long getWriteLatency() {
1120      if (this.writeLatency == null) {
1121        return -1;
1122      }
1123      return this.writeLatency.get();
1124    }
1125
1126    public void setWriteLatency(long writeLatency) {
1127      if (this.writeLatency != null) {
1128        this.writeLatency.set(writeLatency);
1129      } else {
1130        this.writeLatency = new AtomicLong(writeLatency);
1131      }
1132    }
1133
1134    public boolean isReadSuccess() {
1135      return this.readSuccess;
1136    }
1137
1138    public void setReadSuccess() {
1139      this.readSuccess = true;
1140    }
1141
1142    public boolean isWriteSuccess() {
1143      return this.writeSuccess;
1144    }
1145
1146    public void setWriteSuccess() {
1147      this.writeSuccess = true;
1148    }
1149  }
1150
1151  /**
1152   * A Factory method for {@link Monitor}.
1153   * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
1154   * @return a Monitor instance
1155   */
1156  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1157    Monitor monitor;
1158    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1159    boolean regionServerAllRegions
1160            = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1161    boolean failOnError
1162            = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1163    int permittedFailures
1164            = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1165    boolean writeSniffing
1166            = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1167    String writeTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME,
1168            DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1169    long configuredWriteTableTimeout
1170            = conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1171
1172    if (this.regionServerMode) {
1173      monitor =
1174          new RegionServerMonitor(connection, monitorTargets, useRegExp,
1175              getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
1176              this.executor, regionServerAllRegions,
1177              failOnError, permittedFailures);
1178
1179    } else if (this.zookeeperMode) {
1180      monitor =
1181          new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1182              getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
1183              this.executor, failOnError, permittedFailures);
1184    } else {
1185      monitor =
1186          new RegionMonitor(connection, monitorTargets, useRegExp,
1187              getSink(connection.getConfiguration(), RegionStdOutSink.class),
1188              this.executor, writeSniffing,
1189              TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1190              configuredWriteTableTimeout, permittedFailures);
1191    }
1192    return monitor;
1193  }
1194
1195  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1196    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1197    for (String tT : tableTimeouts) {
1198      String[] nameTimeout = tT.split("=");
1199      if (nameTimeout.length < 2) {
1200        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " +
1201            "<tableName>=<read timeout> (without spaces).");
1202      }
1203      long timeoutVal;
1204      try {
1205        timeoutVal = Long.parseLong(nameTimeout[1]);
1206      } catch (NumberFormatException e) {
1207        throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" +
1208            " must be a numeric value argument.");
1209      }
1210      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1211    }
1212  }
1213  /**
1214   * A Monitor super-class can be extended by users
1215   */
1216  public static abstract class Monitor implements Runnable, Closeable {
1217    protected Connection connection;
1218    protected Admin admin;
1219    /**
1220     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
1221     * Passed on the command-line as arguments.
1222     */
1223    protected String[] targets;
1224    protected boolean useRegExp;
1225    protected boolean treatFailureAsError;
1226    protected boolean initialized = false;
1227
1228    protected boolean done = false;
1229    protected int errorCode = 0;
1230    protected long allowedFailures = 0;
1231    protected Sink sink;
1232    protected ExecutorService executor;
1233
1234    public boolean isDone() {
1235      return done;
1236    }
1237
1238    public boolean hasError() {
1239      return errorCode != 0;
1240    }
1241
1242    public boolean finalCheckForErrors() {
1243      if (errorCode != 0) {
1244        return true;
1245      }
1246      if (treatFailureAsError &&
1247          (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) {
1248        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1249        errorCode = FAILURE_EXIT_CODE;
1250        return true;
1251      }
1252      return false;
1253    }
1254
1255    @Override
1256    public void close() throws IOException {
1257      if (this.admin != null) this.admin.close();
1258    }
1259
1260    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1261        ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1262      if (null == connection) throw new IllegalArgumentException("connection shall not be null");
1263
1264      this.connection = connection;
1265      this.targets = monitorTargets;
1266      this.useRegExp = useRegExp;
1267      this.treatFailureAsError = treatFailureAsError;
1268      this.sink = sink;
1269      this.executor = executor;
1270      this.allowedFailures = allowedFailures;
1271    }
1272
1273    @Override
1274    public abstract void run();
1275
1276    protected boolean initAdmin() {
1277      if (null == this.admin) {
1278        try {
1279          this.admin = this.connection.getAdmin();
1280        } catch (Exception e) {
1281          LOG.error("Initial HBaseAdmin failed...", e);
1282          this.errorCode = INIT_ERROR_EXIT_CODE;
1283        }
1284      } else if (admin.isAborted()) {
1285        LOG.error("HBaseAdmin aborted");
1286        this.errorCode = INIT_ERROR_EXIT_CODE;
1287      }
1288      return !this.hasError();
1289    }
1290  }
1291
1292  /**
1293   * A monitor for region mode.
1294   */
1295  private static class RegionMonitor extends Monitor {
1296    // 10 minutes
1297    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1298    // 1 days
1299    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1300
1301    private long lastCheckTime = -1;
1302    private boolean writeSniffing;
1303    private TableName writeTableName;
1304    private int writeDataTTL;
1305    private float regionsLowerLimit;
1306    private float regionsUpperLimit;
1307    private int checkPeriod;
1308    private boolean rawScanEnabled;
1309
1310    /**
1311     * This is a timeout per table. If read of each region in the table aggregated takes longer
1312     * than what is configured here, we log an ERROR rather than just an INFO.
1313     */
1314    private HashMap<String, Long> configuredReadTableTimeouts;
1315
1316    private long configuredWriteTableTimeout;
1317
1318    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1319        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1320        boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1321        long configuredWriteTableTimeout,
1322        long allowedFailures) {
1323      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1324          allowedFailures);
1325      Configuration conf = connection.getConfiguration();
1326      this.writeSniffing = writeSniffing;
1327      this.writeTableName = writeTableName;
1328      this.writeDataTTL =
1329          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1330      this.regionsLowerLimit =
1331          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1332      this.regionsUpperLimit =
1333          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1334      this.checkPeriod =
1335          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1336            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1337      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1338      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1339      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1340    }
1341
1342    private RegionStdOutSink getSink() {
1343      if (!(sink instanceof RegionStdOutSink)) {
1344        throw new RuntimeException("Can only write to Region sink");
1345      }
1346      return ((RegionStdOutSink) sink);
1347    }
1348
1349    @Override
1350    public void run() {
1351      if (this.initAdmin()) {
1352        try {
1353          List<Future<Void>> taskFutures = new LinkedList<>();
1354          RegionStdOutSink regionSink = this.getSink();
1355          if (this.targets != null && this.targets.length > 0) {
1356            String[] tables = generateMonitorTables(this.targets);
1357            // Check to see that each table name passed in the -readTableTimeouts argument is also
1358            // passed as a monitor target.
1359            if (!new HashSet<>(Arrays.asList(tables)).
1360                containsAll(this.configuredReadTableTimeouts.keySet())) {
1361              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
1362                  "passed via command line.");
1363              this.errorCode = USAGE_EXIT_CODE;
1364              return;
1365            }
1366            this.initialized = true;
1367            for (String table : tables) {
1368              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1369              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1370                this.rawScanEnabled, readLatency));
1371            }
1372          } else {
1373            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1374          }
1375
1376          if (writeSniffing) {
1377            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1378              try {
1379                checkWriteTableDistribution();
1380              } catch (IOException e) {
1381                LOG.error("Check canary table distribution failed!", e);
1382              }
1383              lastCheckTime = EnvironmentEdgeManager.currentTime();
1384            }
1385            // sniff canary table with write operation
1386            regionSink.initializeWriteLatency();
1387            LongAdder writeTableLatency = regionSink.getWriteLatency();
1388            taskFutures.addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1389              executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
1390          }
1391
1392          for (Future<Void> future : taskFutures) {
1393            try {
1394              future.get();
1395            } catch (ExecutionException e) {
1396              LOG.error("Sniff region failed!", e);
1397            }
1398          }
1399          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1400          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1401            String tableName = entry.getKey();
1402            if (actualReadTableLatency.containsKey(tableName)) {
1403              Long actual = actualReadTableLatency.get(tableName).longValue();
1404              Long configured = entry.getValue();
1405              if (actual > configured) {
1406                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout." +
1407                    "(Configured read timeout {}ms.", tableName, actual, configured);
1408              } else {
1409                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1410                    tableName, actual, configured);
1411              }
1412            } else {
1413              LOG.error("Read operation for {} failed!", tableName);
1414            }
1415          }
1416          if (this.writeSniffing) {
1417            String writeTableStringName = this.writeTableName.getNameAsString();
1418            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1419            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1420                writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1421            // Check that the writeTable write operation latency does not exceed the configured timeout.
1422            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1423              LOG.error("Write operation for {} exceeded the configured write timeout.",
1424                  writeTableStringName);
1425            }
1426          }
1427        } catch (Exception e) {
1428          LOG.error("Run regionMonitor failed", e);
1429          this.errorCode = ERROR_EXIT_CODE;
1430        } finally {
1431          this.done = true;
1432        }
1433      }
1434      this.done = true;
1435    }
1436
1437    /**
1438     * @return List of tables to use in test.
1439     */
1440    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1441      String[] returnTables = null;
1442
1443      if (this.useRegExp) {
1444        Pattern pattern = null;
1445        List<TableDescriptor> tds = null;
1446        Set<String> tmpTables = new TreeSet<>();
1447        try {
1448          LOG.debug(String.format("reading list of tables"));
1449          tds = this.admin.listTableDescriptors(pattern);
1450          if (tds == null) {
1451            tds = Collections.emptyList();
1452          }
1453          for (String monitorTarget : monitorTargets) {
1454            pattern = Pattern.compile(monitorTarget);
1455            for (TableDescriptor td : tds) {
1456              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1457                tmpTables.add(td.getTableName().getNameAsString());
1458              }
1459            }
1460          }
1461        } catch (IOException e) {
1462          LOG.error("Communicate with admin failed", e);
1463          throw e;
1464        }
1465
1466        if (tmpTables.size() > 0) {
1467          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1468        } else {
1469          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1470          LOG.error(msg);
1471          this.errorCode = INIT_ERROR_EXIT_CODE;
1472          throw new TableNotFoundException(msg);
1473        }
1474      } else {
1475        returnTables = monitorTargets;
1476      }
1477
1478      return returnTables;
1479    }
1480
1481    /*
1482     * Canary entry point to monitor all the tables.
1483     */
1484    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1485        throws Exception {
1486      LOG.debug("Reading list of tables");
1487      List<Future<Void>> taskFutures = new LinkedList<>();
1488      for (TableDescriptor td: admin.listTableDescriptors()) {
1489        if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) &&
1490            (!td.getTableName().equals(writeTableName))) {
1491          LongAdder readLatency =
1492              regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1493          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled,
1494              readLatency));
1495        }
1496      }
1497      return taskFutures;
1498    }
1499
1500    private void checkWriteTableDistribution() throws IOException {
1501      if (!admin.tableExists(writeTableName)) {
1502        int numberOfServers = admin.getRegionServers().size();
1503        if (numberOfServers == 0) {
1504          throw new IllegalStateException("No live regionservers");
1505        }
1506        createWriteTable(numberOfServers);
1507      }
1508
1509      if (!admin.isTableEnabled(writeTableName)) {
1510        admin.enableTable(writeTableName);
1511      }
1512
1513      ClusterMetrics status =
1514          admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
1515      int numberOfServers = status.getServersName().size();
1516      if (status.getServersName().contains(status.getMasterName())) {
1517        numberOfServers -= 1;
1518      }
1519
1520      List<Pair<RegionInfo, ServerName>> pairs =
1521          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1522      int numberOfRegions = pairs.size();
1523      if (numberOfRegions < numberOfServers * regionsLowerLimit
1524          || numberOfRegions > numberOfServers * regionsUpperLimit) {
1525        admin.disableTable(writeTableName);
1526        admin.deleteTable(writeTableName);
1527        createWriteTable(numberOfServers);
1528      }
1529      HashSet<ServerName> serverSet = new HashSet<>();
1530      for (Pair<RegionInfo, ServerName> pair : pairs) {
1531        serverSet.add(pair.getSecond());
1532      }
1533      int numberOfCoveredServers = serverSet.size();
1534      if (numberOfCoveredServers < numberOfServers) {
1535        admin.balance();
1536      }
1537    }
1538
1539    private void createWriteTable(int numberOfServers) throws IOException {
1540      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
1541      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
1542        "(current lower limit of regions per server is {} and you can change it with config {}).",
1543          numberOfServers, numberOfRegions, regionsLowerLimit,
1544          HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1545      ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
1546        .newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME)).setMaxVersions(1)
1547        .setTimeToLive(writeDataTTL).build();
1548      TableDescriptor desc = TableDescriptorBuilder.newBuilder(writeTableName)
1549        .setColumnFamily(family).build();
1550      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1551      admin.createTable(desc, splits);
1552    }
1553  }
1554
1555  /**
1556   * Canary entry point for specified table.
1557   * @throws Exception
1558   */
1559  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1560      ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency)
1561      throws Exception {
1562    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1563    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1564      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1565        executor, taskType, rawScanEnabled, readLatency);
1566    } else {
1567      LOG.warn("Table {} is not enabled", tableName);
1568    }
1569    return new LinkedList<>();
1570  }
1571
1572  /*
1573   * Loops over regions of this table, and outputs information about the state.
1574   */
1575  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1576      TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1577      boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
1578    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1579    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1580      List<RegionTask> tasks = new ArrayList<>();
1581      try (RegionLocator regionLocator =
1582               admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1583        for (HRegionLocation location: regionLocator.getAllRegionLocations()) {
1584          if (location == null) {
1585            LOG.warn("Null location");
1586            continue;
1587          }
1588          ServerName rs = location.getServerName();
1589          RegionInfo region = location.getRegion();
1590          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
1591              taskType, rawScanEnabled, rwLatency));
1592          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1593          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1594        }
1595        return executor.invokeAll(tasks);
1596      }
1597    } catch (TableNotFoundException e) {
1598      return Collections.EMPTY_LIST;
1599    }
1600  }
1601
1602  //  monitor for zookeeper mode
1603  private static class ZookeeperMonitor extends Monitor {
1604    private List<String> hosts;
1605    private final String znode;
1606    private final int timeout;
1607
1608    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1609        Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures)  {
1610      super(connection, monitorTargets, useRegExp,
1611          sink, executor, treatFailureAsError, allowedFailures);
1612      Configuration configuration = connection.getConfiguration();
1613      znode =
1614          configuration.get(ZOOKEEPER_ZNODE_PARENT,
1615              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1616      timeout = configuration
1617          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1618      ConnectStringParser parser =
1619          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1620      hosts = Lists.newArrayList();
1621      for (InetSocketAddress server : parser.getServerAddresses()) {
1622        hosts.add(server.toString());
1623      }
1624      if (allowedFailures > (hosts.size() - 1) / 2) {
1625        LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " +
1626                        "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1627                allowedFailures, hosts.size());
1628      }
1629    }
1630
1631    @Override public void run() {
1632      List<ZookeeperTask> tasks = Lists.newArrayList();
1633      ZookeeperStdOutSink zkSink = null;
1634      try {
1635        zkSink = this.getSink();
1636      } catch (RuntimeException e) {
1637        LOG.error("Run ZooKeeperMonitor failed!", e);
1638        this.errorCode = ERROR_EXIT_CODE;
1639      }
1640      this.initialized = true;
1641      for (final String host : hosts) {
1642        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1643      }
1644      try {
1645        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1646          try {
1647            future.get();
1648          } catch (ExecutionException e) {
1649            LOG.error("Sniff zookeeper failed!", e);
1650            this.errorCode = ERROR_EXIT_CODE;
1651          }
1652        }
1653      } catch (InterruptedException e) {
1654        this.errorCode = ERROR_EXIT_CODE;
1655        Thread.currentThread().interrupt();
1656        LOG.error("Sniff zookeeper interrupted!", e);
1657      }
1658      this.done = true;
1659    }
1660
1661    private ZookeeperStdOutSink getSink() {
1662      if (!(sink instanceof ZookeeperStdOutSink)) {
1663        throw new RuntimeException("Can only write to zookeeper sink");
1664      }
1665      return ((ZookeeperStdOutSink) sink);
1666    }
1667  }
1668
1669
1670  /**
1671   * A monitor for regionserver mode
1672   */
1673  private static class RegionServerMonitor extends Monitor {
1674    private boolean allRegions;
1675
1676    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1677        Sink sink, ExecutorService executor, boolean allRegions,
1678        boolean treatFailureAsError, long allowedFailures) {
1679      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1680          allowedFailures);
1681      this.allRegions = allRegions;
1682    }
1683
1684    private RegionServerStdOutSink getSink() {
1685      if (!(sink instanceof RegionServerStdOutSink)) {
1686        throw new RuntimeException("Can only write to regionserver sink");
1687      }
1688      return ((RegionServerStdOutSink) sink);
1689    }
1690
1691    @Override
1692    public void run() {
1693      if (this.initAdmin() && this.checkNoTableNames()) {
1694        RegionServerStdOutSink regionServerSink = null;
1695        try {
1696          regionServerSink = this.getSink();
1697        } catch (RuntimeException e) {
1698          LOG.error("Run RegionServerMonitor failed!", e);
1699          this.errorCode = ERROR_EXIT_CODE;
1700        }
1701        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1702        this.initialized = true;
1703        this.monitorRegionServers(rsAndRMap, regionServerSink);
1704      }
1705      this.done = true;
1706    }
1707
1708    private boolean checkNoTableNames() {
1709      List<String> foundTableNames = new ArrayList<>();
1710      TableName[] tableNames = null;
1711      LOG.debug("Reading list of tables");
1712      try {
1713        tableNames = this.admin.listTableNames();
1714      } catch (IOException e) {
1715        LOG.error("Get listTableNames failed", e);
1716        this.errorCode = INIT_ERROR_EXIT_CODE;
1717        return false;
1718      }
1719
1720      if (this.targets == null || this.targets.length == 0) return true;
1721
1722      for (String target : this.targets) {
1723        for (TableName tableName : tableNames) {
1724          if (target.equals(tableName.getNameAsString())) {
1725            foundTableNames.add(target);
1726          }
1727        }
1728      }
1729
1730      if (foundTableNames.size() > 0) {
1731        System.err.println("Cannot pass a tablename when using the -regionserver " +
1732            "option, tablenames:" + foundTableNames.toString());
1733        this.errorCode = USAGE_EXIT_CODE;
1734      }
1735      return foundTableNames.isEmpty();
1736    }
1737
1738    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, RegionServerStdOutSink regionServerSink) {
1739      List<RegionServerTask> tasks = new ArrayList<>();
1740      Map<String, AtomicLong> successMap = new HashMap<>();
1741      Random rand = new Random();
1742      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1743        String serverName = entry.getKey();
1744        AtomicLong successes = new AtomicLong(0);
1745        successMap.put(serverName, successes);
1746        if (entry.getValue().isEmpty()) {
1747          LOG.error("Regionserver not serving any regions - {}", serverName);
1748        } else if (this.allRegions) {
1749          for (RegionInfo region : entry.getValue()) {
1750            tasks.add(new RegionServerTask(this.connection,
1751                serverName,
1752                region,
1753                regionServerSink,
1754                successes));
1755          }
1756        } else {
1757          // random select a region if flag not set
1758          RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1759          tasks.add(new RegionServerTask(this.connection,
1760              serverName,
1761              region,
1762              regionServerSink,
1763              successes));
1764        }
1765      }
1766      try {
1767        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1768          try {
1769            future.get();
1770          } catch (ExecutionException e) {
1771            LOG.error("Sniff regionserver failed!", e);
1772            this.errorCode = ERROR_EXIT_CODE;
1773          }
1774        }
1775        if (this.allRegions) {
1776          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1777            String serverName = entry.getKey();
1778            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1779                successMap.get(serverName), entry.getValue().size(), serverName);
1780          }
1781        }
1782      } catch (InterruptedException e) {
1783        this.errorCode = ERROR_EXIT_CODE;
1784        LOG.error("Sniff regionserver interrupted!", e);
1785      }
1786    }
1787
1788    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1789      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1790      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1791      return regionServerAndRegionsMap;
1792    }
1793
1794    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1795      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1796      try {
1797        LOG.debug("Reading list of tables and locations");
1798        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1799        List<RegionInfo> regions = null;
1800        for (TableDescriptor tableDesc: tableDescs) {
1801          try (RegionLocator regionLocator =
1802                   this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1803            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1804              if (location == null) {
1805                LOG.warn("Null location");
1806                continue;
1807              }
1808              ServerName rs = location.getServerName();
1809              String rsName = rs.getHostname();
1810              RegionInfo r = location.getRegion();
1811              if (rsAndRMap.containsKey(rsName)) {
1812                regions = rsAndRMap.get(rsName);
1813              } else {
1814                regions = new ArrayList<>();
1815                rsAndRMap.put(rsName, regions);
1816              }
1817              regions.add(r);
1818            }
1819          }
1820        }
1821
1822        // get any live regionservers not serving any regions
1823        for (ServerName rs: this.admin.getRegionServers()) {
1824          String rsName = rs.getHostname();
1825          if (!rsAndRMap.containsKey(rsName)) {
1826            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1827          }
1828        }
1829      } catch (IOException e) {
1830        LOG.error("Get HTables info failed", e);
1831        this.errorCode = INIT_ERROR_EXIT_CODE;
1832      }
1833      return rsAndRMap;
1834    }
1835
1836    private Map<String, List<RegionInfo>> doFilterRegionServerByName(
1837        Map<String, List<RegionInfo>> fullRsAndRMap) {
1838
1839      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1840
1841      if (this.targets != null && this.targets.length > 0) {
1842        filteredRsAndRMap = new HashMap<>();
1843        Pattern pattern = null;
1844        Matcher matcher = null;
1845        boolean regExpFound = false;
1846        for (String rsName : this.targets) {
1847          if (this.useRegExp) {
1848            regExpFound = false;
1849            pattern = Pattern.compile(rsName);
1850            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1851              matcher = pattern.matcher(entry.getKey());
1852              if (matcher.matches()) {
1853                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1854                regExpFound = true;
1855              }
1856            }
1857            if (!regExpFound) {
1858              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
1859            }
1860          } else {
1861            if (fullRsAndRMap.containsKey(rsName)) {
1862              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1863            } else {
1864              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
1865            }
1866          }
1867        }
1868      } else {
1869        filteredRsAndRMap = fullRsAndRMap;
1870      }
1871      return filteredRsAndRMap;
1872    }
1873  }
1874
1875  public static void main(String[] args) throws Exception {
1876    final Configuration conf = HBaseConfiguration.create();
1877
1878    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1879    LOG.info("Execution thread count={}", numThreads);
1880
1881    int exitCode;
1882    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1883    try {
1884      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
1885    } finally {
1886      executor.shutdown();
1887    }
1888    System.exit(exitCode);
1889  }
1890}