001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.tool;
021
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
023import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
024
025import java.io.Closeable;
026import java.io.IOException;
027import java.net.BindException;
028import java.net.InetSocketAddress;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.EnumSet;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.Map;
038import java.util.Random;
039import java.util.Set;
040import java.util.TreeSet;
041import java.util.concurrent.Callable;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.concurrent.ConcurrentMap;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Future;
047import java.util.concurrent.ScheduledThreadPoolExecutor;
048import java.util.concurrent.ThreadLocalRandom;
049import java.util.concurrent.atomic.AtomicLong;
050import java.util.concurrent.atomic.LongAdder;
051import java.util.regex.Matcher;
052import java.util.regex.Pattern;
053
054import org.apache.commons.lang3.time.StopWatch;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.AuthUtil;
057import org.apache.hadoop.hbase.ChoreService;
058import org.apache.hadoop.hbase.ClusterMetrics;
059import org.apache.hadoop.hbase.ClusterMetrics.Option;
060import org.apache.hadoop.hbase.DoNotRetryIOException;
061import org.apache.hadoop.hbase.HBaseConfiguration;
062import org.apache.hadoop.hbase.HBaseInterfaceAudience;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.HRegionLocation;
065import org.apache.hadoop.hbase.MetaTableAccessor;
066import org.apache.hadoop.hbase.NamespaceDescriptor;
067import org.apache.hadoop.hbase.ScheduledChore;
068import org.apache.hadoop.hbase.ServerName;
069import org.apache.hadoop.hbase.TableName;
070import org.apache.hadoop.hbase.TableNotEnabledException;
071import org.apache.hadoop.hbase.TableNotFoundException;
072import org.apache.hadoop.hbase.client.Admin;
073import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
074import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
075import org.apache.hadoop.hbase.client.Connection;
076import org.apache.hadoop.hbase.client.ConnectionFactory;
077import org.apache.hadoop.hbase.client.Get;
078import org.apache.hadoop.hbase.client.Put;
079import org.apache.hadoop.hbase.client.RegionInfo;
080import org.apache.hadoop.hbase.client.RegionLocator;
081import org.apache.hadoop.hbase.client.ResultScanner;
082import org.apache.hadoop.hbase.client.Scan;
083import org.apache.hadoop.hbase.client.Table;
084import org.apache.hadoop.hbase.client.TableDescriptor;
085import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
086import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
087import org.apache.hadoop.hbase.http.InfoServer;
088import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
089import org.apache.hadoop.hbase.util.Bytes;
090import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
091import org.apache.hadoop.hbase.util.Pair;
092import org.apache.hadoop.hbase.util.ReflectionUtils;
093import org.apache.hadoop.hbase.util.RegionSplitter;
094import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
095import org.apache.hadoop.hbase.zookeeper.ZKConfig;
096import org.apache.hadoop.util.Tool;
097import org.apache.hadoop.util.ToolRunner;
098import org.apache.yetus.audience.InterfaceAudience;
099import org.apache.zookeeper.KeeperException;
100import org.apache.zookeeper.ZooKeeper;
101import org.apache.zookeeper.client.ConnectStringParser;
102import org.apache.zookeeper.data.Stat;
103import org.slf4j.Logger;
104import org.slf4j.LoggerFactory;
105
106import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
107import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
108
109/**
110 * HBase Canary Tool for "canary monitoring" of a running HBase cluster.
111 *
112 * There are three modes:
113 * <ol>
114 * <li>region mode (Default): For each region, try to get one row per column family outputting
115 * information on failure (ERROR) or else the latency.
116 * </li>
117 *
118 * <li>regionserver mode: For each regionserver try to get one row from one table selected
119 * randomly outputting information on failure (ERROR) or else the latency.
120 * </li>
121 *
 * <li>zookeeper mode: For each zookeeper instance, selects a znode outputting information on
 * failure (ERROR) or else the latency.
124 * </li>
125 * </ol>
126 */
127@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CanaryTool implements Tool, Canary {
  /** Configuration key for the info-server port; a negative value disables the web UI. */
  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";

  /** Default port the canary info server listens on. */
  public static final int DEFAULT_CANARY_INFOPORT = 16050;

  /** Configuration key for the address the info server binds to (default 0.0.0.0). */
  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";

  // Web UI for canary status; null unless started by putUpWebUI().
  private InfoServer infoServer;
136
137  private void putUpWebUI() throws IOException {
138    int port = conf.getInt(HBASE_CANARY_INFO_PORT, DEFAULT_CANARY_INFOPORT);
139    // -1 is for disabling info server
140    if (port < 0) {
141      return;
142    }
143    if (zookeeperMode) {
144      LOG.info("WebUI is not supported in Zookeeper mode");
145    } else if (regionServerMode) {
146      LOG.info("WebUI is not supported in RegionServer mode");
147    } else {
148      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
149      try {
150        infoServer = new InfoServer("canary", addr, port, false, conf);
151        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
152        infoServer.setAttribute("sink", this.sink);
153        infoServer.start();
154        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
155      } catch (BindException e) {
156        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
157      }
158    }
159  }
160
161  @Override
162  public int checkRegions(String[] targets) throws Exception {
163    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
164    try {
165      if (configuredReadTableTimeoutsStr != null) {
166        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
167      }
168    } catch (IllegalArgumentException e) {
169      LOG.error("Constructing read table timeouts map failed ", e);
170      return USAGE_EXIT_CODE;
171    }
172    return runMonitor(targets);
173  }
174
  /**
   * Canary entry point for 'regionserver' mode: flips the tool into regionserver mode
   * and runs the monitor against the given targets.
   */
  @Override
  public int checkRegionServers(String[] targets) throws Exception {
    regionServerMode = true;
    return runMonitor(targets);
  }
180
  /**
   * Canary entry point for 'zookeeper' mode: flips the tool into zookeeper mode and
   * runs the monitor (no table targets apply in this mode).
   */
  @Override
  public int checkZooKeeper() throws Exception {
    zookeeperMode = true;
    return runMonitor(null);
  }
186
187  /**
188   * Sink interface used by the canary to output information
189   */
190  public interface Sink {
191    long getReadFailureCount();
192    long incReadFailureCount();
193    Map<String,String> getReadFailures();
194    void updateReadFailures(String regionName, String serverName);
195    long getWriteFailureCount();
196    long incWriteFailureCount();
197    Map<String,String> getWriteFailures();
198    void updateWriteFailures(String regionName, String serverName);
199    long getReadSuccessCount();
200    long incReadSuccessCount();
201    long getWriteSuccessCount();
202    long incWriteSuccessCount();
203  }
204
205  /**
206   * Simple implementation of canary sink that allows plotting to a file or standard output.
207   */
208  public static class StdOutSink implements Sink {
209    private AtomicLong readFailureCount = new AtomicLong(0),
210        writeFailureCount = new AtomicLong(0),
211        readSuccessCount = new AtomicLong(0),
212        writeSuccessCount = new AtomicLong(0);
213    private Map<String, String> readFailures = new ConcurrentHashMap<>();
214    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
215
216    @Override
217    public long getReadFailureCount() {
218      return readFailureCount.get();
219    }
220
221    @Override
222    public long incReadFailureCount() {
223      return readFailureCount.incrementAndGet();
224    }
225
226    @Override
227    public Map<String, String> getReadFailures() {
228      return readFailures;
229    }
230
231    @Override
232    public void updateReadFailures(String regionName, String serverName) {
233      readFailures.put(regionName, serverName);
234    }
235
236    @Override
237    public long getWriteFailureCount() {
238      return writeFailureCount.get();
239    }
240
241    @Override
242    public long incWriteFailureCount() {
243      return writeFailureCount.incrementAndGet();
244    }
245
246    @Override
247    public Map<String, String> getWriteFailures() {
248      return writeFailures;
249    }
250
251    @Override
252    public void updateWriteFailures(String regionName, String serverName) {
253      writeFailures.put(regionName, serverName);
254    }
255
256    @Override
257    public long getReadSuccessCount() {
258      return readSuccessCount.get();
259    }
260
261    @Override
262    public long incReadSuccessCount() {
263      return readSuccessCount.incrementAndGet();
264    }
265
266    @Override
267    public long getWriteSuccessCount() {
268      return writeSuccessCount.get();
269    }
270
271    @Override
272    public long incWriteSuccessCount() {
273      return writeSuccessCount.incrementAndGet();
274    }
275  }
276
277  /**
278   * By RegionServer, for 'regionserver' mode.
279   */
280  public static class RegionServerStdOutSink extends StdOutSink {
281    public void publishReadFailure(String table, String server) {
282      incReadFailureCount();
283      LOG.error("Read from {} on {}", table, server);
284    }
285
286    public void publishReadTiming(String table, String server, long msTime) {
287      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
288    }
289  }
290
291  /**
292   * Output for 'zookeeper' mode.
293   */
294  public static class ZookeeperStdOutSink extends StdOutSink {
295    public void publishReadFailure(String znode, String server) {
296      incReadFailureCount();
297      LOG.error("Read from {} on {}", znode, server);
298    }
299
300    public void publishReadTiming(String znode, String server, long msTime) {
301      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
302    }
303  }
304
305  /**
306   * By Region, for 'region'  mode.
307   */
308  public static class RegionStdOutSink extends StdOutSink {
309    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
310    private LongAdder writeLatency = new LongAdder();
311    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
312      new ConcurrentHashMap<>();
313    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount =
314      new ConcurrentHashMap<>();
315    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();
316
317    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
318      return perServerFailuresCount;
319    }
320
321    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
322      return perTableFailuresCount;
323    }
324
325    public void resetFailuresCountDetails() {
326      perServerFailuresCount.clear();
327      perTableFailuresCount.clear();
328    }
329
330    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
331      perServerFailuresCount.compute(serverName, (server, count) -> {
332        if (count == null) {
333          count = new LongAdder();
334        }
335        count.increment();
336        return count;
337      });
338      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
339        if (count == null) {
340          count = new LongAdder();
341        }
342        count.increment();
343        return count;
344      });
345    }
346
347    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
348      incReadFailureCount();
349      incFailuresCountDetails(serverName, region);
350      LOG.error("Read from {} on serverName={} failed",
351          region.getRegionNameAsString(), serverName, e);
352    }
353
354    public void publishReadFailure(ServerName serverName, RegionInfo region,
355        ColumnFamilyDescriptor column, Exception e) {
356      incReadFailureCount();
357      incFailuresCountDetails(serverName, region);
358      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
359          region.getRegionNameAsString(), serverName,
360          column.getNameAsString(), e);
361    }
362
363    public void publishReadTiming(ServerName serverName, RegionInfo region,
364        ColumnFamilyDescriptor column, long msTime) {
365      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
366      rtr.setReadSuccess();
367      rtr.setReadLatency(msTime);
368      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
369      rtrs.add(rtr);
370      // Note that read success count will be equal to total column family read successes.
371      incReadSuccessCount();
372      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
373          column.getNameAsString(), msTime);
374    }
375
376    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
377      incWriteFailureCount();
378      incFailuresCountDetails(serverName, region);
379      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
380    }
381
382    public void publishWriteFailure(ServerName serverName, RegionInfo region,
383        ColumnFamilyDescriptor column, Exception e) {
384      incWriteFailureCount();
385      incFailuresCountDetails(serverName, region);
386      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
387          column.getNameAsString(), e);
388    }
389
390    public void publishWriteTiming(ServerName serverName, RegionInfo region,
391        ColumnFamilyDescriptor column, long msTime) {
392      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
393      rtr.setWriteSuccess();
394      rtr.setWriteLatency(msTime);
395      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
396      rtrs.add(rtr);
397      // Note that write success count will be equal to total column family write successes.
398      incWriteSuccessCount();
399      LOG.info("Write to {} on {} {} in {}ms",
400        region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
401    }
402
403    public Map<String, LongAdder> getReadLatencyMap() {
404      return this.perTableReadLatency;
405    }
406
407    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
408      LongAdder initLatency = new LongAdder();
409      this.perTableReadLatency.put(tableName, initLatency);
410      return initLatency;
411    }
412
413    public void initializeWriteLatency() {
414      this.writeLatency.reset();
415    }
416
417    public LongAdder getWriteLatency() {
418      return this.writeLatency;
419    }
420
421    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
422      return this.regionMap;
423    }
424
425    public int getTotalExpectedRegions() {
426      return this.regionMap.size();
427    }
428  }
429
430  /**
431   * Run a single zookeeper Task and then exit.
432   */
433  static class ZookeeperTask implements Callable<Void> {
434    private final Connection connection;
435    private final String host;
436    private String znode;
437    private final int timeout;
438    private ZookeeperStdOutSink sink;
439
440    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
441        ZookeeperStdOutSink sink) {
442      this.connection = connection;
443      this.host = host;
444      this.znode = znode;
445      this.timeout = timeout;
446      this.sink = sink;
447    }
448
449    @Override public Void call() throws Exception {
450      ZooKeeper zooKeeper = null;
451      try {
452        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
453        Stat exists = zooKeeper.exists(znode, false);
454        StopWatch stopwatch = new StopWatch();
455        stopwatch.start();
456        zooKeeper.getData(znode, false, exists);
457        stopwatch.stop();
458        sink.publishReadTiming(znode, host, stopwatch.getTime());
459      } catch (KeeperException | InterruptedException e) {
460        sink.publishReadFailure(znode, host);
461      } finally {
462        if (zooKeeper != null) {
463          zooKeeper.close();
464        }
465      }
466      return null;
467    }
468  }
469
470  /**
471   * Run a single Region Task and then exit. For each column family of the Region, get one row and
472   * output latency or failure.
473   */
474  static class RegionTask implements Callable<Void> {
475    public enum TaskType{
476      READ, WRITE
477    }
478    private Connection connection;
479    private RegionInfo region;
480    private RegionStdOutSink sink;
481    private TaskType taskType;
482    private boolean rawScanEnabled;
483    private ServerName serverName;
484    private LongAdder readWriteLatency;
485    private boolean readAllCF;
486
487    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
488        RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
489        boolean readAllCF) {
490      this.connection = connection;
491      this.region = region;
492      this.serverName = serverName;
493      this.sink = sink;
494      this.taskType = taskType;
495      this.rawScanEnabled = rawScanEnabled;
496      this.readWriteLatency = rwLatency;
497      this.readAllCF = readAllCF;
498    }
499
500    @Override
501    public Void call() {
502      switch (taskType) {
503        case READ:
504          return read();
505        case WRITE:
506          return write();
507        default:
508          return read();
509      }
510    }
511
512    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
513      byte[] startKey = null;
514      Get get = null;
515      Scan scan = null;
516      ResultScanner rs = null;
517      StopWatch stopWatch = new StopWatch();
518      startKey = region.getStartKey();
519      // Can't do a get on empty start row so do a Scan of first element if any instead.
520      if (startKey.length > 0) {
521        get = new Get(startKey);
522        get.setCacheBlocks(false);
523        get.setFilter(new FirstKeyOnlyFilter());
524        get.addFamily(column.getName());
525      } else {
526        scan = new Scan();
527        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
528        scan.setRaw(rawScanEnabled);
529        scan.setCaching(1);
530        scan.setCacheBlocks(false);
531        scan.setFilter(new FirstKeyOnlyFilter());
532        scan.addFamily(column.getName());
533        scan.setMaxResultSize(1L);
534        scan.setOneRowLimit();
535      }
536      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
537        column.getNameAsString(), Bytes.toStringBinary(startKey));
538      try {
539        stopWatch.start();
540        if (startKey.length > 0) {
541          table.get(get);
542        } else {
543          rs = table.getScanner(scan);
544          rs.next();
545        }
546        stopWatch.stop();
547        this.readWriteLatency.add(stopWatch.getTime());
548        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
549      } catch (Exception e) {
550        sink.publishReadFailure(serverName, region, column, e);
551        sink.updateReadFailures(region.getRegionNameAsString(),
552          serverName == null ? "NULL" : serverName.getHostname());
553      } finally {
554        if (rs != null) {
555          rs.close();
556        }
557      }
558      return null;
559    }
560
561    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
562      int size = cfs.length;
563      return cfs[ThreadLocalRandom.current().nextInt(size)];
564
565    }
566
567    public Void read() {
568      Table table = null;
569      TableDescriptor tableDesc = null;
570      try {
571        LOG.debug("Reading table descriptor for table {}", region.getTable());
572        table = connection.getTable(region.getTable());
573        tableDesc = table.getDescriptor();
574      } catch (IOException e) {
575        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
576        sink.publishReadFailure(serverName, region, e);
577        if (table != null) {
578          try {
579            table.close();
580          } catch (IOException ioe) {
581            LOG.error("Close table failed", e);
582          }
583        }
584        return null;
585      }
586
587      if (readAllCF) {
588        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
589          readColumnFamily(table, column);
590        }
591      } else {
592        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
593      }
594      try {
595        table.close();
596      } catch (IOException e) {
597        LOG.error("Close table failed", e);
598      }
599      return null;
600    }
601
602    /**
603     * Check writes for the canary table
604     */
605    private Void write() {
606      Table table = null;
607      TableDescriptor tableDesc = null;
608      try {
609        table = connection.getTable(region.getTable());
610        tableDesc = table.getDescriptor();
611        byte[] rowToCheck = region.getStartKey();
612        if (rowToCheck.length == 0) {
613          rowToCheck = new byte[]{0x0};
614        }
615        int writeValueSize =
616            connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
617        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
618          Put put = new Put(rowToCheck);
619          byte[] value = new byte[writeValueSize];
620          Bytes.random(value);
621          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
622
623          LOG.debug("Writing to {} {} {} {}",
624            tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
625            Bytes.toStringBinary(rowToCheck));
626          try {
627            long startTime = System.currentTimeMillis();
628            table.put(put);
629            long time = System.currentTimeMillis() - startTime;
630            this.readWriteLatency.add(time);
631            sink.publishWriteTiming(serverName, region, column, time);
632          } catch (Exception e) {
633            sink.publishWriteFailure(serverName, region, column, e);
634          }
635        }
636        table.close();
637      } catch (IOException e) {
638        sink.publishWriteFailure(serverName, region, e);
639        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
640      }
641      return null;
642    }
643  }
644
645  /**
646   * Run a single RegionServer Task and then exit.
647   * Get one row from a region on the regionserver and output latency or the failure.
648   */
649  static class RegionServerTask implements Callable<Void> {
650    private Connection connection;
651    private String serverName;
652    private RegionInfo region;
653    private RegionServerStdOutSink sink;
654    private AtomicLong successes;
655
656    RegionServerTask(Connection connection, String serverName, RegionInfo region,
657        RegionServerStdOutSink sink, AtomicLong successes) {
658      this.connection = connection;
659      this.serverName = serverName;
660      this.region = region;
661      this.sink = sink;
662      this.successes = successes;
663    }
664
665    @Override
666    public Void call() {
667      TableName tableName = null;
668      Table table = null;
669      Get get = null;
670      byte[] startKey = null;
671      Scan scan = null;
672      StopWatch stopWatch = new StopWatch();
673      // monitor one region on every region server
674      stopWatch.reset();
675      try {
676        tableName = region.getTable();
677        table = connection.getTable(tableName);
678        startKey = region.getStartKey();
679        // Can't do a get on empty start row so do a Scan of first element if any instead.
680        LOG.debug("Reading from {} {} {} {}",
681          serverName, region.getTable(), region.getRegionNameAsString(),
682          Bytes.toStringBinary(startKey));
683        if (startKey.length > 0) {
684          get = new Get(startKey);
685          get.setCacheBlocks(false);
686          get.setFilter(new FirstKeyOnlyFilter());
687          stopWatch.start();
688          table.get(get);
689          stopWatch.stop();
690        } else {
691          scan = new Scan();
692          scan.setCacheBlocks(false);
693          scan.setFilter(new FirstKeyOnlyFilter());
694          scan.setCaching(1);
695          scan.setMaxResultSize(1L);
696          scan.setOneRowLimit();
697          stopWatch.start();
698          ResultScanner s = table.getScanner(scan);
699          s.next();
700          s.close();
701          stopWatch.stop();
702        }
703        successes.incrementAndGet();
704        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
705      } catch (TableNotFoundException tnfe) {
706        LOG.error("Table may be deleted", tnfe);
707        // This is ignored because it doesn't imply that the regionserver is dead
708      } catch (TableNotEnabledException tnee) {
709        // This is considered a success since we got a response.
710        successes.incrementAndGet();
711        LOG.debug("The targeted table was disabled.  Assuming success.");
712      } catch (DoNotRetryIOException dnrioe) {
713        sink.publishReadFailure(tableName.getNameAsString(), serverName);
714        LOG.error(dnrioe.toString(), dnrioe);
715      } catch (IOException e) {
716        sink.publishReadFailure(tableName.getNameAsString(), serverName);
717        LOG.error(e.toString(), e);
718      } finally {
719        if (table != null) {
720          try {
721            table.close();
722          } catch (IOException e) {/* DO NOTHING */
723            LOG.error("Close table failed", e);
724          }
725        }
726        scan = null;
727        get = null;
728        startKey = null;
729      }
730      return null;
731    }
732  }
733
  // Exit codes: usage error, initialization error, timeout, general error, and
  // canary-detected failure, respectively.
  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  // Default sleep between runs in daemon mode, in milliseconds (1 minute).
  private static final long DEFAULT_INTERVAL = 60000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  // NOTE(review): logger is keyed on Canary.class rather than CanaryTool.class —
  // presumably for log-name continuity; confirm before changing.
  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

  // Default system-namespace table used when write sniffing has no configured table.
  public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
    NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  // Column family created on the canary write table.
  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  // Sleep between check runs in milliseconds; 0 means run once and exit.
  private long interval = 0;
  private Sink sink = null;

  /**
   * True if we are to run in 'regionServer' mode.
   */
  private boolean regionServerMode = false;

  /**
   * True if we are to run in zookeeper 'mode'.
   */
  private boolean zookeeperMode = false;

  /**
   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
   * we aggregate time to fetch each region and it needs to be less than this value else we
   * log an ERROR.
   */
  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();

  // Configuration keys controlling canary behavior; mirrored by command-line flags.
  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS
          = "hbase.canary.regionserver_all_regions";

  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING
          = "hbase.canary.region.write.sniffing";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT
          = "hbase.canary.region.write.table.timeout";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME
          = "hbase.canary.region.write.table.name";
  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT
          = "hbase.canary.region.read.table.timeout";

  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES
          = "hbase.canary.zookeeper.permitted.failures";

  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";


  private ExecutorService executor; // threads to retrieve data from regionservers
