001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.net.BindException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.Callable;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.concurrent.atomic.LongAdder;
049import java.util.regex.Matcher;
050import java.util.regex.Pattern;
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.ResultScanner;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.http.InfoServer;
085import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.hadoop.hbase.util.ReflectionUtils;
090import org.apache.hadoop.hbase.util.RegionSplitter;
091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
092import org.apache.hadoop.hbase.zookeeper.ZKConfig;
093import org.apache.hadoop.util.Tool;
094import org.apache.hadoop.util.ToolRunner;
095import org.apache.yetus.audience.InterfaceAudience;
096import org.apache.zookeeper.KeeperException;
097import org.apache.zookeeper.ZooKeeper;
098import org.apache.zookeeper.client.ConnectStringParser;
099import org.apache.zookeeper.data.Stat;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
107 * <ol>
108 * <li>region mode (Default): For each region, try to get one row per column family outputting
109 * information on failure (ERROR) or else the latency.</li>
110 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly
111 * outputting information on failure (ERROR) or else the latency.</li>
112 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
113 * failure (ERROR) or else the latency.</li>
114 * </ol>
115 */
116@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
117public class CanaryTool implements Tool, Canary {
118  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
119  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";
120
121  private void putUpWebUI() throws IOException {
122    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
123    // -1 is for disabling info server
124    if (port < 0) {
125      return;
126    }
127    if (zookeeperMode) {
128      LOG.info("WebUI is not supported in Zookeeper mode");
129    } else if (regionServerMode) {
130      LOG.info("WebUI is not supported in RegionServer mode");
131    } else {
132      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
133      try {
134        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
135        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
136        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
137        infoServer.start();
138        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
139      } catch (BindException e) {
140        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
141      }
142    }
143  }
144
145  @Override
146  public int checkRegions(String[] targets) throws Exception {
147    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
148    try {
149      LOG.info("Canary tool is running in Region mode");
150      if (configuredReadTableTimeoutsStr != null) {
151        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
152      }
153    } catch (IllegalArgumentException e) {
154      LOG.error("Constructing read table timeouts map failed ", e);
155      return USAGE_EXIT_CODE;
156    }
157    return runMonitor(targets);
158  }
159
160  @Override
161  public int checkRegionServers(String[] targets) throws Exception {
162    regionServerMode = true;
163    LOG.info("Canary tool is running in RegionServer mode");
164    return runMonitor(targets);
165  }
166
167  @Override
168  public int checkZooKeeper() throws Exception {
169    zookeeperMode = true;
170    LOG.info("Canary tool is running in ZooKeeper mode");
171    return runMonitor(null);
172  }
173
174  /**
175   * Sink interface used by the canary to output information
176   */
177  public interface Sink {
178    long getReadFailureCount();
179
180    long incReadFailureCount();
181
182    Map<String, String> getReadFailures();
183
184    void updateReadFailures(String regionName, String serverName);
185
186    long getWriteFailureCount();
187
188    long incWriteFailureCount();
189
190    Map<String, String> getWriteFailures();
191
192    void updateWriteFailures(String regionName, String serverName);
193
194    long getReadSuccessCount();
195
196    long incReadSuccessCount();
197
198    long getWriteSuccessCount();
199
200    long incWriteSuccessCount();
201  }
202
203  /**
204   * Simple implementation of canary sink that allows plotting to a file or standard output.
205   */
206  public static class StdOutSink implements Sink {
207    private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0),
208        readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
209    private Map<String, String> readFailures = new ConcurrentHashMap<>();
210    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
211
212    @Override
213    public long getReadFailureCount() {
214      return readFailureCount.get();
215    }
216
217    @Override
218    public long incReadFailureCount() {
219      return readFailureCount.incrementAndGet();
220    }
221
222    @Override
223    public Map<String, String> getReadFailures() {
224      return readFailures;
225    }
226
227    @Override
228    public void updateReadFailures(String regionName, String serverName) {
229      readFailures.put(regionName, serverName);
230    }
231
232    @Override
233    public long getWriteFailureCount() {
234      return writeFailureCount.get();
235    }
236
237    @Override
238    public long incWriteFailureCount() {
239      return writeFailureCount.incrementAndGet();
240    }
241
242    @Override
243    public Map<String, String> getWriteFailures() {
244      return writeFailures;
245    }
246
247    @Override
248    public void updateWriteFailures(String regionName, String serverName) {
249      writeFailures.put(regionName, serverName);
250    }
251
252    @Override
253    public long getReadSuccessCount() {
254      return readSuccessCount.get();
255    }
256
257    @Override
258    public long incReadSuccessCount() {
259      return readSuccessCount.incrementAndGet();
260    }
261
262    @Override
263    public long getWriteSuccessCount() {
264      return writeSuccessCount.get();
265    }
266
267    @Override
268    public long incWriteSuccessCount() {
269      return writeSuccessCount.incrementAndGet();
270    }
271  }
272
273  /**
274   * By RegionServer, for 'regionserver' mode.
275   */
276  public static class RegionServerStdOutSink extends StdOutSink {
277    public void publishReadFailure(String table, String server) {
278      incReadFailureCount();
279      LOG.error("Read from {} on {}", table, server);
280    }
281
282    public void publishReadTiming(String table, String server, long msTime) {
283      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
284    }
285  }
286
287  /**
288   * Output for 'zookeeper' mode.
289   */
290  public static class ZookeeperStdOutSink extends StdOutSink {
291    public void publishReadFailure(String znode, String server) {
292      incReadFailureCount();
293      LOG.error("Read from {} on {}", znode, server);
294    }
295
296    public void publishReadTiming(String znode, String server, long msTime) {
297      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
298    }
299  }
300
301  /**
302   * By Region, for 'region' mode.
303   */
304  public static class RegionStdOutSink extends StdOutSink {
305    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
306    private LongAdder writeLatency = new LongAdder();
307    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
308      new ConcurrentHashMap<>();
309    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>();
310    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();
311
312    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
313      return perServerFailuresCount;
314    }
315
316    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
317      return perTableFailuresCount;
318    }
319
320    public void resetFailuresCountDetails() {
321      perServerFailuresCount.clear();
322      perTableFailuresCount.clear();
323    }
324
325    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
326      perServerFailuresCount.compute(serverName, (server, count) -> {
327        if (count == null) {
328          count = new LongAdder();
329        }
330        count.increment();
331        return count;
332      });
333      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
334        if (count == null) {
335          count = new LongAdder();
336        }
337        count.increment();
338        return count;
339      });
340    }
341
342    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
343      incReadFailureCount();
344      incFailuresCountDetails(serverName, region);
345      LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName,
346        e);
347    }
348
349    public void publishReadFailure(ServerName serverName, RegionInfo region,
350      ColumnFamilyDescriptor column, Exception e) {
351      incReadFailureCount();
352      incFailuresCountDetails(serverName, region);
353      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
354        region.getRegionNameAsString(), serverName, column.getNameAsString(), e);
355    }
356
357    public void publishReadTiming(ServerName serverName, RegionInfo region,
358      ColumnFamilyDescriptor column, long msTime) {
359      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
360      rtr.setReadSuccess();
361      rtr.setReadLatency(msTime);
362      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
363      rtrs.add(rtr);
364      // Note that read success count will be equal to total column family read successes.
365      incReadSuccessCount();
366      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
367        column.getNameAsString(), msTime);
368    }
369
370    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
371      incWriteFailureCount();
372      incFailuresCountDetails(serverName, region);
373      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
374    }
375
376    public void publishWriteFailure(ServerName serverName, RegionInfo region,
377      ColumnFamilyDescriptor column, Exception e) {
378      incWriteFailureCount();
379      incFailuresCountDetails(serverName, region);
380      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
381        column.getNameAsString(), e);
382    }
383
384    public void publishWriteTiming(ServerName serverName, RegionInfo region,
385      ColumnFamilyDescriptor column, long msTime) {
386      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
387      rtr.setWriteSuccess();
388      rtr.setWriteLatency(msTime);
389      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
390      rtrs.add(rtr);
391      // Note that write success count will be equal to total column family write successes.
392      incWriteSuccessCount();
393      LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
394        column.getNameAsString(), msTime);
395    }
396
397    public Map<String, LongAdder> getReadLatencyMap() {
398      return this.perTableReadLatency;
399    }
400
401    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
402      LongAdder initLatency = new LongAdder();
403      this.perTableReadLatency.put(tableName, initLatency);
404      return initLatency;
405    }
406
407    public void initializeWriteLatency() {
408      this.writeLatency.reset();
409    }
410
411    public LongAdder getWriteLatency() {
412      return this.writeLatency;
413    }
414
415    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
416      return this.regionMap;
417    }
418
419    public int getTotalExpectedRegions() {
420      return this.regionMap.size();
421    }
422  }
423
424  /**
425   * Run a single zookeeper Task and then exit.
426   */
427  static class ZookeeperTask implements Callable<Void> {
428    private final Connection connection;
429    private final String host;
430    private String znode;
431    private final int timeout;
432    private ZookeeperStdOutSink sink;
433
434    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
435      ZookeeperStdOutSink sink) {
436      this.connection = connection;
437      this.host = host;
438      this.znode = znode;
439      this.timeout = timeout;
440      this.sink = sink;
441    }
442
443    @Override
444    public Void call() throws Exception {
445      ZooKeeper zooKeeper = null;
446      try {
447        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
448        Stat exists = zooKeeper.exists(znode, false);
449        StopWatch stopwatch = new StopWatch();
450        stopwatch.start();
451        zooKeeper.getData(znode, false, exists);
452        stopwatch.stop();
453        sink.publishReadTiming(znode, host, stopwatch.getTime());
454      } catch (KeeperException | InterruptedException e) {
455        sink.publishReadFailure(znode, host);
456      } finally {
457        if (zooKeeper != null) {
458          zooKeeper.close();
459        }
460      }
461      return null;
462    }
463  }
464
465  /**
466   * Run a single Region Task and then exit. For each column family of the Region, get one row and
467   * output latency or failure.
468   */
469  static class RegionTask implements Callable<Void> {
470    public enum TaskType {
471      READ,
472      WRITE
473    }
474
475    private Connection connection;
476    private RegionInfo region;
477    private RegionStdOutSink sink;
478    private TaskType taskType;
479    private boolean rawScanEnabled;
480    private ServerName serverName;
481    private LongAdder readWriteLatency;
482    private boolean readAllCF;
483
484    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
485      RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
486      boolean readAllCF) {
487      this.connection = connection;
488      this.region = region;
489      this.serverName = serverName;
490      this.sink = sink;
491      this.taskType = taskType;
492      this.rawScanEnabled = rawScanEnabled;
493      this.readWriteLatency = rwLatency;
494      this.readAllCF = readAllCF;
495    }
496
497    @Override
498    public Void call() {
499      switch (taskType) {
500        case READ:
501          return read();
502        case WRITE:
503          return write();
504        default:
505          return read();
506      }
507    }
508
509    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
510      byte[] startKey = null;
511      Get get = null;
512      Scan scan = null;
513      ResultScanner rs = null;
514      StopWatch stopWatch = new StopWatch();
515      startKey = region.getStartKey();
516      // Can't do a get on empty start row so do a Scan of first element if any instead.
517      if (startKey.length > 0) {
518        get = new Get(startKey);
519        get.setCacheBlocks(false);
520        get.setFilter(new FirstKeyOnlyFilter());
521        get.addFamily(column.getName());
522      } else {
523        scan = new Scan();
524        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
525        scan.setRaw(rawScanEnabled);
526        scan.setCaching(1);
527        scan.setCacheBlocks(false);
528        scan.setFilter(new FirstKeyOnlyFilter());
529        scan.addFamily(column.getName());
530        scan.setMaxResultSize(1L);
531        scan.setOneRowLimit();
532      }
533      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
534        column.getNameAsString(), Bytes.toStringBinary(startKey));
535      try {
536        stopWatch.start();
537        if (startKey.length > 0) {
538          table.get(get);
539        } else {
540          rs = table.getScanner(scan);
541          rs.next();
542        }
543        stopWatch.stop();
544        this.readWriteLatency.add(stopWatch.getTime());
545        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
546      } catch (Exception e) {
547        sink.publishReadFailure(serverName, region, column, e);
548        sink.updateReadFailures(region.getRegionNameAsString(),
549          serverName == null ? "NULL" : serverName.getHostname());
550      } finally {
551        if (rs != null) {
552          rs.close();
553        }
554      }
555      return null;
556    }
557
558    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
559      int size = cfs.length;
560      return cfs[ThreadLocalRandom.current().nextInt(size)];
561
562    }
563
564    public Void read() {
565      Table table = null;
566      TableDescriptor tableDesc = null;
567      try {
568        LOG.debug("Reading table descriptor for table {}", region.getTable());
569        table = connection.getTable(region.getTable());
570        tableDesc = table.getDescriptor();
571      } catch (IOException e) {
572        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
573        sink.publishReadFailure(serverName, region, e);
574        if (table != null) {
575          try {
576            table.close();
577          } catch (IOException ioe) {
578            LOG.error("Close table failed", e);
579          }
580        }
581        return null;
582      }
583
584      if (readAllCF) {
585        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
586          readColumnFamily(table, column);
587        }
588      } else {
589        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
590      }
591      try {
592        table.close();
593      } catch (IOException e) {
594        LOG.error("Close table failed", e);
595      }
596      return null;
597    }
598
599    /**
600     * Check writes for the canary table
601     */
602    private Void write() {
603      Table table = null;
604      TableDescriptor tableDesc = null;
605      try {
606        table = connection.getTable(region.getTable());
607        tableDesc = table.getDescriptor();
608        byte[] rowToCheck = region.getStartKey();
609        if (rowToCheck.length == 0) {
610          rowToCheck = new byte[] { 0x0 };
611        }
612        int writeValueSize =
613          connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
614        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
615          Put put = new Put(rowToCheck);
616          byte[] value = new byte[writeValueSize];
617          Bytes.random(value);
618          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
619          LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(),
620            region.getRegionNameAsString(), column.getNameAsString(),
621            Bytes.toStringBinary(rowToCheck));
622          try {
623            long startTime = EnvironmentEdgeManager.currentTime();
624            table.put(put);
625            long time = EnvironmentEdgeManager.currentTime() - startTime;
626            this.readWriteLatency.add(time);
627            sink.publishWriteTiming(serverName, region, column, time);
628          } catch (Exception e) {
629            sink.publishWriteFailure(serverName, region, column, e);
630          }
631        }
632        table.close();
633      } catch (IOException e) {
634        sink.publishWriteFailure(serverName, region, e);
635        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
636      }
637      return null;
638    }
639  }
640
641  /**
642   * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and
643   * output latency or the failure.
644   */
645  static class RegionServerTask implements Callable<Void> {
646    private Connection connection;
647    private String serverName;
648    private RegionInfo region;
649    private RegionServerStdOutSink sink;
650    private AtomicLong successes;
651
652    RegionServerTask(Connection connection, String serverName, RegionInfo region,
653      RegionServerStdOutSink sink, AtomicLong successes) {
654      this.connection = connection;
655      this.serverName = serverName;
656      this.region = region;
657      this.sink = sink;
658      this.successes = successes;
659    }
660
661    @Override
662    public Void call() {
663      TableName tableName = null;
664      Table table = null;
665      Get get = null;
666      byte[] startKey = null;
667      Scan scan = null;
668      StopWatch stopWatch = new StopWatch();
669      // monitor one region on every region server
670      stopWatch.reset();
671      try {
672        tableName = region.getTable();
673        table = connection.getTable(tableName);
674        startKey = region.getStartKey();
675        // Can't do a get on empty start row so do a Scan of first element if any instead.
676        LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(),
677          region.getRegionNameAsString(), Bytes.toStringBinary(startKey));
678        if (startKey.length > 0) {
679          get = new Get(startKey);
680          get.setCacheBlocks(false);
681          get.setFilter(new FirstKeyOnlyFilter());
682          stopWatch.start();
683          table.get(get);
684          stopWatch.stop();
685        } else {
686          scan = new Scan();
687          scan.setCacheBlocks(false);
688          scan.setFilter(new FirstKeyOnlyFilter());
689          scan.setCaching(1);
690          scan.setMaxResultSize(1L);
691          scan.setOneRowLimit();
692          stopWatch.start();
693          ResultScanner s = table.getScanner(scan);
694          s.next();
695          s.close();
696          stopWatch.stop();
697        }
698        successes.incrementAndGet();
699        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
700      } catch (TableNotFoundException tnfe) {
701        LOG.error("Table may be deleted", tnfe);
702        // This is ignored because it doesn't imply that the regionserver is dead
703      } catch (TableNotEnabledException tnee) {
704        // This is considered a success since we got a response.
705        successes.incrementAndGet();
706        LOG.debug("The targeted table was disabled.  Assuming success.");
707      } catch (DoNotRetryIOException dnrioe) {
708        sink.publishReadFailure(tableName.getNameAsString(), serverName);
709        LOG.error(dnrioe.toString(), dnrioe);
710      } catch (IOException e) {
711        sink.publishReadFailure(tableName.getNameAsString(), serverName);
712        LOG.error(e.toString(), e);
713      } finally {
714        if (table != null) {
715          try {
716            table.close();
717          } catch (IOException e) {/* DO NOTHING */
718            LOG.error("Close table failed", e);
719          }
720        }
721        scan = null;
722        get = null;
723        startKey = null;
724      }
725      return null;
726    }
727  }
728
729  private static final int USAGE_EXIT_CODE = 1;
730  private static final int INIT_ERROR_EXIT_CODE = 2;
731  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
732  private static final int ERROR_EXIT_CODE = 4;
733  private static final int FAILURE_EXIT_CODE = 5;
734
735  private static final long DEFAULT_INTERVAL = 60000;
736
737  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
738  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
739
740  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
741
742  public static final TableName DEFAULT_WRITE_TABLE_NAME =
743    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
744
745  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
746
747  private Configuration conf = null;
748  private long interval = 0;
749  private Sink sink = null;
750
751  /**
752   * True if we are to run in 'regionServer' mode.
753   */
754  private boolean regionServerMode = false;
755
756  /**
757   * True if we are to run in zookeeper 'mode'.
758   */
759  private boolean zookeeperMode = false;
760
761  /**
762   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. we
763   * aggregate time to fetch each region and it needs to be less than this value else we log an
764   * ERROR.
765   */
766  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
767
768  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
769    "hbase.canary.regionserver_all_regions";
770
771  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
772    "hbase.canary.region.write.sniffing";
773  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
774    "hbase.canary.region.write.table.timeout";
775  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
776    "hbase.canary.region.write.table.name";
777  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
778    "hbase.canary.region.read.table.timeout";
779
780  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
781    "hbase.canary.zookeeper.permitted.failures";
782
783  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
784  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
785  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";
786
787  private ExecutorService executor; // threads to retrieve data from regionservers
788
789  public CanaryTool() {
790    this(new ScheduledThreadPoolExecutor(1));
791  }
792
793  public CanaryTool(ExecutorService executor) {
794    this(executor, null);
795  }
796
797  @InterfaceAudience.Private
798  CanaryTool(ExecutorService executor, Sink sink) {
799    this.executor = executor;
800    this.sink = sink;
801  }
802
803  CanaryTool(Configuration conf, ExecutorService executor) {
804    this(conf, executor, null);
805  }
806
807  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
808    this(executor, sink);
809    setConf(conf);
810  }
811
812  @Override
813  public Configuration getConf() {
814    return conf;
815  }
816
817  @Override
818  public void setConf(Configuration conf) {
819    if (conf == null) {
820      conf = HBaseConfiguration.create();
821    }
822    this.conf = conf;
823  }
824
825  private int parseArgs(String[] args) {
826    int index = -1;
827    long permittedFailures = 0;
828    boolean regionServerAllRegions = false, writeSniffing = false;
829    String readTableTimeoutsStr = null;
830    // Process command line args
831    for (int i = 0; i < args.length; i++) {
832      String cmd = args[i];
833      if (cmd.startsWith("-")) {
834        if (index >= 0) {
835          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
836          System.err.println("Invalid command line options");
837          printUsageAndExit();
838        }
839        if (cmd.equals("-help") || cmd.equals("-h")) {
840          // user asked for help, print the help and quit.
841          printUsageAndExit();
842        } else if (cmd.equals("-daemon") && interval == 0) {
843          // user asked for daemon mode, set a default interval between checks
844          interval = DEFAULT_INTERVAL;
845        } else if (cmd.equals("-interval")) {
846          // user has specified an interval for canary breaths (-interval N)
847          i++;
848
849          if (i == args.length) {
850            System.err.println("-interval takes a numeric seconds value argument.");
851            printUsageAndExit();
852          }
853          try {
854            interval = Long.parseLong(args[i]) * 1000;
855          } catch (NumberFormatException e) {
856            System.err.println("-interval needs a numeric value argument.");
857            printUsageAndExit();
858          }
859        } else if (cmd.equals("-zookeeper")) {
860          this.zookeeperMode = true;
861        } else if (cmd.equals("-regionserver")) {
862          this.regionServerMode = true;
863        } else if (cmd.equals("-allRegions")) {
864          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
865          regionServerAllRegions = true;
866        } else if (cmd.equals("-writeSniffing")) {
867          writeSniffing = true;
868          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
869        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
870          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
871        } else if (cmd.equals("-e")) {
872          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
873        } else if (cmd.equals("-t")) {
874          i++;
875
876          if (i == args.length) {
877            System.err.println("-t takes a numeric milliseconds value argument.");
878            printUsageAndExit();
879          }
880          long timeout = 0;
881          try {
882            timeout = Long.parseLong(args[i]);
883          } catch (NumberFormatException e) {
884            System.err.println("-t takes a numeric milliseconds value argument.");
885            printUsageAndExit();
886          }
887          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
888        } else if (cmd.equals("-writeTableTimeout")) {
889          i++;
890
891          if (i == args.length) {
892            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
893            printUsageAndExit();
894          }
895          long configuredWriteTableTimeout = 0;
896          try {
897            configuredWriteTableTimeout = Long.parseLong(args[i]);
898          } catch (NumberFormatException e) {
899            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
900            printUsageAndExit();
901          }
902          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
903        } else if (cmd.equals("-writeTable")) {
904          i++;
905
906          if (i == args.length) {
907            System.err.println("-writeTable takes a string tablename value argument.");
908            printUsageAndExit();
909          }
910          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
911        } else if (cmd.equals("-f")) {
912          i++;
913          if (i == args.length) {
914            System.err.println("-f needs a boolean value argument (true|false).");
915            printUsageAndExit();
916          }
917
918          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
919        } else if (cmd.equals("-readTableTimeouts")) {
920          i++;
921          if (i == args.length) {
922            System.err.println("-readTableTimeouts needs a comma-separated list of read "
923              + "millisecond timeouts per table (without spaces).");
924            printUsageAndExit();
925          }
926          readTableTimeoutsStr = args[i];
927          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
928        } else if (cmd.equals("-permittedZookeeperFailures")) {
929          i++;
930
931          if (i == args.length) {
932            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
933            printUsageAndExit();
934          }
935          try {
936            permittedFailures = Long.parseLong(args[i]);
937          } catch (NumberFormatException e) {
938            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
939            printUsageAndExit();
940          }
941          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
942        } else {
943          // no options match
944          System.err.println(cmd + " options is invalid.");
945          printUsageAndExit();
946        }
947      } else if (index < 0) {
948        // keep track of first table name specified by the user
949        index = i;
950      }
951    }
952    if (regionServerAllRegions && !this.regionServerMode) {
953      System.err.println("-allRegions can only be specified in regionserver mode.");
954      printUsageAndExit();
955    }
956    if (this.zookeeperMode) {
957      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
958        System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes.");
959        printUsageAndExit();
960      }
961    }
962    if (permittedFailures != 0 && !this.zookeeperMode) {
963      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
964      printUsageAndExit();
965    }
966    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
967      System.err.println("-readTableTimeouts can only be configured in region mode.");
968      printUsageAndExit();
969    }
970    return index;
971  }
972
973  @Override
974  public int run(String[] args) throws Exception {
975    int index = parseArgs(args);
976    String[] monitorTargets = null;
977
978    if (index >= 0) {
979      int length = args.length - index;
980      monitorTargets = new String[length];
981      System.arraycopy(args, index, monitorTargets, 0, length);
982    }
983    if (interval > 0) {
984      // Only show the web page in daemon mode
985      putUpWebUI();
986    }
987    if (zookeeperMode) {
988      return checkZooKeeper();
989    } else if (regionServerMode) {
990      return checkRegionServers(monitorTargets);
991    } else {
992      return checkRegions(monitorTargets);
993    }
994  }
995
996  private int runMonitor(String[] monitorTargets) throws Exception {
997    ChoreService choreService = null;
998
999    // Launches chore for refreshing kerberos credentials if security is enabled.
1000    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
1001    // for more details.
1002    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1003    if (authChore != null) {
1004      choreService = new ChoreService("CANARY_TOOL");
1005      choreService.scheduleChore(authChore);
1006    }
1007
1008    // Start to prepare the stuffs
1009    Monitor monitor = null;
1010    Thread monitorThread;
1011    long startTime = 0;
1012    long currentTimeLength = 0;
1013    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1014    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
1015    // Get a connection to use in below.
1016    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
1017      do {
1018        // Do monitor !!
1019        try {
1020          monitor = this.newMonitor(connection, monitorTargets);
1021          startTime = EnvironmentEdgeManager.currentTime();
1022          monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime);
1023          monitorThread.start();
1024          while (!monitor.isDone()) {
1025            // wait for 1 sec
1026            Thread.sleep(1000);
1027            // exit if any error occurs
1028            if (failOnError && monitor.hasError()) {
1029              monitorThread.interrupt();
1030              if (monitor.initialized) {
1031                return monitor.errorCode;
1032              } else {
1033                return INIT_ERROR_EXIT_CODE;
1034              }
1035            }
1036            currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime;
1037            if (currentTimeLength > timeout) {
1038              LOG.error("The monitor is running too long (" + currentTimeLength
1039                + ") after timeout limit:" + timeout + " will be killed itself !!");
1040              if (monitor.initialized) {
1041                return TIMEOUT_ERROR_EXIT_CODE;
1042              } else {
1043                return INIT_ERROR_EXIT_CODE;
1044              }
1045            }
1046          }
1047
1048          if (failOnError && monitor.finalCheckForErrors()) {
1049            monitorThread.interrupt();
1050            return monitor.errorCode;
1051          }
1052        } finally {
1053          if (monitor != null) {
1054            monitor.close();
1055          }
1056        }
1057
1058        Thread.sleep(interval);
1059      } while (interval > 0);
1060    } // try-with-resources close
1061
1062    if (choreService != null) {
1063      choreService.shutdown();
1064    }
1065    return monitor.errorCode;
1066  }
1067
1068  @Override
1069  public Map<String, String> getReadFailures() {
1070    return sink.getReadFailures();
1071  }
1072
1073  @Override
1074  public Map<String, String> getWriteFailures() {
1075    return sink.getWriteFailures();
1076  }
1077
1078  private void printUsageAndExit() {
1079    System.err.println(
1080      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1081    System.err.println("Where [OPTIONS] are:");
1082    System.err.println(" -h,-help        show this help and exit.");
1083    System.err.println(
1084      " -regionserver   set 'regionserver mode'; gets row from random region on " + "server");
1085    System.err.println(
1086      " -allRegions     get from ALL regions when 'regionserver mode', not just " + "random one.");
1087    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on "
1088      + "each ensemble member");
1089    System.err.println(" -daemon         continuous check at defined intervals.");
1090    System.err.println(" -interval <N>   interval between checks in seconds");
1091    System.err
1092      .println(" -e              consider table/regionserver argument as regular " + "expression");
1093    System.err.println(" -f <B>          exit on first error; default=true");
1094    System.err.println(" -failureAsError treat read/write failure as error");
1095    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1096    System.err.println(" -writeSniffing  enable write sniffing");
1097    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1098    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1099    System.err.println(
1100      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
1101    System.err
1102      .println("                comma-separated list of table read timeouts " + "(no spaces);");
1103    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1104    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1105    System.err.println("                connect to individual zookeeper nodes in ensemble");
1106    System.err.println("");
1107    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1108    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
1109      + "raw scan; default=false");
1110    System.err.println(
1111      " -Dhbase.canary.info.port=PORT_NUMBER  Set for a Canary UI; " + "default=-1 (None)");
1112    System.err.println("");
1113    System.err.println(
1114      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
1115    System.err.println("To sniff/probe all regions, pass no arguments.");
1116    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1117    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1118    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1119    System.exit(USAGE_EXIT_CODE);
1120  }
1121
1122  Sink getSink(Configuration configuration, Class clazz) {
1123    // In test context, this.sink might be set. Use it if non-null. For testing.
1124    return this.sink != null
1125      ? this.sink
1126      : (Sink) ReflectionUtils
1127        .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
1128  }
1129
1130  /**
1131   * Canary region mode-specific data structure which stores information about each region to be
1132   * scanned
1133   */
1134  public static class RegionTaskResult {
1135    private RegionInfo region;
1136    private TableName tableName;
1137    private ServerName serverName;
1138    private ColumnFamilyDescriptor column;
1139    private AtomicLong readLatency = null;
1140    private AtomicLong writeLatency = null;
1141    private boolean readSuccess = false;
1142    private boolean writeSuccess = false;
1143
1144    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1145      ColumnFamilyDescriptor column) {
1146      this.region = region;
1147      this.tableName = tableName;
1148      this.serverName = serverName;
1149      this.column = column;
1150    }
1151
1152    public RegionInfo getRegionInfo() {
1153      return this.region;
1154    }
1155
1156    public String getRegionNameAsString() {
1157      return this.region.getRegionNameAsString();
1158    }
1159
1160    public TableName getTableName() {
1161      return this.tableName;
1162    }
1163
1164    public String getTableNameAsString() {
1165      return this.tableName.getNameAsString();
1166    }
1167
1168    public ServerName getServerName() {
1169      return this.serverName;
1170    }
1171
1172    public String getServerNameAsString() {
1173      return this.serverName.getServerName();
1174    }
1175
1176    public ColumnFamilyDescriptor getColumnFamily() {
1177      return this.column;
1178    }
1179
1180    public String getColumnFamilyNameAsString() {
1181      return this.column.getNameAsString();
1182    }
1183
1184    public long getReadLatency() {
1185      if (this.readLatency == null) {
1186        return -1;
1187      }
1188      return this.readLatency.get();
1189    }
1190
1191    public void setReadLatency(long readLatency) {
1192      if (this.readLatency != null) {
1193        this.readLatency.set(readLatency);
1194      } else {
1195        this.readLatency = new AtomicLong(readLatency);
1196      }
1197    }
1198
1199    public long getWriteLatency() {
1200      if (this.writeLatency == null) {
1201        return -1;
1202      }
1203      return this.writeLatency.get();
1204    }
1205
1206    public void setWriteLatency(long writeLatency) {
1207      if (this.writeLatency != null) {
1208        this.writeLatency.set(writeLatency);
1209      } else {
1210        this.writeLatency = new AtomicLong(writeLatency);
1211      }
1212    }
1213
1214    public boolean isReadSuccess() {
1215      return this.readSuccess;
1216    }
1217
1218    public void setReadSuccess() {
1219      this.readSuccess = true;
1220    }
1221
1222    public boolean isWriteSuccess() {
1223      return this.writeSuccess;
1224    }
1225
1226    public void setWriteSuccess() {
1227      this.writeSuccess = true;
1228    }
1229  }
1230
1231  /**
1232   * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a
1233   * RegionMonitor.
1234   * @return a Monitor instance
1235   */
1236  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1237    Monitor monitor;
1238    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1239    boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1240    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1241    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1242    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1243    String writeTableName =
1244      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1245    long configuredWriteTableTimeout =
1246      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1247
1248    if (this.regionServerMode) {
1249      monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp,
1250        getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor,
1251        regionServerAllRegions, failOnError, permittedFailures);
1252
1253    } else if (this.zookeeperMode) {
1254      monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1255        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor,
1256        failOnError, permittedFailures);
1257    } else {
1258      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
1259        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
1260        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1261        configuredWriteTableTimeout, permittedFailures);
1262    }
1263    return monitor;
1264  }
1265
1266  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1267    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1268    for (String tT : tableTimeouts) {
1269      String[] nameTimeout = tT.split("=");
1270      if (nameTimeout.length < 2) {
1271        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form "
1272          + "<tableName>=<read timeout> (without spaces).");
1273      }
1274      long timeoutVal;
1275      try {
1276        timeoutVal = Long.parseLong(nameTimeout[1]);
1277      } catch (NumberFormatException e) {
1278        throw new IllegalArgumentException(
1279          "-readTableTimeouts read timeout for each table" + " must be a numeric value argument.");
1280      }
1281      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1282    }
1283  }
1284
1285  /**
1286   * A Monitor super-class can be extended by users
1287   */
1288  public static abstract class Monitor implements Runnable, Closeable {
1289    protected Connection connection;
1290    protected Admin admin;
1291    /**
1292     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the
1293     * command-line as arguments.
1294     */
1295    protected String[] targets;
1296    protected boolean useRegExp;
1297    protected boolean treatFailureAsError;
1298    protected boolean initialized = false;
1299
1300    protected boolean done = false;
1301    protected int errorCode = 0;
1302    protected long allowedFailures = 0;
1303    protected Sink sink;
1304    protected ExecutorService executor;
1305
1306    public boolean isDone() {
1307      return done;
1308    }
1309
1310    public boolean hasError() {
1311      return errorCode != 0;
1312    }
1313
1314    public boolean finalCheckForErrors() {
1315      if (errorCode != 0) {
1316        return true;
1317      }
1318      if (
1319        treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
1320          || sink.getWriteFailureCount() > allowedFailures)
1321      ) {
1322        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1323        errorCode = FAILURE_EXIT_CODE;
1324        return true;
1325      }
1326      return false;
1327    }
1328
1329    @Override
1330    public void close() throws IOException {
1331      if (this.admin != null) {
1332        this.admin.close();
1333      }
1334    }
1335
1336    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1337      ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1338      if (null == connection) {
1339        throw new IllegalArgumentException("connection shall not be null");
1340      }
1341
1342      this.connection = connection;
1343      this.targets = monitorTargets;
1344      this.useRegExp = useRegExp;
1345      this.treatFailureAsError = treatFailureAsError;
1346      this.sink = sink;
1347      this.executor = executor;
1348      this.allowedFailures = allowedFailures;
1349    }
1350
1351    @Override
1352    public abstract void run();
1353
1354    protected boolean initAdmin() {
1355      if (null == this.admin) {
1356        try {
1357          this.admin = this.connection.getAdmin();
1358        } catch (Exception e) {
1359          LOG.error("Initial HBaseAdmin failed...", e);
1360          this.errorCode = INIT_ERROR_EXIT_CODE;
1361        }
1362      } else if (admin.isAborted()) {
1363        LOG.error("HBaseAdmin aborted");
1364        this.errorCode = INIT_ERROR_EXIT_CODE;
1365      }
1366      return !this.hasError();
1367    }
1368  }
1369
1370  /**
1371   * A monitor for region mode.
1372   */
1373  private static class RegionMonitor extends Monitor {
1374    // 10 minutes
1375    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1376    // 1 days
1377    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1378
1379    private long lastCheckTime = -1;
1380    private boolean writeSniffing;
1381    private TableName writeTableName;
1382    private int writeDataTTL;
1383    private float regionsLowerLimit;
1384    private float regionsUpperLimit;
1385    private int checkPeriod;
1386    private boolean rawScanEnabled;
1387    private boolean readAllCF;
1388
1389    /**
1390     * This is a timeout per table. If read of each region in the table aggregated takes longer than
1391     * what is configured here, we log an ERROR rather than just an INFO.
1392     */
1393    private HashMap<String, Long> configuredReadTableTimeouts;
1394
1395    private long configuredWriteTableTimeout;
1396
1397    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1398      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1399      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1400      long configuredWriteTableTimeout, long allowedFailures) {
1401      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1402        allowedFailures);
1403      Configuration conf = connection.getConfiguration();
1404      this.writeSniffing = writeSniffing;
1405      this.writeTableName = writeTableName;
1406      this.writeDataTTL =
1407        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1408      this.regionsLowerLimit =
1409        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1410      this.regionsUpperLimit =
1411        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1412      this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1413        DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1414      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1415      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1416      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1417      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
1418    }
1419
1420    private RegionStdOutSink getSink() {
1421      if (!(sink instanceof RegionStdOutSink)) {
1422        throw new RuntimeException("Can only write to Region sink");
1423      }
1424      return ((RegionStdOutSink) sink);
1425    }
1426
1427    @Override
1428    public void run() {
1429      if (this.initAdmin()) {
1430        try {
1431          List<Future<Void>> taskFutures = new LinkedList<>();
1432          RegionStdOutSink regionSink = this.getSink();
1433          regionSink.resetFailuresCountDetails();
1434          if (this.targets != null && this.targets.length > 0) {
1435            String[] tables = generateMonitorTables(this.targets);
1436            // Check to see that each table name passed in the -readTableTimeouts argument is also
1437            // passed as a monitor target.
1438            if (
1439              !new HashSet<>(Arrays.asList(tables))
1440                .containsAll(this.configuredReadTableTimeouts.keySet())
1441            ) {
1442              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
1443                + "passed via command line.");
1444              this.errorCode = USAGE_EXIT_CODE;
1445              return;
1446            }
1447            this.initialized = true;
1448            for (String table : tables) {
1449              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1450              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1451                this.rawScanEnabled, readLatency, readAllCF));
1452            }
1453          } else {
1454            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1455          }
1456
1457          if (writeSniffing) {
1458            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1459              try {
1460                checkWriteTableDistribution();
1461              } catch (IOException e) {
1462                LOG.error("Check canary table distribution failed!", e);
1463              }
1464              lastCheckTime = EnvironmentEdgeManager.currentTime();
1465            }
1466            // sniff canary table with write operation
1467            regionSink.initializeWriteLatency();
1468            LongAdder writeTableLatency = regionSink.getWriteLatency();
1469            taskFutures
1470              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1471                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
1472          }
1473
1474          for (Future<Void> future : taskFutures) {
1475            try {
1476              future.get();
1477            } catch (ExecutionException e) {
1478              LOG.error("Sniff region failed!", e);
1479            }
1480          }
1481          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1482          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1483            String tableName = entry.getKey();
1484            if (actualReadTableLatency.containsKey(tableName)) {
1485              Long actual = actualReadTableLatency.get(tableName).longValue();
1486              Long configured = entry.getValue();
1487              if (actual > configured) {
1488                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout."
1489                  + "(Configured read timeout {}ms.", tableName, actual, configured);
1490              } else {
1491                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1492                  tableName, actual, configured);
1493              }
1494            } else {
1495              LOG.error("Read operation for {} failed!", tableName);
1496            }
1497          }
1498          if (this.writeSniffing) {
1499            String writeTableStringName = this.writeTableName.getNameAsString();
1500            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1501            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1502              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1503            // Check that the writeTable write operation latency does not exceed the configured
1504            // timeout.
1505            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1506              LOG.error("Write operation for {} exceeded the configured write timeout.",
1507                writeTableStringName);
1508            }
1509          }
1510        } catch (Exception e) {
1511          LOG.error("Run regionMonitor failed", e);
1512          this.errorCode = ERROR_EXIT_CODE;
1513        } finally {
1514          this.done = true;
1515        }
1516      }
1517      this.done = true;
1518    }
1519
1520    /** Returns List of tables to use in test. */
1521    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1522      String[] returnTables = null;
1523
1524      if (this.useRegExp) {
1525        Pattern pattern = null;
1526        List<TableDescriptor> tds = null;
1527        Set<String> tmpTables = new TreeSet<>();
1528        try {
1529          LOG.debug(String.format("reading list of tables"));
1530          tds = this.admin.listTableDescriptors(pattern);
1531          if (tds == null) {
1532            tds = Collections.emptyList();
1533          }
1534          for (String monitorTarget : monitorTargets) {
1535            pattern = Pattern.compile(monitorTarget);
1536            for (TableDescriptor td : tds) {
1537              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1538                tmpTables.add(td.getTableName().getNameAsString());
1539              }
1540            }
1541          }
1542        } catch (IOException e) {
1543          LOG.error("Communicate with admin failed", e);
1544          throw e;
1545        }
1546
1547        if (tmpTables.size() > 0) {
1548          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1549        } else {
1550          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1551          LOG.error(msg);
1552          this.errorCode = INIT_ERROR_EXIT_CODE;
1553          throw new TableNotFoundException(msg);
1554        }
1555      } else {
1556        returnTables = monitorTargets;
1557      }
1558
1559      return returnTables;
1560    }
1561
1562    /*
1563     * Canary entry point to monitor all the tables.
1564     */
1565    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1566      throws Exception {
1567      LOG.debug("Reading list of tables");
1568      List<Future<Void>> taskFutures = new LinkedList<>();
1569      for (TableDescriptor td : admin.listTableDescriptors()) {
1570        if (
1571          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
1572            && (!td.getTableName().equals(writeTableName))
1573        ) {
1574          LongAdder readLatency =
1575            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1576          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
1577            this.rawScanEnabled, readLatency, readAllCF));
1578        }
1579      }
1580      return taskFutures;
1581    }
1582
1583    private void checkWriteTableDistribution() throws IOException {
1584      if (!admin.tableExists(writeTableName)) {
1585        int numberOfServers = admin.getRegionServers().size();
1586        if (numberOfServers == 0) {
1587          throw new IllegalStateException("No live regionservers");
1588        }
1589        createWriteTable(numberOfServers);
1590      }
1591
1592      if (!admin.isTableEnabled(writeTableName)) {
1593        admin.enableTable(writeTableName);
1594      }
1595
1596      ClusterMetrics status =
1597        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
1598      int numberOfServers = status.getServersName().size();
1599      if (status.getServersName().contains(status.getMasterName())) {
1600        numberOfServers -= 1;
1601      }
1602
1603      List<Pair<RegionInfo, ServerName>> pairs =
1604        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1605      int numberOfRegions = pairs.size();
1606      if (
1607        numberOfRegions < numberOfServers * regionsLowerLimit
1608          || numberOfRegions > numberOfServers * regionsUpperLimit
1609      ) {
1610        admin.disableTable(writeTableName);
1611        admin.deleteTable(writeTableName);
1612        createWriteTable(numberOfServers);
1613      }
1614      HashSet<ServerName> serverSet = new HashSet<>();
1615      for (Pair<RegionInfo, ServerName> pair : pairs) {
1616        serverSet.add(pair.getSecond());
1617      }
1618      int numberOfCoveredServers = serverSet.size();
1619      if (numberOfCoveredServers < numberOfServers) {
1620        admin.balance();
1621      }
1622    }
1623
1624    private void createWriteTable(int numberOfServers) throws IOException {
1625      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
1626      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
1627        + "(current lower limit of regions per server is {} and you can change it with config {}).",
1628        numberOfServers, numberOfRegions, regionsLowerLimit,
1629        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1630      ColumnFamilyDescriptor family =
1631        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
1632          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
1633      TableDescriptor desc =
1634        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
1635      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1636      admin.createTable(desc, splits);
1637    }
1638  }
1639
1640  /**
1641   * Canary entry point for specified table.
1642   * @throws Exception exception
1643   */
1644  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1645    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
1646    boolean readAllCF) throws Exception {
1647    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1648    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1649      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1650        executor, taskType, rawScanEnabled, readLatency, readAllCF);
1651    } else {
1652      LOG.warn("Table {} is not enabled", tableName);
1653    }
1654    return new LinkedList<>();
1655  }
1656
1657  /*
1658   * Loops over regions of this table, and outputs information about the state.
1659   */
1660  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1661    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
1662    LongAdder rwLatency, boolean readAllCF) throws Exception {
1663    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1664    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1665      List<RegionTask> tasks = new ArrayList<>();
1666      try (RegionLocator regionLocator =
1667        admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1668        for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1669          if (location == null) {
1670            LOG.warn("Null location");
1671            continue;
1672          }
1673          ServerName rs = location.getServerName();
1674          RegionInfo region = location.getRegion();
1675          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
1676            taskType, rawScanEnabled, rwLatency, readAllCF));
1677          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1678          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1679        }
1680        return executor.invokeAll(tasks);
1681      }
1682    } catch (TableNotFoundException e) {
1683      return Collections.EMPTY_LIST;
1684    }
1685  }
1686
1687  // monitor for zookeeper mode
1688  private static class ZookeeperMonitor extends Monitor {
1689    private List<String> hosts;
1690    private final String znode;
1691    private final int timeout;
1692
1693    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1694      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1695      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1696        allowedFailures);
1697      Configuration configuration = connection.getConfiguration();
1698      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1699      timeout =
1700        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1701      ConnectStringParser parser =
1702        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1703      hosts = Lists.newArrayList();
1704      for (InetSocketAddress server : parser.getServerAddresses()) {
1705        hosts.add(inetSocketAddress2String(server));
1706      }
1707      if (allowedFailures > (hosts.size() - 1) / 2) {
1708        LOG.warn(
1709          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
1710            + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1711          allowedFailures, hosts.size());
1712      }
1713    }
1714
1715    @Override
1716    public void run() {
1717      List<ZookeeperTask> tasks = Lists.newArrayList();
1718      ZookeeperStdOutSink zkSink = null;
1719      try {
1720        zkSink = this.getSink();
1721      } catch (RuntimeException e) {
1722        LOG.error("Run ZooKeeperMonitor failed!", e);
1723        this.errorCode = ERROR_EXIT_CODE;
1724      }
1725      this.initialized = true;
1726      for (final String host : hosts) {
1727        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1728      }
1729      try {
1730        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1731          try {
1732            future.get();
1733          } catch (ExecutionException e) {
1734            LOG.error("Sniff zookeeper failed!", e);
1735            this.errorCode = ERROR_EXIT_CODE;
1736          }
1737        }
1738      } catch (InterruptedException e) {
1739        this.errorCode = ERROR_EXIT_CODE;
1740        Thread.currentThread().interrupt();
1741        LOG.error("Sniff zookeeper interrupted!", e);
1742      }
1743      this.done = true;
1744    }
1745
1746    private ZookeeperStdOutSink getSink() {
1747      if (!(sink instanceof ZookeeperStdOutSink)) {
1748        throw new RuntimeException("Can only write to zookeeper sink");
1749      }
1750      return ((ZookeeperStdOutSink) sink);
1751    }
1752  }
1753
1754  /**
1755   * A monitor for regionserver mode
1756   */
1757  private static class RegionServerMonitor extends Monitor {
1758    private boolean allRegions;
1759
1760    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1761      Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError,
1762      long allowedFailures) {
1763      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1764        allowedFailures);
1765      this.allRegions = allRegions;
1766    }
1767
1768    private RegionServerStdOutSink getSink() {
1769      if (!(sink instanceof RegionServerStdOutSink)) {
1770        throw new RuntimeException("Can only write to regionserver sink");
1771      }
1772      return ((RegionServerStdOutSink) sink);
1773    }
1774
1775    @Override
1776    public void run() {
1777      if (this.initAdmin() && this.checkNoTableNames()) {
1778        RegionServerStdOutSink regionServerSink = null;
1779        try {
1780          regionServerSink = this.getSink();
1781        } catch (RuntimeException e) {
1782          LOG.error("Run RegionServerMonitor failed!", e);
1783          this.errorCode = ERROR_EXIT_CODE;
1784        }
1785        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1786        this.initialized = true;
1787        this.monitorRegionServers(rsAndRMap, regionServerSink);
1788      }
1789      this.done = true;
1790    }
1791
1792    private boolean checkNoTableNames() {
1793      List<String> foundTableNames = new ArrayList<>();
1794      TableName[] tableNames = null;
1795      LOG.debug("Reading list of tables");
1796      try {
1797        tableNames = this.admin.listTableNames();
1798      } catch (IOException e) {
1799        LOG.error("Get listTableNames failed", e);
1800        this.errorCode = INIT_ERROR_EXIT_CODE;
1801        return false;
1802      }
1803
1804      if (this.targets == null || this.targets.length == 0) {
1805        return true;
1806      }
1807
1808      for (String target : this.targets) {
1809        for (TableName tableName : tableNames) {
1810          if (target.equals(tableName.getNameAsString())) {
1811            foundTableNames.add(target);
1812          }
1813        }
1814      }
1815
1816      if (foundTableNames.size() > 0) {
1817        System.err.println("Cannot pass a tablename when using the -regionserver "
1818          + "option, tablenames:" + foundTableNames.toString());
1819        this.errorCode = USAGE_EXIT_CODE;
1820      }
1821      return foundTableNames.isEmpty();
1822    }
1823
1824    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
1825      RegionServerStdOutSink regionServerSink) {
1826      List<RegionServerTask> tasks = new ArrayList<>();
1827      Map<String, AtomicLong> successMap = new HashMap<>();
1828      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1829        String serverName = entry.getKey();
1830        AtomicLong successes = new AtomicLong(0);
1831        successMap.put(serverName, successes);
1832        if (entry.getValue().isEmpty()) {
1833          LOG.error("Regionserver not serving any regions - {}", serverName);
1834        } else if (this.allRegions) {
1835          for (RegionInfo region : entry.getValue()) {
1836            tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
1837              successes));
1838          }
1839        } else {
1840          // random select a region if flag not set
1841          RegionInfo region =
1842            entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size()));
1843          tasks.add(
1844            new RegionServerTask(this.connection, serverName, region, regionServerSink, successes));
1845        }
1846      }
1847      try {
1848        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1849          try {
1850            future.get();
1851          } catch (ExecutionException e) {
1852            LOG.error("Sniff regionserver failed!", e);
1853            this.errorCode = ERROR_EXIT_CODE;
1854          }
1855        }
1856        if (this.allRegions) {
1857          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1858            String serverName = entry.getKey();
1859            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1860              successMap.get(serverName), entry.getValue().size(), serverName);
1861          }
1862        }
1863      } catch (InterruptedException e) {
1864        this.errorCode = ERROR_EXIT_CODE;
1865        LOG.error("Sniff regionserver interrupted!", e);
1866      }
1867    }
1868
1869    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1870      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1871      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1872      return regionServerAndRegionsMap;
1873    }
1874
1875    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1876      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1877      try {
1878        LOG.debug("Reading list of tables and locations");
1879        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1880        List<RegionInfo> regions = null;
1881        for (TableDescriptor tableDesc : tableDescs) {
1882          try (RegionLocator regionLocator =
1883            this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1884            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1885              if (location == null) {
1886                LOG.warn("Null location");
1887                continue;
1888              }
1889              ServerName rs = location.getServerName();
1890              String rsName = rs.getHostname();
1891              RegionInfo r = location.getRegion();
1892              if (rsAndRMap.containsKey(rsName)) {
1893                regions = rsAndRMap.get(rsName);
1894              } else {
1895                regions = new ArrayList<>();
1896                rsAndRMap.put(rsName, regions);
1897              }
1898              regions.add(r);
1899            }
1900          }
1901        }
1902
1903        // get any live regionservers not serving any regions
1904        for (ServerName rs : this.admin.getRegionServers()) {
1905          String rsName = rs.getHostname();
1906          if (!rsAndRMap.containsKey(rsName)) {
1907            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1908          }
1909        }
1910      } catch (IOException e) {
1911        LOG.error("Get HTables info failed", e);
1912        this.errorCode = INIT_ERROR_EXIT_CODE;
1913      }
1914      return rsAndRMap;
1915    }
1916
1917    private Map<String, List<RegionInfo>>
1918      doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) {
1919
1920      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1921
1922      if (this.targets != null && this.targets.length > 0) {
1923        filteredRsAndRMap = new HashMap<>();
1924        Pattern pattern = null;
1925        Matcher matcher = null;
1926        boolean regExpFound = false;
1927        for (String rsName : this.targets) {
1928          if (this.useRegExp) {
1929            regExpFound = false;
1930            pattern = Pattern.compile(rsName);
1931            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1932              matcher = pattern.matcher(entry.getKey());
1933              if (matcher.matches()) {
1934                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1935                regExpFound = true;
1936              }
1937            }
1938            if (!regExpFound) {
1939              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
1940            }
1941          } else {
1942            if (fullRsAndRMap.containsKey(rsName)) {
1943              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1944            } else {
1945              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
1946            }
1947          }
1948        }
1949      } else {
1950        filteredRsAndRMap = fullRsAndRMap;
1951      }
1952      return filteredRsAndRMap;
1953    }
1954  }
1955
1956  public static void main(String[] args) throws Exception {
1957    final Configuration conf = HBaseConfiguration.create();
1958
1959    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1960    LOG.info("Execution thread count={}", numThreads);
1961
1962    int exitCode;
1963    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1964    try {
1965      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
1966    } finally {
1967      executor.shutdown();
1968    }
1969    System.exit(exitCode);
1970  }
1971}