001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.tool;
020
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
022import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
023import java.io.Closeable;
024import java.io.IOException;
025import java.net.BindException;
026import java.net.InetSocketAddress;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Map;
036import java.util.Random;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.Callable;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.concurrent.atomic.LongAdder;
049import java.util.regex.Matcher;
050import java.util.regex.Pattern;
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.ResultScanner;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.http.InfoServer;
085import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.hadoop.hbase.util.ReflectionUtils;
090import org.apache.hadoop.hbase.util.RegionSplitter;
091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
092import org.apache.hadoop.hbase.zookeeper.ZKConfig;
093import org.apache.hadoop.util.Tool;
094import org.apache.hadoop.util.ToolRunner;
095import org.apache.yetus.audience.InterfaceAudience;
096import org.apache.zookeeper.KeeperException;
097import org.apache.zookeeper.ZooKeeper;
098import org.apache.zookeeper.client.ConnectStringParser;
099import org.apache.zookeeper.data.Stat;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
107 *
108 * There are three modes:
109 * <ol>
110 * <li>region mode (Default): For each region, try to get one row per column family outputting
111 * information on failure (ERROR) or else the latency.
112 * </li>
113 *
114 * <li>regionserver mode: For each regionserver try to get one row from one table selected
115 * randomly outputting information on failure (ERROR) or else the latency.
116 * </li>
117 *
118 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
119 * failure (ERROR) or else the latency.
120 * </li>
121 * </ol>
122 */
123@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
124public class CanaryTool implements Tool, Canary {
125  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
126  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";
127
128  private void putUpWebUI() throws IOException {
129    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
130    // -1 is for disabling info server
131    if (port < 0) {
132      return;
133    }
134    if (zookeeperMode) {
135      LOG.info("WebUI is not supported in Zookeeper mode");
136    } else if (regionServerMode) {
137      LOG.info("WebUI is not supported in RegionServer mode");
138    } else {
139      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
140      try {
141        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
142        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
143        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
144        infoServer.start();
145        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
146      } catch (BindException e) {
147        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
148      }
149    }
150  }
151
152  @Override
153  public int checkRegions(String[] targets) throws Exception {
154    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
155    try {
156      if (configuredReadTableTimeoutsStr != null) {
157        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
158      }
159    } catch (IllegalArgumentException e) {
160      LOG.error("Constructing read table timeouts map failed ", e);
161      return USAGE_EXIT_CODE;
162    }
163    return runMonitor(targets);
164  }
165
166  @Override
167  public int checkRegionServers(String[] targets) throws Exception {
168    regionServerMode = true;
169    return runMonitor(targets);
170  }
171
172  @Override
173  public int checkZooKeeper() throws Exception {
174    zookeeperMode = true;
175    return runMonitor(null);
176  }
177
178  /**
179   * Sink interface used by the canary to output information
180   */
181  public interface Sink {
182    long getReadFailureCount();
183    long incReadFailureCount();
184    Map<String,String> getReadFailures();
185    void updateReadFailures(String regionName, String serverName);
186    long getWriteFailureCount();
187    long incWriteFailureCount();
188    Map<String,String> getWriteFailures();
189    void updateWriteFailures(String regionName, String serverName);
190    long getReadSuccessCount();
191    long incReadSuccessCount();
192    long getWriteSuccessCount();
193    long incWriteSuccessCount();
194  }
195
196  /**
197   * Simple implementation of canary sink that allows plotting to a file or standard output.
198   */
199  public static class StdOutSink implements Sink {
200    private AtomicLong readFailureCount = new AtomicLong(0),
201        writeFailureCount = new AtomicLong(0),
202        readSuccessCount = new AtomicLong(0),
203        writeSuccessCount = new AtomicLong(0);
204    private Map<String, String> readFailures = new ConcurrentHashMap<>();
205    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
206
207    @Override
208    public long getReadFailureCount() {
209      return readFailureCount.get();
210    }
211
212    @Override
213    public long incReadFailureCount() {
214      return readFailureCount.incrementAndGet();
215    }
216
217    @Override
218    public Map<String, String> getReadFailures() {
219      return readFailures;
220    }
221
222    @Override
223    public void updateReadFailures(String regionName, String serverName) {
224      readFailures.put(regionName, serverName);
225    }
226
227    @Override
228    public long getWriteFailureCount() {
229      return writeFailureCount.get();
230    }
231
232    @Override
233    public long incWriteFailureCount() {
234      return writeFailureCount.incrementAndGet();
235    }
236
237    @Override
238    public Map<String, String> getWriteFailures() {
239      return writeFailures;
240    }
241
242    @Override
243    public void updateWriteFailures(String regionName, String serverName) {
244      writeFailures.put(regionName, serverName);
245    }
246
247    @Override
248    public long getReadSuccessCount() {
249      return readSuccessCount.get();
250    }
251
252    @Override
253    public long incReadSuccessCount() {
254      return readSuccessCount.incrementAndGet();
255    }
256
257    @Override
258    public long getWriteSuccessCount() {
259      return writeSuccessCount.get();
260    }
261
262    @Override
263    public long incWriteSuccessCount() {
264      return writeSuccessCount.incrementAndGet();
265    }
266  }
267
268  /**
269   * By RegionServer, for 'regionserver' mode.
270   */
271  public static class RegionServerStdOutSink extends StdOutSink {
272    public void publishReadFailure(String table, String server) {
273      incReadFailureCount();
274      LOG.error("Read from {} on {}", table, server);
275    }
276
277    public void publishReadTiming(String table, String server, long msTime) {
278      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
279    }
280  }
281
282  /**
283   * Output for 'zookeeper' mode.
284   */
285  public static class ZookeeperStdOutSink extends StdOutSink {
286    public void publishReadFailure(String znode, String server) {
287      incReadFailureCount();
288      LOG.error("Read from {} on {}", znode, server);
289    }
290
291    public void publishReadTiming(String znode, String server, long msTime) {
292      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
293    }
294  }
295
296  /**
297   * By Region, for 'region'  mode.
298   */
299  public static class RegionStdOutSink extends StdOutSink {
300    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
301    private LongAdder writeLatency = new LongAdder();
302    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
303      new ConcurrentHashMap<>();
304    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount =
305      new ConcurrentHashMap<>();
306    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();
307
308    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
309      return perServerFailuresCount;
310    }
311
312    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
313      return perTableFailuresCount;
314    }
315
316    public void resetFailuresCountDetails() {
317      perServerFailuresCount.clear();
318      perTableFailuresCount.clear();
319    }
320
321    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
322      perServerFailuresCount.compute(serverName, (server, count) -> {
323        if (count == null) {
324          count = new LongAdder();
325        }
326        count.increment();
327        return count;
328      });
329      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
330        if (count == null) {
331          count = new LongAdder();
332        }
333        count.increment();
334        return count;
335      });
336    }
337
338    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
339      incReadFailureCount();
340      incFailuresCountDetails(serverName, region);
341      LOG.error("Read from {} on serverName={} failed",
342          region.getRegionNameAsString(), serverName, e);
343    }
344
345    public void publishReadFailure(ServerName serverName, RegionInfo region,
346        ColumnFamilyDescriptor column, Exception e) {
347      incReadFailureCount();
348      incFailuresCountDetails(serverName, region);
349      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
350          region.getRegionNameAsString(), serverName,
351          column.getNameAsString(), e);
352    }
353
354    public void publishReadTiming(ServerName serverName, RegionInfo region,
355        ColumnFamilyDescriptor column, long msTime) {
356      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
357      rtr.setReadSuccess();
358      rtr.setReadLatency(msTime);
359      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
360      rtrs.add(rtr);
361      // Note that read success count will be equal to total column family read successes.
362      incReadSuccessCount();
363      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
364          column.getNameAsString(), msTime);
365    }
366
367    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
368      incWriteFailureCount();
369      incFailuresCountDetails(serverName, region);
370      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
371    }
372
373    public void publishWriteFailure(ServerName serverName, RegionInfo region,
374        ColumnFamilyDescriptor column, Exception e) {
375      incWriteFailureCount();
376      incFailuresCountDetails(serverName, region);
377      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
378          column.getNameAsString(), e);
379    }
380
381    public void publishWriteTiming(ServerName serverName, RegionInfo region,
382        ColumnFamilyDescriptor column, long msTime) {
383      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
384      rtr.setWriteSuccess();
385      rtr.setWriteLatency(msTime);
386      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
387      rtrs.add(rtr);
388      // Note that write success count will be equal to total column family write successes.
389      incWriteSuccessCount();
390      LOG.info("Write to {} on {} {} in {}ms",
391        region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
392    }
393
394    public Map<String, LongAdder> getReadLatencyMap() {
395      return this.perTableReadLatency;
396    }
397
398    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
399      LongAdder initLatency = new LongAdder();
400      this.perTableReadLatency.put(tableName, initLatency);
401      return initLatency;
402    }
403
404    public void initializeWriteLatency() {
405      this.writeLatency.reset();
406    }
407
408    public LongAdder getWriteLatency() {
409      return this.writeLatency;
410    }
411
412    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
413      return this.regionMap;
414    }
415
416    public int getTotalExpectedRegions() {
417      return this.regionMap.size();
418    }
419  }
420
421  /**
422   * Run a single zookeeper Task and then exit.
423   */
424  static class ZookeeperTask implements Callable<Void> {
425    private final Connection connection;
426    private final String host;
427    private String znode;
428    private final int timeout;
429    private ZookeeperStdOutSink sink;
430
431    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
432        ZookeeperStdOutSink sink) {
433      this.connection = connection;
434      this.host = host;
435      this.znode = znode;
436      this.timeout = timeout;
437      this.sink = sink;
438    }
439
440    @Override public Void call() throws Exception {
441      ZooKeeper zooKeeper = null;
442      try {
443        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
444        Stat exists = zooKeeper.exists(znode, false);
445        StopWatch stopwatch = new StopWatch();
446        stopwatch.start();
447        zooKeeper.getData(znode, false, exists);
448        stopwatch.stop();
449        sink.publishReadTiming(znode, host, stopwatch.getTime());
450      } catch (KeeperException | InterruptedException e) {
451        sink.publishReadFailure(znode, host);
452      } finally {
453        if (zooKeeper != null) {
454          zooKeeper.close();
455        }
456      }
457      return null;
458    }
459  }
460
461  /**
462   * Run a single Region Task and then exit. For each column family of the Region, get one row and
463   * output latency or failure.
464   */
465  static class RegionTask implements Callable<Void> {
466    public enum TaskType{
467      READ, WRITE
468    }
469    private Connection connection;
470    private RegionInfo region;
471    private RegionStdOutSink sink;
472    private TaskType taskType;
473    private boolean rawScanEnabled;
474    private ServerName serverName;
475    private LongAdder readWriteLatency;
476    private boolean readAllCF;
477
478    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
479        RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
480        boolean readAllCF) {
481      this.connection = connection;
482      this.region = region;
483      this.serverName = serverName;
484      this.sink = sink;
485      this.taskType = taskType;
486      this.rawScanEnabled = rawScanEnabled;
487      this.readWriteLatency = rwLatency;
488      this.readAllCF = readAllCF;
489    }
490
491    @Override
492    public Void call() {
493      switch (taskType) {
494        case READ:
495          return read();
496        case WRITE:
497          return write();
498        default:
499          return read();
500      }
501    }
502
503    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
504      byte[] startKey = null;
505      Get get = null;
506      Scan scan = null;
507      ResultScanner rs = null;
508      StopWatch stopWatch = new StopWatch();
509      startKey = region.getStartKey();
510      // Can't do a get on empty start row so do a Scan of first element if any instead.
511      if (startKey.length > 0) {
512        get = new Get(startKey);
513        get.setCacheBlocks(false);
514        get.setFilter(new FirstKeyOnlyFilter());
515        get.addFamily(column.getName());
516      } else {
517        scan = new Scan();
518        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
519        scan.setRaw(rawScanEnabled);
520        scan.setCaching(1);
521        scan.setCacheBlocks(false);
522        scan.setFilter(new FirstKeyOnlyFilter());
523        scan.addFamily(column.getName());
524        scan.setMaxResultSize(1L);
525        scan.setOneRowLimit();
526      }
527      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
528        column.getNameAsString(), Bytes.toStringBinary(startKey));
529      try {
530        stopWatch.start();
531        if (startKey.length > 0) {
532          table.get(get);
533        } else {
534          rs = table.getScanner(scan);
535          rs.next();
536        }
537        stopWatch.stop();
538        this.readWriteLatency.add(stopWatch.getTime());
539        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
540      } catch (Exception e) {
541        sink.publishReadFailure(serverName, region, column, e);
542        sink.updateReadFailures(region.getRegionNameAsString(),
543          serverName == null ? "NULL" : serverName.getHostname());
544      } finally {
545        if (rs != null) {
546          rs.close();
547        }
548      }
549      return null;
550    }
551
552    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
553      int size = cfs.length;
554      return cfs[ThreadLocalRandom.current().nextInt(size)];
555
556    }
557
558    public Void read() {
559      Table table = null;
560      TableDescriptor tableDesc = null;
561      try {
562        LOG.debug("Reading table descriptor for table {}", region.getTable());
563        table = connection.getTable(region.getTable());
564        tableDesc = table.getDescriptor();
565      } catch (IOException e) {
566        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
567        sink.publishReadFailure(serverName, region, e);
568        if (table != null) {
569          try {
570            table.close();
571          } catch (IOException ioe) {
572            LOG.error("Close table failed", e);
573          }
574        }
575        return null;
576      }
577
578      if (readAllCF) {
579        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
580          readColumnFamily(table, column);
581        }
582      } else {
583        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
584      }
585      try {
586        table.close();
587      } catch (IOException e) {
588        LOG.error("Close table failed", e);
589      }
590      return null;
591    }
592
593    /**
594     * Check writes for the canary table
595     */
596    private Void write() {
597      Table table = null;
598      TableDescriptor tableDesc = null;
599      try {
600        table = connection.getTable(region.getTable());
601        tableDesc = table.getDescriptor();
602        byte[] rowToCheck = region.getStartKey();
603        if (rowToCheck.length == 0) {
604          rowToCheck = new byte[]{0x0};
605        }
606        int writeValueSize =
607            connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
608        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
609          Put put = new Put(rowToCheck);
610          byte[] value = new byte[writeValueSize];
611          Bytes.random(value);
612          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
613
614          LOG.debug("Writing to {} {} {} {}",
615            tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
616            Bytes.toStringBinary(rowToCheck));
617          try {
618            long startTime = System.currentTimeMillis();
619            table.put(put);
620            long time = System.currentTimeMillis() - startTime;
621            this.readWriteLatency.add(time);
622            sink.publishWriteTiming(serverName, region, column, time);
623          } catch (Exception e) {
624            sink.publishWriteFailure(serverName, region, column, e);
625          }
626        }
627        table.close();
628      } catch (IOException e) {
629        sink.publishWriteFailure(serverName, region, e);
630        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
631      }
632      return null;
633    }
634  }
635
636  /**
637   * Run a single RegionServer Task and then exit.
638   * Get one row from a region on the regionserver and output latency or the failure.
639   */
640  static class RegionServerTask implements Callable<Void> {
641    private Connection connection;
642    private String serverName;
643    private RegionInfo region;
644    private RegionServerStdOutSink sink;
645    private AtomicLong successes;
646
647    RegionServerTask(Connection connection, String serverName, RegionInfo region,
648        RegionServerStdOutSink sink, AtomicLong successes) {
649      this.connection = connection;
650      this.serverName = serverName;
651      this.region = region;
652      this.sink = sink;
653      this.successes = successes;
654    }
655
656    @Override
657    public Void call() {
658      TableName tableName = null;
659      Table table = null;
660      Get get = null;
661      byte[] startKey = null;
662      Scan scan = null;
663      StopWatch stopWatch = new StopWatch();
664      // monitor one region on every region server
665      stopWatch.reset();
666      try {
667        tableName = region.getTable();
668        table = connection.getTable(tableName);
669        startKey = region.getStartKey();
670        // Can't do a get on empty start row so do a Scan of first element if any instead.
671        LOG.debug("Reading from {} {} {} {}",
672          serverName, region.getTable(), region.getRegionNameAsString(),
673          Bytes.toStringBinary(startKey));
674        if (startKey.length > 0) {
675          get = new Get(startKey);
676          get.setCacheBlocks(false);
677          get.setFilter(new FirstKeyOnlyFilter());
678          stopWatch.start();
679          table.get(get);
680          stopWatch.stop();
681        } else {
682          scan = new Scan();
683          scan.setCacheBlocks(false);
684          scan.setFilter(new FirstKeyOnlyFilter());
685          scan.setCaching(1);
686          scan.setMaxResultSize(1L);
687          scan.setOneRowLimit();
688          stopWatch.start();
689          ResultScanner s = table.getScanner(scan);
690          s.next();
691          s.close();
692          stopWatch.stop();
693        }
694        successes.incrementAndGet();
695        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
696      } catch (TableNotFoundException tnfe) {
697        LOG.error("Table may be deleted", tnfe);
698        // This is ignored because it doesn't imply that the regionserver is dead
699      } catch (TableNotEnabledException tnee) {
700        // This is considered a success since we got a response.
701        successes.incrementAndGet();
702        LOG.debug("The targeted table was disabled.  Assuming success.");
703      } catch (DoNotRetryIOException dnrioe) {
704        sink.publishReadFailure(tableName.getNameAsString(), serverName);
705        LOG.error(dnrioe.toString(), dnrioe);
706      } catch (IOException e) {
707        sink.publishReadFailure(tableName.getNameAsString(), serverName);
708        LOG.error(e.toString(), e);
709      } finally {
710        if (table != null) {
711          try {
712            table.close();
713          } catch (IOException e) {/* DO NOTHING */
714            LOG.error("Close table failed", e);
715          }
716        }
717        scan = null;
718        get = null;
719        startKey = null;
720      }
721      return null;
722    }
723  }
724
725  private static final int USAGE_EXIT_CODE = 1;
726  private static final int INIT_ERROR_EXIT_CODE = 2;
727  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
728  private static final int ERROR_EXIT_CODE = 4;
729  private static final int FAILURE_EXIT_CODE = 5;
730
731  private static final long DEFAULT_INTERVAL = 60000;
732
733  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
734  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
735
736  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
737
738  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
739    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
740
741  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
742
743  private Configuration conf = null;
744  private long interval = 0;
745  private Sink sink = null;
746
747  /**
748   * True if we are to run in 'regionServer' mode.
749   */
750  private boolean regionServerMode = false;
751
752  /**
753   * True if we are to run in zookeeper 'mode'.
754   */
755  private boolean zookeeperMode = false;
756
757  /**
758   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
759   * we aggregate time to fetch each region and it needs to be less than this value else we
760   * log an ERROR.
761   */
762  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
763
764  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS
765          = "hbase.canary.regionserver_all_regions";
766
767  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING
768          = "hbase.canary.region.write.sniffing";
769  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT
770          = "hbase.canary.region.write.table.timeout";
771  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME
772          = "hbase.canary.region.write.table.name";
773  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT
774          = "hbase.canary.region.read.table.timeout";
775
776  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES
777          = "hbase.canary.zookeeper.permitted.failures";
778
779  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
780  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
781  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";
782
783
784  private ExecutorService executor; // threads to retrieve data from regionservers
785
786  public CanaryTool() {
787    this(new ScheduledThreadPoolExecutor(1));
788  }
789
790  public CanaryTool(ExecutorService executor) {
791    this(executor, null);
792  }
793
794  @InterfaceAudience.Private
795  CanaryTool(ExecutorService executor, Sink sink) {
796    this.executor = executor;
797    this.sink = sink;
798  }
799
800  CanaryTool(Configuration conf, ExecutorService executor) {
801    this(conf, executor, null);
802  }
803
804  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
805    this(executor, sink);
806    setConf(conf);
807  }
808
809  @Override
810  public Configuration getConf() {
811    return conf;
812  }
813
814  @Override
815  public void setConf(Configuration conf) {
816    if (conf == null) {
817      conf = HBaseConfiguration.create();
818    }
819    this.conf = conf;
820  }
821
822  private int parseArgs(String[] args) {
823    int index = -1;
824    long permittedFailures = 0;
825    boolean regionServerAllRegions = false, writeSniffing = false;
826    String readTableTimeoutsStr = null;
827    // Process command line args
828    for (int i = 0; i < args.length; i++) {
829      String cmd = args[i];
830      if (cmd.startsWith("-")) {
831        if (index >= 0) {
832          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
833          System.err.println("Invalid command line options");
834          printUsageAndExit();
835        }
836        if (cmd.equals("-help") || cmd.equals("-h")) {
837          // user asked for help, print the help and quit.
838          printUsageAndExit();
839        } else if (cmd.equals("-daemon") && interval == 0) {
840          // user asked for daemon mode, set a default interval between checks
841          interval = DEFAULT_INTERVAL;
842        } else if (cmd.equals("-interval")) {
843          // user has specified an interval for canary breaths (-interval N)
844          i++;
845
846          if (i == args.length) {
847            System.err.println("-interval takes a numeric seconds value argument.");
848            printUsageAndExit();
849          }
850          try {
851            interval = Long.parseLong(args[i]) * 1000;
852          } catch (NumberFormatException e) {
853            System.err.println("-interval needs a numeric value argument.");
854            printUsageAndExit();
855          }
856        } else if (cmd.equals("-zookeeper")) {
857          this.zookeeperMode = true;
858        } else if(cmd.equals("-regionserver")) {
859          this.regionServerMode = true;
860        } else if(cmd.equals("-allRegions")) {
861          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
862          regionServerAllRegions = true;
863        } else if(cmd.equals("-writeSniffing")) {
864          writeSniffing = true;
865          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
866        } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
867          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
868        } else if (cmd.equals("-e")) {
869          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
870        } else if (cmd.equals("-t")) {
871          i++;
872
873          if (i == args.length) {
874            System.err.println("-t takes a numeric milliseconds value argument.");
875            printUsageAndExit();
876          }
877          long timeout = 0;
878          try {
879            timeout = Long.parseLong(args[i]);
880          } catch (NumberFormatException e) {
881            System.err.println("-t takes a numeric milliseconds value argument.");
882            printUsageAndExit();
883          }
884          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
885        } else if(cmd.equals("-writeTableTimeout")) {
886          i++;
887
888          if (i == args.length) {
889            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
890            printUsageAndExit();
891          }
892          long configuredWriteTableTimeout = 0;
893          try {
894            configuredWriteTableTimeout = Long.parseLong(args[i]);
895          } catch (NumberFormatException e) {
896            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
897            printUsageAndExit();
898          }
899          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
900        } else if (cmd.equals("-writeTable")) {
901          i++;
902
903          if (i == args.length) {
904            System.err.println("-writeTable takes a string tablename value argument.");
905            printUsageAndExit();
906          }
907          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
908        } else if (cmd.equals("-f")) {
909          i++;
910          if (i == args.length) {
911            System.err
912                .println("-f needs a boolean value argument (true|false).");
913            printUsageAndExit();
914          }
915
916          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
917        } else if (cmd.equals("-readTableTimeouts")) {
918          i++;
919          if (i == args.length) {
920            System.err.println("-readTableTimeouts needs a comma-separated list of read " +
921                "millisecond timeouts per table (without spaces).");
922            printUsageAndExit();
923          }
924          readTableTimeoutsStr = args[i];
925          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
926        } else if (cmd.equals("-permittedZookeeperFailures")) {
927          i++;
928
929          if (i == args.length) {
930            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
931            printUsageAndExit();
932          }
933          try {
934            permittedFailures = Long.parseLong(args[i]);
935          } catch (NumberFormatException e) {
936            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
937            printUsageAndExit();
938          }
939          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
940        } else {
941          // no options match
942          System.err.println(cmd + " options is invalid.");
943          printUsageAndExit();
944        }
945      } else if (index < 0) {
946        // keep track of first table name specified by the user
947        index = i;
948      }
949    }
950    if (regionServerAllRegions && !this.regionServerMode) {
951      System.err.println("-allRegions can only be specified in regionserver mode.");
952      printUsageAndExit();
953    }
954    if (this.zookeeperMode) {
955      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
956        System.err.println("-zookeeper is exclusive and cannot be combined with "
957            + "other modes.");
958        printUsageAndExit();
959      }
960    }
961    if (permittedFailures != 0 && !this.zookeeperMode) {
962      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
963      printUsageAndExit();
964    }
965    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
966      System.err.println("-readTableTimeouts can only be configured in region mode.");
967      printUsageAndExit();
968    }
969    return index;
970  }
971
972  @Override
973  public int run(String[] args) throws Exception {
974    int index = parseArgs(args);
975    String[] monitorTargets = null;
976
977    if (index >= 0) {
978      int length = args.length - index;
979      monitorTargets = new String[length];
980      System.arraycopy(args, index, monitorTargets, 0, length);
981    }
982    if (interval > 0) {
983      //Only show the web page in daemon mode
984      putUpWebUI();
985    }
986    if (zookeeperMode) {
987      return checkZooKeeper();
988    } else if (regionServerMode) {
989      return checkRegionServers(monitorTargets);
990    } else {
991      return checkRegions(monitorTargets);
992    }
993  }
994
995  private int runMonitor(String[] monitorTargets) throws Exception {
996    ChoreService choreService = null;
997
998    // Launches chore for refreshing kerberos credentials if security is enabled.
999    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
1000    // for more details.
1001    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1002    if (authChore != null) {
1003      choreService = new ChoreService("CANARY_TOOL");
1004      choreService.scheduleChore(authChore);
1005    }
1006
1007    // Start to prepare the stuffs
1008    Monitor monitor = null;
1009    Thread monitorThread;
1010    long startTime = 0;
1011    long currentTimeLength = 0;
1012    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1013    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
1014    // Get a connection to use in below.
1015    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
1016      do {
1017        // Do monitor !!
1018        try {
1019          monitor = this.newMonitor(connection, monitorTargets);
1020          monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
1021          startTime = System.currentTimeMillis();
1022          monitorThread.start();
1023          while (!monitor.isDone()) {
1024            // wait for 1 sec
1025            Thread.sleep(1000);
1026            // exit if any error occurs
1027            if (failOnError && monitor.hasError()) {
1028              monitorThread.interrupt();
1029              if (monitor.initialized) {
1030                return monitor.errorCode;
1031              } else {
1032                return INIT_ERROR_EXIT_CODE;
1033              }
1034            }
1035            currentTimeLength = System.currentTimeMillis() - startTime;
1036            if (currentTimeLength > timeout) {
1037              LOG.error("The monitor is running too long (" + currentTimeLength
1038                  + ") after timeout limit:" + timeout
1039                  + " will be killed itself !!");
1040              if (monitor.initialized) {
1041                return TIMEOUT_ERROR_EXIT_CODE;
1042              } else {
1043                return INIT_ERROR_EXIT_CODE;
1044              }
1045            }
1046          }
1047
1048          if (failOnError && monitor.finalCheckForErrors()) {
1049            monitorThread.interrupt();
1050            return monitor.errorCode;
1051          }
1052        } finally {
1053          if (monitor != null) {
1054            monitor.close();
1055          }
1056        }
1057
1058        Thread.sleep(interval);
1059      } while (interval > 0);
1060    } // try-with-resources close
1061
1062    if (choreService != null) {
1063      choreService.shutdown();
1064    }
1065    return monitor.errorCode;
1066  }
1067
  /**
   * Returns the read failures as reported by the sink this tool holds.
   * NOTE(review): {@code sink} is null when the tool was constructed without one; confirm it
   * is always assigned before this is called.
   */
  @Override
  public Map<String, String> getReadFailures()  {
    return sink.getReadFailures();
  }