794
  /** Builds a canary backed by a single-threaded scheduled executor. */
  public CanaryTool() {
    this(new ScheduledThreadPoolExecutor(1));
  }

  /** Builds a canary with the given executor; the sink is chosen later per mode. */
  public CanaryTool(ExecutorService executor) {
    this(executor, null);
  }

  // Package-private for tests: allows injecting a sink to observe results.
  @VisibleForTesting
  CanaryTool(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }

  CanaryTool(Configuration conf, ExecutorService executor) {
    this(conf, executor, null);
  }

  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
    this(executor, sink);
    setConf(conf);
  }
817
  /** @return the configuration in use; {@code null} until {@link #setConf} is called */
  @Override
  public Configuration getConf() {
    return conf;
  }
822
823  @Override
824  public void setConf(Configuration conf) {
825    if (conf == null) {
826      conf = HBaseConfiguration.create();
827    }
828    this.conf = conf;
829  }
830
  /**
   * Parses the command-line arguments. Option flags are recorded on this instance and/or
   * copied into the configuration; arguments stop being treated as options once the first
   * non-option (monitor target) argument is seen. Prints usage and exits the JVM on any
   * invalid option or invalid combination of options.
   *
   * @return the index in {@code args} of the first monitor target, or -1 if none was given
   */
  private int parseArgs(String[] args) {
    int index = -1;
    long permittedFailures = 0;
    boolean regionServerAllRegions = false, writeSniffing = false;
    String readTableTimeoutsStr = null;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];
      if (cmd.startsWith("-")) {
        if (index >= 0) {
          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
          System.err.println("Invalid command line options");
          printUsageAndExit();
        }
        if (cmd.equals("-help") || cmd.equals("-h")) {
          // user asked for help, print the help and quit.
          printUsageAndExit();
        } else if (cmd.equals("-daemon") && interval == 0) {
          // user asked for daemon mode, set a default interval between checks
          interval = DEFAULT_INTERVAL;
        } else if (cmd.equals("-interval")) {
          // user has specified an interval for canary breaths (-interval N)
          i++;

          if (i == args.length) {
            System.err.println("-interval takes a numeric seconds value argument.");
            printUsageAndExit();
          }
          try {
            // seconds on the command line, stored internally as milliseconds
            interval = Long.parseLong(args[i]) * 1000;
          } catch (NumberFormatException e) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-zookeeper")) {
          this.zookeeperMode = true;
        } else if(cmd.equals("-regionserver")) {
          this.regionServerMode = true;
        } else if(cmd.equals("-allRegions")) {
          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
          regionServerAllRegions = true;
        } else if(cmd.equals("-writeSniffing")) {
          writeSniffing = true;
          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
        } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
        } else if (cmd.equals("-e")) {
          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
        } else if (cmd.equals("-t")) {
          // overall canary run timeout in milliseconds (-t N)
          i++;

          if (i == args.length) {
            System.err.println("-t takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          long timeout = 0;
          try {
            timeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-t takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
        } else if(cmd.equals("-writeTableTimeout")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          long configuredWriteTableTimeout = 0;
          try {
            configuredWriteTableTimeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
        } else if (cmd.equals("-writeTable")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTable takes a string tablename value argument.");
            printUsageAndExit();
          }
          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
        } else if (cmd.equals("-f")) {
          // whether to fail (exit) on the first error (-f true|false)
          i++;
          if (i == args.length) {
            System.err
                .println("-f needs a boolean value argument (true|false).");
            printUsageAndExit();
          }

          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
        } else if (cmd.equals("-readTableTimeouts")) {
          i++;
          if (i == args.length) {
            System.err.println("-readTableTimeouts needs a comma-separated list of read " +
                "millisecond timeouts per table (without spaces).");
            printUsageAndExit();
          }
          readTableTimeoutsStr = args[i];
          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
        } else if (cmd.equals("-permittedZookeeperFailures")) {
          i++;

          if (i == args.length) {
            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
            printUsageAndExit();
          }
          try {
            permittedFailures = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
        } else {
          // no options match
          System.err.println(cmd + " options is invalid.");
          printUsageAndExit();
        }
      } else if (index < 0) {
        // keep track of first table name specified by the user
        index = i;
      }
    }
    // Validate mode-dependent / mutually-exclusive option combinations.
    if (regionServerAllRegions && !this.regionServerMode) {
      System.err.println("-allRegions can only be specified in regionserver mode.");
      printUsageAndExit();
    }
    if (this.zookeeperMode) {
      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
        System.err.println("-zookeeper is exclusive and cannot be combined with "
            + "other modes.");
        printUsageAndExit();
      }
    }
    if (permittedFailures != 0 && !this.zookeeperMode) {
      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
      printUsageAndExit();
    }
    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
      System.err.println("-readTableTimeouts can only be configured in region mode.");
      printUsageAndExit();
    }
    return index;
  }
980
981  @Override
982  public int run(String[] args) throws Exception {
983    int index = parseArgs(args);
984    String[] monitorTargets = null;
985
986    if (index >= 0) {
987      int length = args.length - index;
988      monitorTargets = new String[length];
989      System.arraycopy(args, index, monitorTargets, 0, length);
990    }
991
992    putUpWebUI();
993    if (zookeeperMode) {
994      return checkZooKeeper();
995    } else if (regionServerMode) {
996      return checkRegionServers(monitorTargets);
997    } else {
998      return checkRegions(monitorTargets);
999    }
1000  }
1001
1002  private int runMonitor(String[] monitorTargets) throws Exception {
1003    ChoreService choreService = null;
1004
1005    // Launches chore for refreshing kerberos credentials if security is enabled.
1006    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
1007    // for more details.
1008    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1009    if (authChore != null) {
1010      choreService = new ChoreService("CANARY_TOOL");
1011      choreService.scheduleChore(authChore);
1012    }
1013
1014    // Start to prepare the stuffs
1015    Monitor monitor = null;
1016    Thread monitorThread;
1017    long startTime = 0;
1018    long currentTimeLength = 0;
1019    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1020    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
1021    // Get a connection to use in below.
1022    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
1023      do {
1024        // Do monitor !!
1025        try {
1026          monitor = this.newMonitor(connection, monitorTargets);
1027          monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis());
1028          startTime = System.currentTimeMillis();
1029          monitorThread.start();
1030          while (!monitor.isDone()) {
1031            // wait for 1 sec
1032            Thread.sleep(1000);
1033            // exit if any error occurs
1034            if (failOnError && monitor.hasError()) {
1035              monitorThread.interrupt();
1036              if (monitor.initialized) {
1037                return monitor.errorCode;
1038              } else {
1039                return INIT_ERROR_EXIT_CODE;
1040              }
1041            }
1042            currentTimeLength = System.currentTimeMillis() - startTime;
1043            if (currentTimeLength > timeout) {
1044              LOG.error("The monitor is running too long (" + currentTimeLength
1045                  + ") after timeout limit:" + timeout
1046                  + " will be killed itself !!");
1047              if (monitor.initialized) {
1048                return TIMEOUT_ERROR_EXIT_CODE;
1049              } else {
1050                return INIT_ERROR_EXIT_CODE;
1051              }
1052            }
1053          }
1054
1055          if (failOnError && monitor.finalCheckForErrors()) {
1056            monitorThread.interrupt();
1057            return monitor.errorCode;
1058          }
1059        } finally {
1060          if (monitor != null) {
1061            monitor.close();
1062          }
1063        }
1064
1065        Thread.sleep(interval);
1066      } while (interval > 0);
1067    } // try-with-resources close
1068
1069    if (choreService != null) {
1070      choreService.shutdown();
1071    }
1072    return monitor.errorCode;
1073  }
1074
  /**
   * Returns the sink's accumulated read failures.
   * NOTE(review): assumes {@code this.sink} is non-null — confirm this is only called after
   * a sink has been set (constructor) or a monitor run has configured one.
   */
  @Override
  public Map<String, String> getReadFailures()  {
    return sink.getReadFailures();
  }

  /**
   * Returns the sink's accumulated write failures.
   * NOTE(review): same non-null {@code sink} assumption as {@link #getReadFailures()}.
   */
  @Override
  public Map<String, String> getWriteFailures()  {
    return sink.getWriteFailures();
  }