1072
  /**
   * Returns the write failures as reported by the sink this tool holds.
   * NOTE(review): {@code sink} is null when the tool was constructed without one; confirm it
   * is always assigned before this is called.
   */
  @Override
  public Map<String, String> getWriteFailures()  {
    return sink.getWriteFailures();
  }
1077
1078  private void printUsageAndExit() {
1079    System.err.println(
1080      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1081    System.err.println("Where [OPTIONS] are:");
1082    System.err.println(" -h,-help        show this help and exit.");
1083    System.err.println(" -regionserver   set 'regionserver mode'; gets row from random region on " +
1084        "server");
1085    System.err.println(" -allRegions     get from ALL regions when 'regionserver mode', not just " +
1086        "random one.");
1087    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on " +
1088        "each ensemble member");
1089    System.err.println(" -daemon         continuous check at defined intervals.");
1090    System.err.println(" -interval <N>   interval between checks in seconds");
1091    System.err.println(" -e              consider table/regionserver argument as regular " +
1092        "expression");
1093    System.err.println(" -f <B>          exit on first error; default=true");
1094    System.err.println(" -failureAsError treat read/write failure as error");
1095    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1096    System.err.println(" -writeSniffing  enable write sniffing");
1097    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1098    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1099    System.err.println(" -readTableTimeouts <tableName>=<read timeout>," +
1100        "<tableName>=<read timeout>,...");
1101    System.err.println("                comma-separated list of table read timeouts " +
1102        "(no spaces);");
1103    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1104    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1105    System.err.println("                connect to individual zookeeper nodes in ensemble");
1106    System.err.println("");
1107    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1108    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " +
1109        "raw scan; default=false");
1110    System.err.println(" -Dhbase.canary.info.port=PORT_NUMBER  Set for a Canary UI; " +
1111      "default=-1 (None)");
1112    System.err.println("");
1113    System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
1114        "zookeeper.");
1115    System.err.println("To sniff/probe all regions, pass no arguments.");
1116    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1117    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1118    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1119    System.exit(USAGE_EXIT_CODE);
1120  }
1121
1122  Sink getSink(Configuration configuration, Class clazz) {
1123    // In test context, this.sink might be set. Use it if non-null. For testing.
1124    return this.sink != null? this.sink:
1125        (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class",
1126            clazz, Sink.class));
1127  }
1128
1129  /**
1130   * Canary region mode-specific data structure which stores information about each region
1131   * to be scanned
1132   */
1133  public static class RegionTaskResult {
1134    private RegionInfo region;
1135    private TableName tableName;
1136    private ServerName serverName;
1137    private ColumnFamilyDescriptor column;
1138    private AtomicLong readLatency = null;
1139    private AtomicLong writeLatency = null;
1140    private boolean readSuccess = false;
1141    private boolean writeSuccess = false;
1142
1143    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1144        ColumnFamilyDescriptor column) {
1145      this.region = region;
1146      this.tableName = tableName;
1147      this.serverName = serverName;
1148      this.column = column;
1149    }
1150
1151    public RegionInfo getRegionInfo() {
1152      return this.region;
1153    }
1154
1155    public String getRegionNameAsString() {
1156      return this.region.getRegionNameAsString();
1157    }
1158
1159    public TableName getTableName() {
1160      return this.tableName;
1161    }
1162
1163    public String getTableNameAsString() {
1164      return this.tableName.getNameAsString();
1165    }
1166
1167    public ServerName getServerName() {
1168      return this.serverName;
1169    }
1170
1171    public String getServerNameAsString() {
1172      return this.serverName.getServerName();
1173    }
1174
1175    public ColumnFamilyDescriptor getColumnFamily() {
1176      return this.column;
1177    }
1178
1179    public String getColumnFamilyNameAsString() {
1180      return this.column.getNameAsString();
1181    }
1182
1183    public long getReadLatency() {
1184      if (this.readLatency == null) {
1185        return -1;
1186      }
1187      return this.readLatency.get();
1188    }
1189
1190    public void setReadLatency(long readLatency) {
1191      if (this.readLatency != null) {
1192        this.readLatency.set(readLatency);
1193      } else {
1194        this.readLatency = new AtomicLong(readLatency);
1195      }
1196    }
1197
1198    public long getWriteLatency() {
1199      if (this.writeLatency == null) {
1200        return -1;
1201      }
1202      return this.writeLatency.get();
1203    }
1204
1205    public void setWriteLatency(long writeLatency) {
1206      if (this.writeLatency != null) {
1207        this.writeLatency.set(writeLatency);
1208      } else {
1209        this.writeLatency = new AtomicLong(writeLatency);
1210      }
1211    }
1212
1213    public boolean isReadSuccess() {
1214      return this.readSuccess;
1215    }
1216
1217    public void setReadSuccess() {
1218      this.readSuccess = true;
1219    }
1220
1221    public boolean isWriteSuccess() {
1222      return this.writeSuccess;
1223    }
1224
1225    public void setWriteSuccess() {
1226      this.writeSuccess = true;
1227    }
1228  }
1229
1230  /**
1231   * A Factory method for {@link Monitor}.
1232   * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
1233   * @return a Monitor instance
1234   */
1235  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1236    Monitor monitor;
1237    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1238    boolean regionServerAllRegions
1239            = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1240    boolean failOnError
1241            = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1242    int permittedFailures
1243            = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1244    boolean writeSniffing
1245            = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1246    String writeTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME,
1247            DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1248    long configuredWriteTableTimeout
1249            = conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1250
1251    if (this.regionServerMode) {
1252      monitor =
1253          new RegionServerMonitor(connection, monitorTargets, useRegExp,
1254              getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
1255              this.executor, regionServerAllRegions,
1256              failOnError, permittedFailures);
1257
1258    } else if (this.zookeeperMode) {
1259      monitor =
1260          new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1261              getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
1262              this.executor, failOnError, permittedFailures);
1263    } else {
1264      monitor =
1265          new RegionMonitor(connection, monitorTargets, useRegExp,
1266              getSink(connection.getConfiguration(), RegionStdOutSink.class),
1267              this.executor, writeSniffing,
1268              TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1269              configuredWriteTableTimeout, permittedFailures);
1270    }
1271    return monitor;
1272  }
1273
1274  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1275    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1276    for (String tT : tableTimeouts) {
1277      String[] nameTimeout = tT.split("=");
1278      if (nameTimeout.length < 2) {
1279        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " +
1280            "<tableName>=<read timeout> (without spaces).");
1281      }
1282      long timeoutVal;
1283      try {
1284        timeoutVal = Long.parseLong(nameTimeout[1]);
1285      } catch (NumberFormatException e) {
1286        throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" +
1287            " must be a numeric value argument.");
1288      }
1289      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1290    }
1291  }
1292  /**
1293   * A Monitor super-class can be extended by users
1294   */
1295  public static abstract class Monitor implements Runnable, Closeable {
1296    protected Connection connection;
1297    protected Admin admin;
1298    /**
1299     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
1300     * Passed on the command-line as arguments.
1301     */
1302    protected String[] targets;
1303    protected boolean useRegExp;
1304    protected boolean treatFailureAsError;
1305    protected boolean initialized = false;
1306
1307    protected boolean done = false;
1308    protected int errorCode = 0;
1309    protected long allowedFailures = 0;
1310    protected Sink sink;
1311    protected ExecutorService executor;
1312
1313    public boolean isDone() {
1314      return done;
1315    }
1316
1317    public boolean hasError() {
1318      return errorCode != 0;
1319    }
1320
1321    public boolean finalCheckForErrors() {
1322      if (errorCode != 0) {
1323        return true;
1324      }
1325      if (treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
1326          || sink.getWriteFailureCount() > allowedFailures)) {
1327        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1328        errorCode = FAILURE_EXIT_CODE;
1329        return true;
1330      }
1331      return false;
1332    }
1333
1334    @Override
1335    public void close() throws IOException {
1336      if (this.admin != null) {
1337        this.admin.close();
1338      }
1339    }
1340
1341    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1342        ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1343      if (null == connection) {
1344        throw new IllegalArgumentException("connection shall not be null");
1345      }
1346
1347      this.connection = connection;
1348      this.targets = monitorTargets;
1349      this.useRegExp = useRegExp;
1350      this.treatFailureAsError = treatFailureAsError;
1351      this.sink = sink;
1352      this.executor = executor;
1353      this.allowedFailures = allowedFailures;
1354    }
1355
1356    @Override
1357    public abstract void run();
1358
1359    protected boolean initAdmin() {
1360      if (null == this.admin) {
1361        try {
1362          this.admin = this.connection.getAdmin();
1363        } catch (Exception e) {
1364          LOG.error("Initial HBaseAdmin failed...", e);
1365          this.errorCode = INIT_ERROR_EXIT_CODE;
1366        }
1367      } else if (admin.isAborted()) {
1368        LOG.error("HBaseAdmin aborted");
1369        this.errorCode = INIT_ERROR_EXIT_CODE;
1370      }
1371      return !this.hasError();
1372    }
1373  }
1374
1375  /**
1376   * A monitor for region mode.
1377   */
1378  private static class RegionMonitor extends Monitor {
1379    // 10 minutes
1380    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1381    // 1 days
1382    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1383
1384    private long lastCheckTime = -1;
1385    private boolean writeSniffing;
1386    private TableName writeTableName;
1387    private int writeDataTTL;
1388    private float regionsLowerLimit;
1389    private float regionsUpperLimit;
1390    private int checkPeriod;
1391    private boolean rawScanEnabled;
1392    private boolean readAllCF;
1393
1394    /**
1395     * This is a timeout per table. If read of each region in the table aggregated takes longer
1396     * than what is configured here, we log an ERROR rather than just an INFO.
1397     */
1398    private HashMap<String, Long> configuredReadTableTimeouts;
1399
1400    private long configuredWriteTableTimeout;
1401
1402    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1403        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1404        boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1405        long configuredWriteTableTimeout,
1406        long allowedFailures) {
1407      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1408          allowedFailures);
1409      Configuration conf = connection.getConfiguration();
1410      this.writeSniffing = writeSniffing;
1411      this.writeTableName = writeTableName;
1412      this.writeDataTTL =
1413          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1414      this.regionsLowerLimit =
1415          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1416      this.regionsUpperLimit =
1417          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1418      this.checkPeriod =
1419          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1420            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1421      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1422      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1423      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1424      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
1425    }
1426
1427    private RegionStdOutSink getSink() {
1428      if (!(sink instanceof RegionStdOutSink)) {
1429        throw new RuntimeException("Can only write to Region sink");
1430      }
1431      return ((RegionStdOutSink) sink);
1432    }
1433
1434    @Override
1435    public void run() {
1436      if (this.initAdmin()) {
1437        try {
1438          List<Future<Void>> taskFutures = new LinkedList<>();
1439          RegionStdOutSink regionSink = this.getSink();
1440          regionSink.resetFailuresCountDetails();
1441          if (this.targets != null && this.targets.length > 0) {
1442            String[] tables = generateMonitorTables(this.targets);
1443            // Check to see that each table name passed in the -readTableTimeouts argument is also
1444            // passed as a monitor target.
1445            if (!new HashSet<>(Arrays.asList(tables)).
1446                containsAll(this.configuredReadTableTimeouts.keySet())) {
1447              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
1448                  "passed via command line.");
1449              this.errorCode = USAGE_EXIT_CODE;
1450              return;
1451            }
1452            this.initialized = true;
1453            for (String table : tables) {
1454              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1455              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1456                this.rawScanEnabled, readLatency, readAllCF));
1457            }
1458          } else {
1459            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1460          }
1461
1462          if (writeSniffing) {
1463            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1464              try {
1465                checkWriteTableDistribution();
1466              } catch (IOException e) {
1467                LOG.error("Check canary table distribution failed!", e);
1468              }
1469              lastCheckTime = EnvironmentEdgeManager.currentTime();
1470            }
1471            // sniff canary table with write operation
1472            regionSink.initializeWriteLatency();
1473            LongAdder writeTableLatency = regionSink.getWriteLatency();
1474            taskFutures
1475                .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1476                  executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
1477          }
1478
1479          for (Future<Void> future : taskFutures) {
1480            try {
1481              future.get();
1482            } catch (ExecutionException e) {
1483              LOG.error("Sniff region failed!", e);
1484            }
1485          }
1486          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1487          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1488            String tableName = entry.getKey();
1489            if (actualReadTableLatency.containsKey(tableName)) {
1490              Long actual = actualReadTableLatency.get(tableName).longValue();
1491              Long configured = entry.getValue();
1492              if (actual > configured) {
1493                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout." +
1494                    "(Configured read timeout {}ms.", tableName, actual, configured);
1495              } else {
1496                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1497                    tableName, actual, configured);
1498              }
1499            } else {
1500              LOG.error("Read operation for {} failed!", tableName);
1501            }
1502          }
1503          if (this.writeSniffing) {
1504            String writeTableStringName = this.writeTableName.getNameAsString();
1505            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1506            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1507                writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1508            // Check that the writeTable write operation latency does not exceed the configured
1509            // timeout.
1510            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1511              LOG.error("Write operation for {} exceeded the configured write timeout.",
1512                  writeTableStringName);
1513            }
1514          }
1515        } catch (Exception e) {
1516          LOG.error("Run regionMonitor failed", e);
1517          this.errorCode = ERROR_EXIT_CODE;
1518        } finally {
1519          this.done = true;
1520        }
1521      }
1522      this.done = true;
1523    }
1524
1525    /**
1526     * @return List of tables to use in test.
1527     */
1528    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1529      String[] returnTables = null;
1530
1531      if (this.useRegExp) {
1532        Pattern pattern = null;
1533        List<TableDescriptor> tds = null;
1534        Set<String> tmpTables = new TreeSet<>();
1535        try {
1536          LOG.debug(String.format("reading list of tables"));
1537          tds = this.admin.listTableDescriptors(pattern);
1538          if (tds == null) {
1539            tds = Collections.emptyList();
1540          }
1541          for (String monitorTarget : monitorTargets) {
1542            pattern = Pattern.compile(monitorTarget);
1543            for (TableDescriptor td : tds) {
1544              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1545                tmpTables.add(td.getTableName().getNameAsString());
1546              }
1547            }
1548          }
1549        } catch (IOException e) {
1550          LOG.error("Communicate with admin failed", e);
1551          throw e;
1552        }
1553
1554        if (tmpTables.size() > 0) {
1555          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1556        } else {
1557          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1558          LOG.error(msg);
1559          this.errorCode = INIT_ERROR_EXIT_CODE;
1560          throw new TableNotFoundException(msg);
1561        }
1562      } else {
1563        returnTables = monitorTargets;
1564      }
1565
1566      return returnTables;
1567    }
1568
1569    /*
1570     * Canary entry point to monitor all the tables.
1571     */
1572    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1573        throws Exception {
1574      LOG.debug("Reading list of tables");
1575      List<Future<Void>> taskFutures = new LinkedList<>();
1576      for (TableDescriptor td: admin.listTableDescriptors()) {
1577        if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) &&
1578            (!td.getTableName().equals(writeTableName))) {
1579          LongAdder readLatency =
1580              regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1581          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
1582            this.rawScanEnabled, readLatency, readAllCF));
1583        }
1584      }
1585      return taskFutures;
1586    }
1587
1588    private void checkWriteTableDistribution() throws IOException {
1589      if (!admin.tableExists(writeTableName)) {
1590        int numberOfServers = admin.getRegionServers().size();
1591        if (numberOfServers == 0) {
1592          throw new IllegalStateException("No live regionservers");
1593        }
1594        createWriteTable(numberOfServers);
1595      }
1596
1597      if (!admin.isTableEnabled(writeTableName)) {
1598        admin.enableTable(writeTableName);
1599      }
1600
1601      ClusterMetrics status =
1602          admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
1603      int numberOfServers = status.getServersName().size();
1604      if (status.getServersName().contains(status.getMasterName())) {
1605        numberOfServers -= 1;
1606      }
1607
1608      List<Pair<RegionInfo, ServerName>> pairs =
1609          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1610      int numberOfRegions = pairs.size();
1611      if (numberOfRegions < numberOfServers * regionsLowerLimit
1612          || numberOfRegions > numberOfServers * regionsUpperLimit) {
1613        admin.disableTable(writeTableName);
1614        admin.deleteTable(writeTableName);
1615        createWriteTable(numberOfServers);
1616      }
1617      HashSet<ServerName> serverSet = new HashSet<>();
1618      for (Pair<RegionInfo, ServerName> pair : pairs) {
1619        serverSet.add(pair.getSecond());
1620      }
1621      int numberOfCoveredServers = serverSet.size();
1622      if (numberOfCoveredServers < numberOfServers) {
1623        admin.balance();
1624      }
1625    }
1626
1627    private void createWriteTable(int numberOfServers) throws IOException {
1628      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
1629      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
1630        "(current lower limit of regions per server is {} and you can change it with config {}).",
1631          numberOfServers, numberOfRegions, regionsLowerLimit,
1632          HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1633      ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
1634        .newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME)).setMaxVersions(1)
1635        .setTimeToLive(writeDataTTL).build();
1636      TableDescriptor desc = TableDescriptorBuilder.newBuilder(writeTableName)
1637        .setColumnFamily(family).build();
1638      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1639      admin.createTable(desc, splits);
1640    }
1641  }
1642
1643  /**
1644   * Canary entry point for specified table.
1645   * @throws Exception exception
1646   */
1647  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1648      ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
1649      boolean readAllCF) throws Exception {
1650    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1651    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1652      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1653        executor, taskType, rawScanEnabled, readLatency, readAllCF);
1654    } else {
1655      LOG.warn("Table {} is not enabled", tableName);
1656    }
1657    return new LinkedList<>();
1658  }
1659
1660  /*
1661   * Loops over regions of this table, and outputs information about the state.
1662   */
1663  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1664      TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1665      boolean rawScanEnabled, LongAdder rwLatency, boolean readAllCF) throws Exception {
1666    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1667    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1668      List<RegionTask> tasks = new ArrayList<>();
1669      try (RegionLocator regionLocator =
1670               admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1671        for (HRegionLocation location: regionLocator.getAllRegionLocations()) {
1672          if (location == null) {
1673            LOG.warn("Null location");
1674            continue;
1675          }
1676          ServerName rs = location.getServerName();
1677          RegionInfo region = location.getRegion();
1678          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
1679              taskType, rawScanEnabled, rwLatency, readAllCF));
1680          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1681          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1682        }
1683        return executor.invokeAll(tasks);
1684      }
1685    } catch (TableNotFoundException e) {
1686      return Collections.EMPTY_LIST;
1687    }
1688  }
1689
1690  //  monitor for zookeeper mode
1691  private static class ZookeeperMonitor extends Monitor {
1692    private List<String> hosts;
1693    private final String znode;
1694    private final int timeout;
1695
1696    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1697        Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures)  {
1698      super(connection, monitorTargets, useRegExp,
1699          sink, executor, treatFailureAsError, allowedFailures);
1700      Configuration configuration = connection.getConfiguration();
1701      znode =
1702          configuration.get(ZOOKEEPER_ZNODE_PARENT,
1703              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1704      timeout = configuration
1705          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1706      ConnectStringParser parser =
1707          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1708      hosts = Lists.newArrayList();
1709      for (InetSocketAddress server : parser.getServerAddresses()) {
1710        hosts.add(server.toString());
1711      }
1712      if (allowedFailures > (hosts.size() - 1) / 2) {
1713        LOG.warn(
1714          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
1715              + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1716          allowedFailures, hosts.size());
1717      }
1718    }
1719
1720    @Override public void run() {
1721      List<ZookeeperTask> tasks = Lists.newArrayList();
1722      ZookeeperStdOutSink zkSink = null;
1723      try {
1724        zkSink = this.getSink();
1725      } catch (RuntimeException e) {
1726        LOG.error("Run ZooKeeperMonitor failed!", e);
1727        this.errorCode = ERROR_EXIT_CODE;
1728      }
1729      this.initialized = true;
1730      for (final String host : hosts) {
1731        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1732      }
1733      try {
1734        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1735          try {
1736            future.get();
1737          } catch (ExecutionException e) {
1738            LOG.error("Sniff zookeeper failed!", e);
1739            this.errorCode = ERROR_EXIT_CODE;
1740          }
1741        }
1742      } catch (InterruptedException e) {
1743        this.errorCode = ERROR_EXIT_CODE;
1744        Thread.currentThread().interrupt();
1745        LOG.error("Sniff zookeeper interrupted!", e);
1746      }
1747      this.done = true;
1748    }
1749
1750    private ZookeeperStdOutSink getSink() {
1751      if (!(sink instanceof ZookeeperStdOutSink)) {
1752        throw new RuntimeException("Can only write to zookeeper sink");
1753      }
1754      return ((ZookeeperStdOutSink) sink);
1755    }
1756  }
1757
1758
1759  /**
1760   * A monitor for regionserver mode
1761   */
1762  private static class RegionServerMonitor extends Monitor {
1763    private boolean allRegions;
1764
1765    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1766        Sink sink, ExecutorService executor, boolean allRegions,
1767        boolean treatFailureAsError, long allowedFailures) {
1768      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1769          allowedFailures);
1770      this.allRegions = allRegions;
1771    }
1772
1773    private RegionServerStdOutSink getSink() {
1774      if (!(sink instanceof RegionServerStdOutSink)) {
1775        throw new RuntimeException("Can only write to regionserver sink");
1776      }
1777      return ((RegionServerStdOutSink) sink);
1778    }
1779
1780    @Override
1781    public void run() {
1782      if (this.initAdmin() && this.checkNoTableNames()) {
1783        RegionServerStdOutSink regionServerSink = null;
1784        try {
1785          regionServerSink = this.getSink();
1786        } catch (RuntimeException e) {
1787          LOG.error("Run RegionServerMonitor failed!", e);
1788          this.errorCode = ERROR_EXIT_CODE;
1789        }
1790        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1791        this.initialized = true;
1792        this.monitorRegionServers(rsAndRMap, regionServerSink);
1793      }
1794      this.done = true;
1795    }
1796
1797    private boolean checkNoTableNames() {
1798      List<String> foundTableNames = new ArrayList<>();
1799      TableName[] tableNames = null;
1800      LOG.debug("Reading list of tables");
1801      try {
1802        tableNames = this.admin.listTableNames();
1803      } catch (IOException e) {
1804        LOG.error("Get listTableNames failed", e);
1805        this.errorCode = INIT_ERROR_EXIT_CODE;
1806        return false;
1807      }
1808
1809      if (this.targets == null || this.targets.length == 0) {
1810        return true;
1811      }
1812
1813      for (String target : this.targets) {
1814        for (TableName tableName : tableNames) {
1815          if (target.equals(tableName.getNameAsString())) {
1816            foundTableNames.add(target);
1817          }
1818        }
1819      }
1820
1821      if (foundTableNames.size() > 0) {
1822        System.err.println("Cannot pass a tablename when using the -regionserver " +
1823            "option, tablenames:" + foundTableNames.toString());
1824        this.errorCode = USAGE_EXIT_CODE;
1825      }
1826      return foundTableNames.isEmpty();
1827    }
1828
1829    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
1830        RegionServerStdOutSink regionServerSink) {
1831      List<RegionServerTask> tasks = new ArrayList<>();
1832      Map<String, AtomicLong> successMap = new HashMap<>();
1833      Random rand = new Random();
1834      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1835        String serverName = entry.getKey();
1836        AtomicLong successes = new AtomicLong(0);
1837        successMap.put(serverName, successes);
1838        if (entry.getValue().isEmpty()) {
1839          LOG.error("Regionserver not serving any regions - {}", serverName);
1840        } else if (this.allRegions) {
1841          for (RegionInfo region : entry.getValue()) {
1842            tasks.add(new RegionServerTask(this.connection,
1843                serverName,
1844                region,
1845                regionServerSink,
1846                successes));
1847          }
1848        } else {
1849          // random select a region if flag not set
1850          RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1851          tasks.add(new RegionServerTask(this.connection,
1852              serverName,
1853              region,
1854              regionServerSink,
1855              successes));
1856        }
1857      }
1858      try {
1859        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1860          try {
1861            future.get();
1862          } catch (ExecutionException e) {
1863            LOG.error("Sniff regionserver failed!", e);
1864            this.errorCode = ERROR_EXIT_CODE;
1865          }
1866        }
1867        if (this.allRegions) {
1868          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1869            String serverName = entry.getKey();
1870            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1871                successMap.get(serverName), entry.getValue().size(), serverName);
1872          }
1873        }
1874      } catch (InterruptedException e) {
1875        this.errorCode = ERROR_EXIT_CODE;
1876        LOG.error("Sniff regionserver interrupted!", e);
1877      }
1878    }
1879
1880    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1881      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1882      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1883      return regionServerAndRegionsMap;
1884    }
1885
1886    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1887      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1888      try {
1889        LOG.debug("Reading list of tables and locations");
1890        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1891        List<RegionInfo> regions = null;
1892        for (TableDescriptor tableDesc: tableDescs) {
1893          try (RegionLocator regionLocator =
1894                   this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1895            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1896              if (location == null) {
1897                LOG.warn("Null location");
1898                continue;
1899              }
1900              ServerName rs = location.getServerName();
1901              String rsName = rs.getHostname();
1902              RegionInfo r = location.getRegion();
1903              if (rsAndRMap.containsKey(rsName)) {
1904                regions = rsAndRMap.get(rsName);
1905              } else {
1906                regions = new ArrayList<>();
1907                rsAndRMap.put(rsName, regions);
1908              }
1909              regions.add(r);
1910            }
1911          }
1912        }
1913
1914        // get any live regionservers not serving any regions
1915        for (ServerName rs: this.admin.getRegionServers()) {
1916          String rsName = rs.getHostname();
1917          if (!rsAndRMap.containsKey(rsName)) {
1918            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1919          }
1920        }
1921      } catch (IOException e) {
1922        LOG.error("Get HTables info failed", e);
1923        this.errorCode = INIT_ERROR_EXIT_CODE;
1924      }
1925      return rsAndRMap;
1926    }
1927
1928    private Map<String, List<RegionInfo>> doFilterRegionServerByName(
1929        Map<String, List<RegionInfo>> fullRsAndRMap) {
1930
1931      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1932
1933      if (this.targets != null && this.targets.length > 0) {
1934        filteredRsAndRMap = new HashMap<>();
1935        Pattern pattern = null;
1936        Matcher matcher = null;
1937        boolean regExpFound = false;
1938        for (String rsName : this.targets) {
1939          if (this.useRegExp) {
1940            regExpFound = false;
1941            pattern = Pattern.compile(rsName);
1942            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1943              matcher = pattern.matcher(entry.getKey());
1944              if (matcher.matches()) {
1945                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1946                regExpFound = true;
1947              }
1948            }
1949            if (!regExpFound) {
1950              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
1951            }
1952          } else {
1953            if (fullRsAndRMap.containsKey(rsName)) {
1954              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1955            } else {
1956              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
1957            }
1958          }
1959        }
1960      } else {
1961        filteredRsAndRMap = fullRsAndRMap;
1962      }
1963      return filteredRsAndRMap;
1964    }
1965  }
1966
1967  public static void main(String[] args) throws Exception {
1968    final Configuration conf = HBaseConfiguration.create();
1969
1970    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1971    LOG.info("Execution thread count={}", numThreads);
1972
1973    int exitCode;
1974    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1975    try {
1976      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
1977    } finally {
1978      executor.shutdown();
1979    }
1980    System.exit(exitCode);
1981  }
1982}