1084
1085  private void printUsageAndExit() {
1086    System.err.println(
1087      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1088    System.err.println("Where [OPTIONS] are:");
1089    System.err.println(" -h,-help        show this help and exit.");
1090    System.err.println(" -regionserver   set 'regionserver mode'; gets row from random region on " +
1091        "server");
1092    System.err.println(" -allRegions     get from ALL regions when 'regionserver mode', not just " +
1093        "random one.");
1094    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on " +
1095        "each ensemble member");
1096    System.err.println(" -daemon         continuous check at defined intervals.");
1097    System.err.println(" -interval <N>   interval between checks in seconds");
1098    System.err.println(" -e              consider table/regionserver argument as regular " +
1099        "expression");
1100    System.err.println(" -f <B>          exit on first error; default=true");
1101    System.err.println(" -failureAsError treat read/write failure as error");
1102    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1103    System.err.println(" -writeSniffing  enable write sniffing");
1104    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1105    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1106    System.err.println(" -readTableTimeouts <tableName>=<read timeout>," +
1107        "<tableName>=<read timeout>,...");
1108    System.err.println("                comma-separated list of table read timeouts " +
1109        "(no spaces);");
1110    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1111    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1112    System.err.println("                connect to individual zookeeper nodes in ensemble");
1113    System.err.println("");
1114    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1115    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " +
1116        "raw scan; default=false");
1117    System.err.println("");
1118    System.err.println("Canary runs in one of three modes: region (default), regionserver, or " +
1119        "zookeeper.");
1120    System.err.println("To sniff/probe all regions, pass no arguments.");
1121    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1122    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1123    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1124    System.exit(USAGE_EXIT_CODE);
1125  }
1126
1127  Sink getSink(Configuration configuration, Class clazz) {
1128    // In test context, this.sink might be set. Use it if non-null. For testing.
1129    return this.sink != null? this.sink:
1130        (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class",
1131            clazz, Sink.class));
1132  }
1133
1134  /**
1135   * Canary region mode-specific data structure which stores information about each region
1136   * to be scanned
1137   */
1138  public static class RegionTaskResult {
1139    private RegionInfo region;
1140    private TableName tableName;
1141    private ServerName serverName;
1142    private ColumnFamilyDescriptor column;
1143    private AtomicLong readLatency = null;
1144    private AtomicLong writeLatency = null;
1145    private boolean readSuccess = false;
1146    private boolean writeSuccess = false;
1147
1148    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1149        ColumnFamilyDescriptor column) {
1150      this.region = region;
1151      this.tableName = tableName;
1152      this.serverName = serverName;
1153      this.column = column;
1154    }
1155
1156    public RegionInfo getRegionInfo() {
1157      return this.region;
1158    }
1159
1160    public String getRegionNameAsString() {
1161      return this.region.getRegionNameAsString();
1162    }
1163
1164    public TableName getTableName() {
1165      return this.tableName;
1166    }
1167
1168    public String getTableNameAsString() {
1169      return this.tableName.getNameAsString();
1170    }
1171
1172    public ServerName getServerName() {
1173      return this.serverName;
1174    }
1175
1176    public String getServerNameAsString() {
1177      return this.serverName.getServerName();
1178    }
1179
1180    public ColumnFamilyDescriptor getColumnFamily() {
1181      return this.column;
1182    }
1183
1184    public String getColumnFamilyNameAsString() {
1185      return this.column.getNameAsString();
1186    }
1187
1188    public long getReadLatency() {
1189      if (this.readLatency == null) {
1190        return -1;
1191      }
1192      return this.readLatency.get();
1193    }
1194
1195    public void setReadLatency(long readLatency) {
1196      if (this.readLatency != null) {
1197        this.readLatency.set(readLatency);
1198      } else {
1199        this.readLatency = new AtomicLong(readLatency);
1200      }
1201    }
1202
1203    public long getWriteLatency() {
1204      if (this.writeLatency == null) {
1205        return -1;
1206      }
1207      return this.writeLatency.get();
1208    }
1209
1210    public void setWriteLatency(long writeLatency) {
1211      if (this.writeLatency != null) {
1212        this.writeLatency.set(writeLatency);
1213      } else {
1214        this.writeLatency = new AtomicLong(writeLatency);
1215      }
1216    }
1217
1218    public boolean isReadSuccess() {
1219      return this.readSuccess;
1220    }
1221
1222    public void setReadSuccess() {
1223      this.readSuccess = true;
1224    }
1225
1226    public boolean isWriteSuccess() {
1227      return this.writeSuccess;
1228    }
1229
1230    public void setWriteSuccess() {
1231      this.writeSuccess = true;
1232    }
1233  }
1234
1235  /**
1236   * A Factory method for {@link Monitor}.
1237   * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
1238   * @return a Monitor instance
1239   */
1240  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1241    Monitor monitor;
1242    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1243    boolean regionServerAllRegions
1244            = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1245    boolean failOnError
1246            = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1247    int permittedFailures
1248            = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1249    boolean writeSniffing
1250            = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1251    String writeTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME,
1252            DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1253    long configuredWriteTableTimeout
1254            = conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1255
1256    if (this.regionServerMode) {
1257      monitor =
1258          new RegionServerMonitor(connection, monitorTargets, useRegExp,
1259              getSink(connection.getConfiguration(), RegionServerStdOutSink.class),
1260              this.executor, regionServerAllRegions,
1261              failOnError, permittedFailures);
1262
1263    } else if (this.zookeeperMode) {
1264      monitor =
1265          new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1266              getSink(connection.getConfiguration(), ZookeeperStdOutSink.class),
1267              this.executor, failOnError, permittedFailures);
1268    } else {
1269      monitor =
1270          new RegionMonitor(connection, monitorTargets, useRegExp,
1271              getSink(connection.getConfiguration(), RegionStdOutSink.class),
1272              this.executor, writeSniffing,
1273              TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1274              configuredWriteTableTimeout, permittedFailures);
1275    }
1276    return monitor;
1277  }
1278
1279  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1280    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1281    for (String tT : tableTimeouts) {
1282      String[] nameTimeout = tT.split("=");
1283      if (nameTimeout.length < 2) {
1284        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " +
1285            "<tableName>=<read timeout> (without spaces).");
1286      }
1287      long timeoutVal;
1288      try {
1289        timeoutVal = Long.parseLong(nameTimeout[1]);
1290      } catch (NumberFormatException e) {
1291        throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" +
1292            " must be a numeric value argument.");
1293      }
1294      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1295    }
1296  }
1297  /**
1298   * A Monitor super-class can be extended by users
1299   */
1300  public static abstract class Monitor implements Runnable, Closeable {
1301    protected Connection connection;
1302    protected Admin admin;
1303    /**
1304     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes.
1305     * Passed on the command-line as arguments.
1306     */
1307    protected String[] targets;
1308    protected boolean useRegExp;
1309    protected boolean treatFailureAsError;
1310    protected boolean initialized = false;
1311
1312    protected boolean done = false;
1313    protected int errorCode = 0;
1314    protected long allowedFailures = 0;
1315    protected Sink sink;
1316    protected ExecutorService executor;
1317
1318    public boolean isDone() {
1319      return done;
1320    }
1321
1322    public boolean hasError() {
1323      return errorCode != 0;
1324    }
1325
1326    public boolean finalCheckForErrors() {
1327      if (errorCode != 0) {
1328        return true;
1329      }
1330      if (treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
1331          || sink.getWriteFailureCount() > allowedFailures)) {
1332        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1333        errorCode = FAILURE_EXIT_CODE;
1334        return true;
1335      }
1336      return false;
1337    }
1338
1339    @Override
1340    public void close() throws IOException {
1341      if (this.admin != null) {
1342        this.admin.close();
1343      }
1344    }
1345
1346    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1347        ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1348      if (null == connection) {
1349        throw new IllegalArgumentException("connection shall not be null");
1350      }
1351
1352      this.connection = connection;
1353      this.targets = monitorTargets;
1354      this.useRegExp = useRegExp;
1355      this.treatFailureAsError = treatFailureAsError;
1356      this.sink = sink;
1357      this.executor = executor;
1358      this.allowedFailures = allowedFailures;
1359    }
1360
1361    @Override
1362    public abstract void run();
1363
1364    protected boolean initAdmin() {
1365      if (null == this.admin) {
1366        try {
1367          this.admin = this.connection.getAdmin();
1368        } catch (Exception e) {
1369          LOG.error("Initial HBaseAdmin failed...", e);
1370          this.errorCode = INIT_ERROR_EXIT_CODE;
1371        }
1372      } else if (admin.isAborted()) {
1373        LOG.error("HBaseAdmin aborted");
1374        this.errorCode = INIT_ERROR_EXIT_CODE;
1375      }
1376      return !this.hasError();
1377    }
1378  }
1379
1380  /**
1381   * A monitor for region mode.
1382   */
1383  private static class RegionMonitor extends Monitor {
1384    // 10 minutes
1385    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1386    // 1 days
1387    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1388
1389    private long lastCheckTime = -1;
1390    private boolean writeSniffing;
1391    private TableName writeTableName;
1392    private int writeDataTTL;
1393    private float regionsLowerLimit;
1394    private float regionsUpperLimit;
1395    private int checkPeriod;
1396    private boolean rawScanEnabled;
1397    private boolean readAllCF;
1398
1399    /**
1400     * This is a timeout per table. If read of each region in the table aggregated takes longer
1401     * than what is configured here, we log an ERROR rather than just an INFO.
1402     */
1403    private HashMap<String, Long> configuredReadTableTimeouts;
1404
1405    private long configuredWriteTableTimeout;
1406
    /**
     * Builds a region monitor, reading the write-table tuning knobs (TTL, per-server region
     * limits, check period, raw-scan flag, read-all-CF flag) from the connection's
     * configuration.
     *
     * @param configuredReadTableTimeouts per-table read timeouts in ms (defensively copied)
     * @param configuredWriteTableTimeout write timeout in ms for the canary write table
     */
    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
        Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
        boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
        long configuredWriteTableTimeout,
        long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
          allowedFailures);
      Configuration conf = connection.getConfiguration();
      this.writeSniffing = writeSniffing;
      this.writeTableName = writeTableName;
      this.writeDataTTL =
          conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
      this.regionsLowerLimit =
          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
      this.regionsUpperLimit =
          conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
      this.checkPeriod =
          conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
            DEFAULT_WRITE_TABLE_CHECK_PERIOD);
      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
      // Defensive copy: later mutation of the caller's map must not affect this monitor.
      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
    }
1431
1432    private RegionStdOutSink getSink() {
1433      if (!(sink instanceof RegionStdOutSink)) {
1434        throw new RuntimeException("Can only write to Region sink");
1435      }
1436      return ((RegionStdOutSink) sink);
1437    }
1438
    /**
     * Executes one region-mode canary pass: reads from the targeted tables (or all tables
     * when no targets were given), optionally write-sniffs the canary write table, waits for
     * all probe tasks, then compares the observed per-table latencies against the configured
     * timeouts, logging ERROR on any overrun.
     */
    @Override
    public void run() {
      if (this.initAdmin()) {
        try {
          List<Future<Void>> taskFutures = new LinkedList<>();
          RegionStdOutSink regionSink = this.getSink();
          regionSink.resetFailuresCountDetails();
          if (this.targets != null && this.targets.length > 0) {
            String[] tables = generateMonitorTables(this.targets);
            // Check to see that each table name passed in the -readTableTimeouts argument is also
            // passed as a monitor target.
            if (!new HashSet<>(Arrays.asList(tables)).
                containsAll(this.configuredReadTableTimeouts.keySet())) {
              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " +
                  "passed via command line.");
              this.errorCode = USAGE_EXIT_CODE;
              return;
            }
            this.initialized = true;
            for (String table : tables) {
              // One latency accumulator per table; read tasks add into it.
              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
                this.rawScanEnabled, readLatency, readAllCF));
            }
          } else {
            taskFutures.addAll(sniff(TaskType.READ, regionSink));
          }

          if (writeSniffing) {
            // Re-verify the write table's distribution at most once per checkPeriod ms.
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            regionSink.initializeWriteLatency();
            LongAdder writeTableLatency = regionSink.getWriteLatency();
            taskFutures
                .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
                  executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
          }

          // Wait for all read/write probe tasks; individual failures are logged, not fatal.
          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
          // Compare aggregated per-table read latencies against the configured budgets.
          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
            String tableName = entry.getKey();
            if (actualReadTableLatency.containsKey(tableName)) {
              Long actual = actualReadTableLatency.get(tableName).longValue();
              Long configured = entry.getValue();
              if (actual > configured) {
                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout." +
                    "(Configured read timeout {}ms.", tableName, actual, configured);
              } else {
                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
                    tableName, actual, configured);
              }
            } else {
              // No latency entry at all means the table was never successfully read.
              LOG.error("Read operation for {} failed!", tableName);
            }
          }
          if (this.writeSniffing) {
            String writeTableStringName = this.writeTableName.getNameAsString();
            long actualWriteLatency = regionSink.getWriteLatency().longValue();
            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
                writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
            // Check that the writeTable write operation latency does not exceed the configured
            // timeout.
            if (actualWriteLatency > this.configuredWriteTableTimeout) {
              LOG.error("Write operation for {} exceeded the configured write timeout.",
                  writeTableStringName);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        } finally {
          this.done = true;
        }
      }
      // Also marks done when initAdmin() failed and the block above was skipped entirely.
      this.done = true;
    }
1529
1530    /**
1531     * @return List of tables to use in test.
1532     */
1533    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1534      String[] returnTables = null;
1535
1536      if (this.useRegExp) {
1537        Pattern pattern = null;
1538        List<TableDescriptor> tds = null;
1539        Set<String> tmpTables = new TreeSet<>();
1540        try {
1541          LOG.debug(String.format("reading list of tables"));
1542          tds = this.admin.listTableDescriptors(pattern);
1543          if (tds == null) {
1544            tds = Collections.emptyList();
1545          }
1546          for (String monitorTarget : monitorTargets) {
1547            pattern = Pattern.compile(monitorTarget);
1548            for (TableDescriptor td : tds) {
1549              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1550                tmpTables.add(td.getTableName().getNameAsString());
1551              }
1552            }
1553          }
1554        } catch (IOException e) {
1555          LOG.error("Communicate with admin failed", e);
1556          throw e;
1557        }
1558
1559        if (tmpTables.size() > 0) {
1560          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1561        } else {
1562          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1563          LOG.error(msg);
1564          this.errorCode = INIT_ERROR_EXIT_CODE;
1565          throw new TableNotFoundException(msg);
1566        }
1567      } else {
1568        returnTables = monitorTargets;
1569      }
1570
1571      return returnTables;
1572    }
1573
1574    /*
1575     * Canary entry point to monitor all the tables.
1576     */
1577    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1578        throws Exception {
1579      LOG.debug("Reading list of tables");
1580      List<Future<Void>> taskFutures = new LinkedList<>();
1581      for (TableDescriptor td: admin.listTableDescriptors()) {
1582        if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) &&
1583            (!td.getTableName().equals(writeTableName))) {
1584          LongAdder readLatency =
1585              regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1586          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
1587            this.rawScanEnabled, readLatency, readAllCF));
1588        }
1589      }
1590      return taskFutures;
1591    }
1592
    /**
     * Ensures the canary write table exists, is enabled, and is spread across the cluster:
     * recreates it when its region count falls outside the configured per-server limits, and
     * triggers a balance when some live servers host none of its regions.
     */
    private void checkWriteTableDistribution() throws IOException {
      if (!admin.tableExists(writeTableName)) {
        int numberOfServers = admin.getRegionServers().size();
        if (numberOfServers == 0) {
          throw new IllegalStateException("No live regionservers");
        }
        createWriteTable(numberOfServers);
      }

      if (!admin.isTableEnabled(writeTableName)) {
        admin.enableTable(writeTableName);
      }

      // Do not count the master among the servers expected to carry canary regions.
      ClusterMetrics status =
          admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
      int numberOfServers = status.getServersName().size();
      if (status.getServersName().contains(status.getMasterName())) {
        numberOfServers -= 1;
      }

      List<Pair<RegionInfo, ServerName>> pairs =
          MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
      int numberOfRegions = pairs.size();
      // Recreate the table when its region count is outside [lower, upper] regions per server.
      if (numberOfRegions < numberOfServers * regionsLowerLimit
          || numberOfRegions > numberOfServers * regionsUpperLimit) {
        admin.disableTable(writeTableName);
        admin.deleteTable(writeTableName);
        createWriteTable(numberOfServers);
      }
      // NOTE(review): 'pairs' reflects the pre-recreate layout; the coverage check below may
      // act on stale locations when the table was just recreated.
      HashSet<ServerName> serverSet = new HashSet<>();
      for (Pair<RegionInfo, ServerName> pair : pairs) {
        serverSet.add(pair.getSecond());
      }
      int numberOfCoveredServers = serverSet.size();
      if (numberOfCoveredServers < numberOfServers) {
        admin.balance();
      }
    }
1631
1632    private void createWriteTable(int numberOfServers) throws IOException {
1633      int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
1634      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
1635        "(current lower limit of regions per server is {} and you can change it with config {}).",
1636          numberOfServers, numberOfRegions, regionsLowerLimit,
1637          HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1638      ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
1639        .newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME)).setMaxVersions(1)
1640        .setTimeToLive(writeDataTTL).build();
1641      TableDescriptor desc = TableDescriptorBuilder.newBuilder(writeTableName)
1642        .setColumnFamily(family).build();
1643      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1644      admin.createTable(desc, splits);
1645    }
1646  }
1647
1648  /**
1649   * Canary entry point for specified table.
1650   * @throws Exception exception
1651   */
1652  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1653      ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
1654      boolean readAllCF) throws Exception {
1655    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1656    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1657      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1658        executor, taskType, rawScanEnabled, readLatency, readAllCF);
1659    } else {
1660      LOG.warn("Table {} is not enabled", tableName);
1661    }
1662    return new LinkedList<>();
1663  }
1664
1665  /*
1666   * Loops over regions of this table, and outputs information about the state.
1667   */
1668  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1669      TableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
1670      boolean rawScanEnabled, LongAdder rwLatency, boolean readAllCF) throws Exception {
1671    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1672    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1673      List<RegionTask> tasks = new ArrayList<>();
1674      try (RegionLocator regionLocator =
1675               admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1676        for (HRegionLocation location: regionLocator.getAllRegionLocations()) {
1677          if (location == null) {
1678            LOG.warn("Null location");
1679            continue;
1680          }
1681          ServerName rs = location.getServerName();
1682          RegionInfo region = location.getRegion();
1683          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
1684              taskType, rawScanEnabled, rwLatency, readAllCF));
1685          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1686          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1687        }
1688        return executor.invokeAll(tasks);
1689      }
1690    } catch (TableNotFoundException e) {
1691      return Collections.EMPTY_LIST;
1692    }
1693  }
1694
1695  //  monitor for zookeeper mode
1696  private static class ZookeeperMonitor extends Monitor {
1697    private List<String> hosts;
1698    private final String znode;
1699    private final int timeout;
1700
1701    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1702        Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures)  {
1703      super(connection, monitorTargets, useRegExp,
1704          sink, executor, treatFailureAsError, allowedFailures);
1705      Configuration configuration = connection.getConfiguration();
1706      znode =
1707          configuration.get(ZOOKEEPER_ZNODE_PARENT,
1708              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1709      timeout = configuration
1710          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1711      ConnectStringParser parser =
1712          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1713      hosts = Lists.newArrayList();
1714      for (InetSocketAddress server : parser.getServerAddresses()) {
1715        hosts.add(server.toString());
1716      }
1717      if (allowedFailures > (hosts.size() - 1) / 2) {
1718        LOG.warn(
1719          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
1720              + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1721          allowedFailures, hosts.size());
1722      }
1723    }
1724
1725    @Override public void run() {
1726      List<ZookeeperTask> tasks = Lists.newArrayList();
1727      ZookeeperStdOutSink zkSink = null;
1728      try {
1729        zkSink = this.getSink();
1730      } catch (RuntimeException e) {
1731        LOG.error("Run ZooKeeperMonitor failed!", e);
1732        this.errorCode = ERROR_EXIT_CODE;
1733      }
1734      this.initialized = true;
1735      for (final String host : hosts) {
1736        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1737      }
1738      try {
1739        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1740          try {
1741            future.get();
1742          } catch (ExecutionException e) {
1743            LOG.error("Sniff zookeeper failed!", e);
1744            this.errorCode = ERROR_EXIT_CODE;
1745          }
1746        }
1747      } catch (InterruptedException e) {
1748        this.errorCode = ERROR_EXIT_CODE;
1749        Thread.currentThread().interrupt();
1750        LOG.error("Sniff zookeeper interrupted!", e);
1751      }
1752      this.done = true;
1753    }
1754
1755    private ZookeeperStdOutSink getSink() {
1756      if (!(sink instanceof ZookeeperStdOutSink)) {
1757        throw new RuntimeException("Can only write to zookeeper sink");
1758      }
1759      return ((ZookeeperStdOutSink) sink);
1760    }
1761  }
1762
1763
1764  /**
1765   * A monitor for regionserver mode
1766   */
1767  private static class RegionServerMonitor extends Monitor {
1768    private boolean allRegions;
1769
1770    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1771        Sink sink, ExecutorService executor, boolean allRegions,
1772        boolean treatFailureAsError, long allowedFailures) {
1773      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1774          allowedFailures);
1775      this.allRegions = allRegions;
1776    }
1777
1778    private RegionServerStdOutSink getSink() {
1779      if (!(sink instanceof RegionServerStdOutSink)) {
1780        throw new RuntimeException("Can only write to regionserver sink");
1781      }
1782      return ((RegionServerStdOutSink) sink);
1783    }
1784
1785    @Override
1786    public void run() {
1787      if (this.initAdmin() && this.checkNoTableNames()) {
1788        RegionServerStdOutSink regionServerSink = null;
1789        try {
1790          regionServerSink = this.getSink();
1791        } catch (RuntimeException e) {
1792          LOG.error("Run RegionServerMonitor failed!", e);
1793          this.errorCode = ERROR_EXIT_CODE;
1794        }
1795        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1796        this.initialized = true;
1797        this.monitorRegionServers(rsAndRMap, regionServerSink);
1798      }
1799      this.done = true;
1800    }
1801
1802    private boolean checkNoTableNames() {
1803      List<String> foundTableNames = new ArrayList<>();
1804      TableName[] tableNames = null;
1805      LOG.debug("Reading list of tables");
1806      try {
1807        tableNames = this.admin.listTableNames();
1808      } catch (IOException e) {
1809        LOG.error("Get listTableNames failed", e);
1810        this.errorCode = INIT_ERROR_EXIT_CODE;
1811        return false;
1812      }
1813
1814      if (this.targets == null || this.targets.length == 0) {
1815        return true;
1816      }
1817
1818      for (String target : this.targets) {
1819        for (TableName tableName : tableNames) {
1820          if (target.equals(tableName.getNameAsString())) {
1821            foundTableNames.add(target);
1822          }
1823        }
1824      }
1825
1826      if (foundTableNames.size() > 0) {
1827        System.err.println("Cannot pass a tablename when using the -regionserver " +
1828            "option, tablenames:" + foundTableNames.toString());
1829        this.errorCode = USAGE_EXIT_CODE;
1830      }
1831      return foundTableNames.isEmpty();
1832    }
1833
1834    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
1835        RegionServerStdOutSink regionServerSink) {
1836      List<RegionServerTask> tasks = new ArrayList<>();
1837      Map<String, AtomicLong> successMap = new HashMap<>();
1838      Random rand = new Random();
1839      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1840        String serverName = entry.getKey();
1841        AtomicLong successes = new AtomicLong(0);
1842        successMap.put(serverName, successes);
1843        if (entry.getValue().isEmpty()) {
1844          LOG.error("Regionserver not serving any regions - {}", serverName);
1845        } else if (this.allRegions) {
1846          for (RegionInfo region : entry.getValue()) {
1847            tasks.add(new RegionServerTask(this.connection,
1848                serverName,
1849                region,
1850                regionServerSink,
1851                successes));
1852          }
1853        } else {
1854          // random select a region if flag not set
1855          RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1856          tasks.add(new RegionServerTask(this.connection,
1857              serverName,
1858              region,
1859              regionServerSink,
1860              successes));
1861        }
1862      }
1863      try {
1864        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1865          try {
1866            future.get();
1867          } catch (ExecutionException e) {
1868            LOG.error("Sniff regionserver failed!", e);
1869            this.errorCode = ERROR_EXIT_CODE;
1870          }
1871        }
1872        if (this.allRegions) {
1873          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1874            String serverName = entry.getKey();
1875            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1876                successMap.get(serverName), entry.getValue().size(), serverName);
1877          }
1878        }
1879      } catch (InterruptedException e) {
1880        this.errorCode = ERROR_EXIT_CODE;
1881        LOG.error("Sniff regionserver interrupted!", e);
1882      }
1883    }
1884
1885    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1886      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1887      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1888      return regionServerAndRegionsMap;
1889    }
1890
1891    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1892      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1893      try {
1894        LOG.debug("Reading list of tables and locations");
1895        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1896        List<RegionInfo> regions = null;
1897        for (TableDescriptor tableDesc: tableDescs) {
1898          try (RegionLocator regionLocator =
1899                   this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1900            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1901              if (location == null) {
1902                LOG.warn("Null location");
1903                continue;
1904              }
1905              ServerName rs = location.getServerName();
1906              String rsName = rs.getHostname();
1907              RegionInfo r = location.getRegion();
1908              if (rsAndRMap.containsKey(rsName)) {
1909                regions = rsAndRMap.get(rsName);
1910              } else {
1911                regions = new ArrayList<>();
1912                rsAndRMap.put(rsName, regions);
1913              }
1914              regions.add(r);
1915            }
1916          }
1917        }
1918
1919        // get any live regionservers not serving any regions
1920        for (ServerName rs: this.admin.getRegionServers()) {
1921          String rsName = rs.getHostname();
1922          if (!rsAndRMap.containsKey(rsName)) {
1923            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1924          }
1925        }
1926      } catch (IOException e) {
1927        LOG.error("Get HTables info failed", e);
1928        this.errorCode = INIT_ERROR_EXIT_CODE;
1929      }
1930      return rsAndRMap;
1931    }
1932
1933    private Map<String, List<RegionInfo>> doFilterRegionServerByName(
1934        Map<String, List<RegionInfo>> fullRsAndRMap) {
1935
1936      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1937
1938      if (this.targets != null && this.targets.length > 0) {
1939        filteredRsAndRMap = new HashMap<>();
1940        Pattern pattern = null;
1941        Matcher matcher = null;
1942        boolean regExpFound = false;
1943        for (String rsName : this.targets) {
1944          if (this.useRegExp) {
1945            regExpFound = false;
1946            pattern = Pattern.compile(rsName);
1947            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1948              matcher = pattern.matcher(entry.getKey());
1949              if (matcher.matches()) {
1950                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1951                regExpFound = true;
1952              }
1953            }
1954            if (!regExpFound) {
1955              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
1956            }
1957          } else {
1958            if (fullRsAndRMap.containsKey(rsName)) {
1959              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1960            } else {
1961              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
1962            }
1963          }
1964        }
1965      } else {
1966        filteredRsAndRMap = fullRsAndRMap;
1967      }
1968      return filteredRsAndRMap;
1969    }
1970  }
1971
1972  public static void main(String[] args) throws Exception {
1973    final Configuration conf = HBaseConfiguration.create();
1974
1975    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1976    LOG.info("Execution thread count={}", numThreads);
1977
1978    int exitCode;
1979    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1980    try {
1981      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
1982    } finally {
1983      executor.shutdown();
1984    }
1985    System.exit(exitCode);
1986  }
1987}