001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.net.BindException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.Callable;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.concurrent.atomic.LongAdder;
049import java.util.regex.Matcher;
050import java.util.regex.Pattern;
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.ResultScanner;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.http.InfoServer;
085import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.hadoop.hbase.util.ReflectionUtils;
090import org.apache.hadoop.hbase.util.RegionSplitter;
091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
092import org.apache.hadoop.hbase.zookeeper.ZKConfig;
093import org.apache.hadoop.util.Tool;
094import org.apache.hadoop.util.ToolRunner;
095import org.apache.yetus.audience.InterfaceAudience;
096import org.apache.zookeeper.KeeperException;
097import org.apache.zookeeper.ZooKeeper;
098import org.apache.zookeeper.client.ConnectStringParser;
099import org.apache.zookeeper.data.Stat;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
107 * <ol>
108 * <li>region mode (Default): For each region, try to get one row per column family outputting
109 * information on failure (ERROR) or else the latency.</li>
110 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly
111 * outputting information on failure (ERROR) or else the latency.</li>
 * <li>zookeeper mode: For each zookeeper instance, read a znode, outputting information on
 * failure (ERROR) or else the latency.</li>
114 * </ol>
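 * <p>
 * A minimal, illustrative sketch of programmatic use; the arguments are the same ones accepted on
 * the command line (see the usage text printed by this tool), and {@code table1}/{@code table2}
 * are placeholder table names:
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // region mode over two tables:
 * int ret = ToolRunner.run(conf, new CanaryTool(), new String[] { "table1", "table2" });
 * // regionserver mode:
 * ret = ToolRunner.run(conf, new CanaryTool(), new String[] { "-regionserver" });
 * // zookeeper mode:
 * ret = ToolRunner.run(conf, new CanaryTool(), new String[] { "-zookeeper" });
 * }</pre>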
115 */
116@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
117public class CanaryTool implements Tool, Canary {
118  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
119  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";
120
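  // The Canary web UI is only started in region mode and only when running as a daemon (see
  // run()); it is disabled by default (port -1). An illustrative way to enable it, where the
  // port value is an arbitrary example rather than a default:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HBASE_CANARY_INFO_PORT, 16050);
  //   conf.set(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0"); // same as the built-in default
  //
  // (equivalently, pass -Dhbase.canary.info.port=16050 on the command line)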
121  private void putUpWebUI() throws IOException {
122    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
123    // -1 is for disabling info server
124    if (port < 0) {
125      return;
126    }
127    if (zookeeperMode) {
128      LOG.info("WebUI is not supported in Zookeeper mode");
129    } else if (regionServerMode) {
130      LOG.info("WebUI is not supported in RegionServer mode");
131    } else {
132      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
133      try {
134        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
135        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
136        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
137        infoServer.start();
        LOG.info("Bound Canary HTTP info server to {}:{}", addr, port);
139      } catch (BindException e) {
140        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
141      }
142    }
143  }
144
145  @Override
146  public int checkRegions(String[] targets) throws Exception {
147    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
148    try {
149      if (configuredReadTableTimeoutsStr != null) {
150        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
151      }
152    } catch (IllegalArgumentException e) {
      LOG.error("Constructing read table timeouts map failed", e);
154      return USAGE_EXIT_CODE;
155    }
156    return runMonitor(targets);
157  }
158
159  @Override
160  public int checkRegionServers(String[] targets) throws Exception {
161    regionServerMode = true;
162    return runMonitor(targets);
163  }
164
165  @Override
166  public int checkZooKeeper() throws Exception {
167    zookeeperMode = true;
168    return runMonitor(null);
169  }
170
171  /**
172   * Sink interface used by the canary to output information
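   * <p>
   * A custom sink implementation can be plugged in through configuration (an illustrative
   * sketch; {@code MyCustomSink} is a placeholder class name, and the default is the
   * mode-specific {@link StdOutSink} subclass):
   *
   * <pre>{@code
   * conf.setClass("hbase.canary.sink.class", MyCustomSink.class, Sink.class);
   * }</pre>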
173   */
174  public interface Sink {
175    long getReadFailureCount();
176
177    long incReadFailureCount();
178
179    Map<String, String> getReadFailures();
180
181    void updateReadFailures(String regionName, String serverName);
182
183    long getWriteFailureCount();
184
185    long incWriteFailureCount();
186
187    Map<String, String> getWriteFailures();
188
189    void updateWriteFailures(String regionName, String serverName);
190
191    long getReadSuccessCount();
192
193    long incReadSuccessCount();
194
195    long getWriteSuccessCount();
196
197    long incWriteSuccessCount();
198  }
199
200  /**
   * Simple canary sink implementation that tracks read/write success and failure counts;
   * subclasses report results to a file or standard output via the logger.
202   */
203  public static class StdOutSink implements Sink {
204    private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0),
205        readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
206    private Map<String, String> readFailures = new ConcurrentHashMap<>();
207    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
208
209    @Override
210    public long getReadFailureCount() {
211      return readFailureCount.get();
212    }
213
214    @Override
215    public long incReadFailureCount() {
216      return readFailureCount.incrementAndGet();
217    }
218
219    @Override
220    public Map<String, String> getReadFailures() {
221      return readFailures;
222    }
223
224    @Override
225    public void updateReadFailures(String regionName, String serverName) {
226      readFailures.put(regionName, serverName);
227    }
228
229    @Override
230    public long getWriteFailureCount() {
231      return writeFailureCount.get();
232    }
233
234    @Override
235    public long incWriteFailureCount() {
236      return writeFailureCount.incrementAndGet();
237    }
238
239    @Override
240    public Map<String, String> getWriteFailures() {
241      return writeFailures;
242    }
243
244    @Override
245    public void updateWriteFailures(String regionName, String serverName) {
246      writeFailures.put(regionName, serverName);
247    }
248
249    @Override
250    public long getReadSuccessCount() {
251      return readSuccessCount.get();
252    }
253
254    @Override
255    public long incReadSuccessCount() {
256      return readSuccessCount.incrementAndGet();
257    }
258
259    @Override
260    public long getWriteSuccessCount() {
261      return writeSuccessCount.get();
262    }
263
264    @Override
265    public long incWriteSuccessCount() {
266      return writeSuccessCount.incrementAndGet();
267    }
268  }
269
270  /**
271   * By RegionServer, for 'regionserver' mode.
272   */
273  public static class RegionServerStdOutSink extends StdOutSink {
274    public void publishReadFailure(String table, String server) {
275      incReadFailureCount();
      LOG.error("Read from {} on {} failed", table, server);
277    }
278
279    public void publishReadTiming(String table, String server, long msTime) {
280      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
281    }
282  }
283
284  /**
285   * Output for 'zookeeper' mode.
286   */
287  public static class ZookeeperStdOutSink extends StdOutSink {
288    public void publishReadFailure(String znode, String server) {
289      incReadFailureCount();
      LOG.error("Read from {} on {} failed", znode, host);
291    }
292
293    public void publishReadTiming(String znode, String server, long msTime) {
294      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
295    }
296  }
297
298  /**
299   * By Region, for 'region' mode.
300   */
301  public static class RegionStdOutSink extends StdOutSink {
302    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
303    private LongAdder writeLatency = new LongAdder();
304    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
305      new ConcurrentHashMap<>();
306    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>();
307    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();
308
309    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
310      return perServerFailuresCount;
311    }
312
313    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
314      return perTableFailuresCount;
315    }
316
317    public void resetFailuresCountDetails() {
318      perServerFailuresCount.clear();
319      perTableFailuresCount.clear();
320    }
321
322    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
323      perServerFailuresCount.compute(serverName, (server, count) -> {
324        if (count == null) {
325          count = new LongAdder();
326        }
327        count.increment();
328        return count;
329      });
330      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
331        if (count == null) {
332          count = new LongAdder();
333        }
334        count.increment();
335        return count;
336      });
337    }
338
339    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
340      incReadFailureCount();
341      incFailuresCountDetails(serverName, region);
342      LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName,
343        e);
344    }
345
346    public void publishReadFailure(ServerName serverName, RegionInfo region,
347      ColumnFamilyDescriptor column, Exception e) {
348      incReadFailureCount();
349      incFailuresCountDetails(serverName, region);
350      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
351        region.getRegionNameAsString(), serverName, column.getNameAsString(), e);
352    }
353
354    public void publishReadTiming(ServerName serverName, RegionInfo region,
355      ColumnFamilyDescriptor column, long msTime) {
356      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
357      rtr.setReadSuccess();
358      rtr.setReadLatency(msTime);
359      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
360      rtrs.add(rtr);
361      // Note that read success count will be equal to total column family read successes.
362      incReadSuccessCount();
363      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
364        column.getNameAsString(), msTime);
365    }
366
367    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
368      incWriteFailureCount();
369      incFailuresCountDetails(serverName, region);
370      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
371    }
372
373    public void publishWriteFailure(ServerName serverName, RegionInfo region,
374      ColumnFamilyDescriptor column, Exception e) {
375      incWriteFailureCount();
376      incFailuresCountDetails(serverName, region);
377      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
378        column.getNameAsString(), e);
379    }
380
381    public void publishWriteTiming(ServerName serverName, RegionInfo region,
382      ColumnFamilyDescriptor column, long msTime) {
383      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
384      rtr.setWriteSuccess();
385      rtr.setWriteLatency(msTime);
386      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
387      rtrs.add(rtr);
388      // Note that write success count will be equal to total column family write successes.
389      incWriteSuccessCount();
390      LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
391        column.getNameAsString(), msTime);
392    }
393
394    public Map<String, LongAdder> getReadLatencyMap() {
395      return this.perTableReadLatency;
396    }
397
398    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
399      LongAdder initLatency = new LongAdder();
400      this.perTableReadLatency.put(tableName, initLatency);
401      return initLatency;
402    }
403
404    public void initializeWriteLatency() {
405      this.writeLatency.reset();
406    }
407
408    public LongAdder getWriteLatency() {
409      return this.writeLatency;
410    }
411
412    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
413      return this.regionMap;
414    }
415
416    public int getTotalExpectedRegions() {
417      return this.regionMap.size();
418    }
419  }
420
421  /**
422   * Run a single zookeeper Task and then exit.
423   */
424  static class ZookeeperTask implements Callable<Void> {
425    private final Connection connection;
426    private final String host;
427    private String znode;
428    private final int timeout;
429    private ZookeeperStdOutSink sink;
430
431    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
432      ZookeeperStdOutSink sink) {
433      this.connection = connection;
434      this.host = host;
435      this.znode = znode;
436      this.timeout = timeout;
437      this.sink = sink;
438    }
439
440    @Override
441    public Void call() throws Exception {
442      ZooKeeper zooKeeper = null;
443      try {
444        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
445        Stat exists = zooKeeper.exists(znode, false);
446        StopWatch stopwatch = new StopWatch();
447        stopwatch.start();
448        zooKeeper.getData(znode, false, exists);
449        stopwatch.stop();
450        sink.publishReadTiming(znode, host, stopwatch.getTime());
451      } catch (KeeperException | InterruptedException e) {
452        sink.publishReadFailure(znode, host);
453      } finally {
454        if (zooKeeper != null) {
455          zooKeeper.close();
456        }
457      }
458      return null;
459    }
460  }
461
462  /**
463   * Run a single Region Task and then exit. For each column family of the Region, get one row and
464   * output latency or failure.
465   */
466  static class RegionTask implements Callable<Void> {
467    public enum TaskType {
468      READ,
469      WRITE
470    }
471
472    private Connection connection;
473    private RegionInfo region;
474    private RegionStdOutSink sink;
475    private TaskType taskType;
476    private boolean rawScanEnabled;
477    private ServerName serverName;
478    private LongAdder readWriteLatency;
479    private boolean readAllCF;
480
481    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
482      RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
483      boolean readAllCF) {
484      this.connection = connection;
485      this.region = region;
486      this.serverName = serverName;
487      this.sink = sink;
488      this.taskType = taskType;
489      this.rawScanEnabled = rawScanEnabled;
490      this.readWriteLatency = rwLatency;
491      this.readAllCF = readAllCF;
492    }
493
494    @Override
495    public Void call() {
496      switch (taskType) {
497        case READ:
498          return read();
499        case WRITE:
500          return write();
501        default:
502          return read();
503      }
504    }
505
506    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
507      byte[] startKey = null;
508      Get get = null;
509      Scan scan = null;
510      ResultScanner rs = null;
511      StopWatch stopWatch = new StopWatch();
512      startKey = region.getStartKey();
513      // Can't do a get on empty start row so do a Scan of first element if any instead.
514      if (startKey.length > 0) {
515        get = new Get(startKey);
516        get.setCacheBlocks(false);
517        get.setFilter(new FirstKeyOnlyFilter());
518        get.addFamily(column.getName());
519      } else {
520        scan = new Scan();
521        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
522        scan.setRaw(rawScanEnabled);
523        scan.setCaching(1);
524        scan.setCacheBlocks(false);
525        scan.setFilter(new FirstKeyOnlyFilter());
526        scan.addFamily(column.getName());
527        scan.setMaxResultSize(1L);
528        scan.setOneRowLimit();
529      }
530      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
531        column.getNameAsString(), Bytes.toStringBinary(startKey));
532      try {
533        stopWatch.start();
534        if (startKey.length > 0) {
535          table.get(get);
536        } else {
537          rs = table.getScanner(scan);
538          rs.next();
539        }
540        stopWatch.stop();
541        this.readWriteLatency.add(stopWatch.getTime());
542        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
543      } catch (Exception e) {
544        sink.publishReadFailure(serverName, region, column, e);
545        sink.updateReadFailures(region.getRegionNameAsString(),
546          serverName == null ? "NULL" : serverName.getHostname());
547      } finally {
548        if (rs != null) {
549          rs.close();
550        }
551      }
552      return null;
553    }
554
555    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
556      int size = cfs.length;
557      return cfs[ThreadLocalRandom.current().nextInt(size)];
558
559    }
560
561    public Void read() {
562      Table table = null;
563      TableDescriptor tableDesc = null;
564      try {
565        LOG.debug("Reading table descriptor for table {}", region.getTable());
566        table = connection.getTable(region.getTable());
567        tableDesc = table.getDescriptor();
568      } catch (IOException e) {
        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), region.getTable(), e);
570        sink.publishReadFailure(serverName, region, e);
571        if (table != null) {
572          try {
573            table.close();
574          } catch (IOException ioe) {
            LOG.error("Close table failed", ioe);
576          }
577        }
578        return null;
579      }
580
581      if (readAllCF) {
582        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
583          readColumnFamily(table, column);
584        }
585      } else {
586        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
587      }
588      try {
589        table.close();
590      } catch (IOException e) {
591        LOG.error("Close table failed", e);
592      }
593      return null;
594    }
595
596    /**
597     * Check writes for the canary table
598     */
599    private Void write() {
600      Table table = null;
601      TableDescriptor tableDesc = null;
602      try {
603        table = connection.getTable(region.getTable());
604        tableDesc = table.getDescriptor();
605        byte[] rowToCheck = region.getStartKey();
606        if (rowToCheck.length == 0) {
607          rowToCheck = new byte[] { 0x0 };
608        }
609        int writeValueSize =
610          connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
611        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
612          Put put = new Put(rowToCheck);
613          byte[] value = new byte[writeValueSize];
614          Bytes.random(value);
615          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
616          LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(),
617            region.getRegionNameAsString(), column.getNameAsString(),
618            Bytes.toStringBinary(rowToCheck));
619          try {
620            long startTime = EnvironmentEdgeManager.currentTime();
621            table.put(put);
622            long time = EnvironmentEdgeManager.currentTime() - startTime;
623            this.readWriteLatency.add(time);
624            sink.publishWriteTiming(serverName, region, column, time);
625          } catch (Exception e) {
626            sink.publishWriteFailure(serverName, region, column, e);
627          }
628        }
629        table.close();
630      } catch (IOException e) {
631        sink.publishWriteFailure(serverName, region, e);
632        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
633      }
634      return null;
635    }
636  }
637
638  /**
639   * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and
640   * output latency or the failure.
641   */
642  static class RegionServerTask implements Callable<Void> {
643    private Connection connection;
644    private String serverName;
645    private RegionInfo region;
646    private RegionServerStdOutSink sink;
647    private AtomicLong successes;
648
649    RegionServerTask(Connection connection, String serverName, RegionInfo region,
650      RegionServerStdOutSink sink, AtomicLong successes) {
651      this.connection = connection;
652      this.serverName = serverName;
653      this.region = region;
654      this.sink = sink;
655      this.successes = successes;
656    }
657
658    @Override
659    public Void call() {
660      TableName tableName = null;
661      Table table = null;
662      Get get = null;
663      byte[] startKey = null;
664      Scan scan = null;
665      StopWatch stopWatch = new StopWatch();
666      // monitor one region on every region server
667      stopWatch.reset();
668      try {
669        tableName = region.getTable();
670        table = connection.getTable(tableName);
671        startKey = region.getStartKey();
672        // Can't do a get on empty start row so do a Scan of first element if any instead.
673        LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(),
674          region.getRegionNameAsString(), Bytes.toStringBinary(startKey));
675        if (startKey.length > 0) {
676          get = new Get(startKey);
677          get.setCacheBlocks(false);
678          get.setFilter(new FirstKeyOnlyFilter());
679          stopWatch.start();
680          table.get(get);
681          stopWatch.stop();
682        } else {
683          scan = new Scan();
684          scan.setCacheBlocks(false);
685          scan.setFilter(new FirstKeyOnlyFilter());
686          scan.setCaching(1);
687          scan.setMaxResultSize(1L);
688          scan.setOneRowLimit();
689          stopWatch.start();
690          ResultScanner s = table.getScanner(scan);
691          s.next();
692          s.close();
693          stopWatch.stop();
694        }
695        successes.incrementAndGet();
696        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
697      } catch (TableNotFoundException tnfe) {
        LOG.error("Table may have been deleted", tnfe);
699        // This is ignored because it doesn't imply that the regionserver is dead
700      } catch (TableNotEnabledException tnee) {
701        // This is considered a success since we got a response.
702        successes.incrementAndGet();
703        LOG.debug("The targeted table was disabled.  Assuming success.");
704      } catch (DoNotRetryIOException dnrioe) {
705        sink.publishReadFailure(tableName.getNameAsString(), serverName);
706        LOG.error(dnrioe.toString(), dnrioe);
707      } catch (IOException e) {
708        sink.publishReadFailure(tableName.getNameAsString(), serverName);
709        LOG.error(e.toString(), e);
710      } finally {
711        if (table != null) {
712          try {
713            table.close();
          } catch (IOException e) {
            LOG.error("Close table failed", e);
716          }
717        }
718        scan = null;
719        get = null;
720        startKey = null;
721      }
722      return null;
723    }
724  }
725
726  private static final int USAGE_EXIT_CODE = 1;
727  private static final int INIT_ERROR_EXIT_CODE = 2;
728  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
729  private static final int ERROR_EXIT_CODE = 4;
730  private static final int FAILURE_EXIT_CODE = 5;
731
732  private static final long DEFAULT_INTERVAL = 60000;
733
734  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
735  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
736
737  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);
738
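  // With the system namespace prefix this resolves to the 'hbase:canary' table (see the
  // -writeTable usage text).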
739  public static final TableName DEFAULT_WRITE_TABLE_NAME =
740    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
741
742  private static final String CANARY_TABLE_FAMILY_NAME = "Test";
743
744  private Configuration conf = null;
745  private long interval = 0;
746  private Sink sink = null;
747
748  /**
749   * True if we are to run in 'regionServer' mode.
750   */
751  private boolean regionServerMode = false;
752
753  /**
   * True if we are to run in 'zookeeper' mode.
755   */
756  private boolean zookeeperMode = false;
757
758  /**
   * Map of table name to read timeout. The timeout is for reading all regions in the table; i.e.
   * the aggregated time to fetch every region must stay below this value, else we log an ERROR.
762   */
763  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
764
765  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
766    "hbase.canary.regionserver_all_regions";
767
768  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
769    "hbase.canary.region.write.sniffing";
770  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
771    "hbase.canary.region.write.table.timeout";
772  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
773    "hbase.canary.region.write.table.name";
774  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
775    "hbase.canary.region.read.table.timeout";
776
777  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
778    "hbase.canary.zookeeper.permitted.failures";
779
780  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
781  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
782  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";
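
  // The keys above are normally populated by parseArgs() from the corresponding command-line
  // flags (-allRegions, -writeSniffing, -writeTable, -writeTableTimeout, -readTableTimeouts,
  // -permittedZookeeperFailures, -e, -t, -f), but they can also be set directly on the
  // Configuration; an illustrative sketch:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
  //   conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, "hbase:canary"); // the default write table
  //   conf.setLong(HBASE_CANARY_TIMEOUT, 600000);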
783
784  private ExecutorService executor; // threads to retrieve data from regionservers
785
786  public CanaryTool() {
787    this(new ScheduledThreadPoolExecutor(1));
788  }
789
790  public CanaryTool(ExecutorService executor) {
791    this(executor, null);
792  }
793
794  @InterfaceAudience.Private
795  CanaryTool(ExecutorService executor, Sink sink) {
796    this.executor = executor;
797    this.sink = sink;
798  }
799
800  CanaryTool(Configuration conf, ExecutorService executor) {
801    this(conf, executor, null);
802  }
803
804  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
805    this(executor, sink);
806    setConf(conf);
807  }
808
809  @Override
810  public Configuration getConf() {
811    return conf;
812  }
813
814  @Override
815  public void setConf(Configuration conf) {
816    if (conf == null) {
817      conf = HBaseConfiguration.create();
818    }
819    this.conf = conf;
820  }
821
822  private int parseArgs(String[] args) {
823    int index = -1;
824    long permittedFailures = 0;
825    boolean regionServerAllRegions = false, writeSniffing = false;
826    String readTableTimeoutsStr = null;
827    // Process command line args
828    for (int i = 0; i < args.length; i++) {
829      String cmd = args[i];
830      if (cmd.startsWith("-")) {
831        if (index >= 0) {
832          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
833          System.err.println("Invalid command line options");
834          printUsageAndExit();
835        }
836        if (cmd.equals("-help") || cmd.equals("-h")) {
837          // user asked for help, print the help and quit.
838          printUsageAndExit();
839        } else if (cmd.equals("-daemon") && interval == 0) {
840          // user asked for daemon mode, set a default interval between checks
841          interval = DEFAULT_INTERVAL;
842        } else if (cmd.equals("-interval")) {
          // user has specified an interval between canary runs (-interval N)
844          i++;
845
846          if (i == args.length) {
847            System.err.println("-interval takes a numeric seconds value argument.");
848            printUsageAndExit();
849          }
850          try {
851            interval = Long.parseLong(args[i]) * 1000;
852          } catch (NumberFormatException e) {
853            System.err.println("-interval needs a numeric value argument.");
854            printUsageAndExit();
855          }
856        } else if (cmd.equals("-zookeeper")) {
857          this.zookeeperMode = true;
858        } else if (cmd.equals("-regionserver")) {
859          this.regionServerMode = true;
860        } else if (cmd.equals("-allRegions")) {
861          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
862          regionServerAllRegions = true;
863        } else if (cmd.equals("-writeSniffing")) {
864          writeSniffing = true;
865          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
866        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
867          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
868        } else if (cmd.equals("-e")) {
869          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
870        } else if (cmd.equals("-t")) {
871          i++;
872
873          if (i == args.length) {
874            System.err.println("-t takes a numeric milliseconds value argument.");
875            printUsageAndExit();
876          }
877          long timeout = 0;
878          try {
879            timeout = Long.parseLong(args[i]);
880          } catch (NumberFormatException e) {
881            System.err.println("-t takes a numeric milliseconds value argument.");
882            printUsageAndExit();
883          }
884          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
885        } else if (cmd.equals("-writeTableTimeout")) {
886          i++;
887
888          if (i == args.length) {
889            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
890            printUsageAndExit();
891          }
892          long configuredWriteTableTimeout = 0;
893          try {
894            configuredWriteTableTimeout = Long.parseLong(args[i]);
895          } catch (NumberFormatException e) {
896            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
897            printUsageAndExit();
898          }
899          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
900        } else if (cmd.equals("-writeTable")) {
901          i++;
902
903          if (i == args.length) {
904            System.err.println("-writeTable takes a string tablename value argument.");
905            printUsageAndExit();
906          }
907          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
908        } else if (cmd.equals("-f")) {
909          i++;
910          if (i == args.length) {
911            System.err.println("-f needs a boolean value argument (true|false).");
912            printUsageAndExit();
913          }
914
915          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
916        } else if (cmd.equals("-readTableTimeouts")) {
917          i++;
918          if (i == args.length) {
919            System.err.println("-readTableTimeouts needs a comma-separated list of read "
920              + "millisecond timeouts per table (without spaces).");
921            printUsageAndExit();
922          }
923          readTableTimeoutsStr = args[i];
924          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
925        } else if (cmd.equals("-permittedZookeeperFailures")) {
926          i++;
927
928          if (i == args.length) {
929            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
930            printUsageAndExit();
931          }
932          try {
933            permittedFailures = Long.parseLong(args[i]);
934          } catch (NumberFormatException e) {
935            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
936            printUsageAndExit();
937          }
938          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
939        } else {
940          // no options match
          System.err.println(cmd + " option is invalid.");
942          printUsageAndExit();
943        }
944      } else if (index < 0) {
945        // keep track of first table name specified by the user
946        index = i;
947      }
948    }
949    if (regionServerAllRegions && !this.regionServerMode) {
950      System.err.println("-allRegions can only be specified in regionserver mode.");
951      printUsageAndExit();
952    }
953    if (this.zookeeperMode) {
954      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
955        System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes.");
956        printUsageAndExit();
957      }
958    }
959    if (permittedFailures != 0 && !this.zookeeperMode) {
960      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
961      printUsageAndExit();
962    }
963    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
964      System.err.println("-readTableTimeouts can only be configured in region mode.");
965      printUsageAndExit();
966    }
967    return index;
968  }
969
970  @Override
971  public int run(String[] args) throws Exception {
972    int index = parseArgs(args);
973    String[] monitorTargets = null;
974
975    if (index >= 0) {
976      int length = args.length - index;
977      monitorTargets = new String[length];
978      System.arraycopy(args, index, monitorTargets, 0, length);
979    }
980    if (interval > 0) {
981      // Only show the web page in daemon mode
982      putUpWebUI();
983    }
984    if (zookeeperMode) {
985      return checkZooKeeper();
986    } else if (regionServerMode) {
987      return checkRegionServers(monitorTargets);
988    } else {
989      return checkRegions(monitorTargets);
990    }
991  }
992
993  private int runMonitor(String[] monitorTargets) throws Exception {
994    ChoreService choreService = null;
995
996    // Launches chore for refreshing kerberos credentials if security is enabled.
997    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
998    // for more details.
999    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1000    if (authChore != null) {
1001      choreService = new ChoreService("CANARY_TOOL");
1002      choreService.scheduleChore(authChore);
1003    }
1004
    // Prepare what is needed for the monitor
1006    Monitor monitor = null;
1007    Thread monitorThread;
1008    long startTime = 0;
1009    long currentTimeLength = 0;
1010    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1011    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
    // Get a connection to use below.
1013    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
1014      do {
1015        // Do monitor !!
1016        try {
1017          monitor = this.newMonitor(connection, monitorTargets);
1018          startTime = EnvironmentEdgeManager.currentTime();
1019          monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime);
1020          monitorThread.start();
1021          while (!monitor.isDone()) {
1022            // wait for 1 sec
1023            Thread.sleep(1000);
1024            // exit if any error occurs
1025            if (failOnError && monitor.hasError()) {
1026              monitorThread.interrupt();
1027              if (monitor.initialized) {
1028                return monitor.errorCode;
1029              } else {
1030                return INIT_ERROR_EXIT_CODE;
1031              }
1032            }
1033            currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime;
1034            if (currentTimeLength > timeout) {
              LOG.error("The monitor has been running for " + currentTimeLength
                + "ms, exceeding the timeout limit of " + timeout + "ms; killing it!");
1037              if (monitor.initialized) {
1038                return TIMEOUT_ERROR_EXIT_CODE;
1039              } else {
1040                return INIT_ERROR_EXIT_CODE;
1041              }
1042            }
1043          }
1044
1045          if (failOnError && monitor.finalCheckForErrors()) {
1046            monitorThread.interrupt();
1047            return monitor.errorCode;
1048          }
1049        } finally {
1050          if (monitor != null) {
1051            monitor.close();
1052          }
1053        }
1054
1055        Thread.sleep(interval);
1056      } while (interval > 0);
1057    } // try-with-resources close
1058
1059    if (choreService != null) {
1060      choreService.shutdown();
1061    }
1062    return monitor.errorCode;
1063  }
1064
1065  @Override
1066  public Map<String, String> getReadFailures() {
1067    return sink.getReadFailures();
1068  }
1069
1070  @Override
1071  public Map<String, String> getWriteFailures() {
1072    return sink.getWriteFailures();
1073  }
1074
1075  private void printUsageAndExit() {
1076    System.err.println(
      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2>]...] | [<REGIONSERVER1> [<REGIONSERVER2>]...]");
1078    System.err.println("Where [OPTIONS] are:");
1079    System.err.println(" -h,-help        show this help and exit.");
1080    System.err.println(
1081      " -regionserver   set 'regionserver mode'; gets row from random region on " + "server");
1082    System.err.println(
1083      " -allRegions     get from ALL regions when 'regionserver mode', not just " + "random one.");
1084    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on "
1085      + "each ensemble member");
1086    System.err.println(" -daemon         continuous check at defined intervals.");
1087    System.err.println(" -interval <N>   interval between checks in seconds");
1088    System.err
1089      .println(" -e              consider table/regionserver argument as regular " + "expression");
1090    System.err.println(" -f <B>          exit on first error; default=true");
1091    System.err.println(" -failureAsError treat read/write failure as error");
1092    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1093    System.err.println(" -writeSniffing  enable write sniffing");
1094    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1095    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1096    System.err.println(
1097      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
1098    System.err
1099      .println("                comma-separated list of table read timeouts " + "(no spaces);");
    System.err.println("                logs 'ERROR' if it takes longer; default=600000ms");
1101    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1102    System.err.println("                connect to individual zookeeper nodes in ensemble");
1103    System.err.println("");
1104    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1105    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
1106      + "raw scan; default=false");
1107    System.err.println(
1108      " -Dhbase.canary.info.port=PORT_NUMBER  Set for a Canary UI; " + "default=-1 (None)");
1109    System.err.println("");
1110    System.err.println(
1111      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
1112    System.err.println("To sniff/probe all regions, pass no arguments.");
1113    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1114    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1115    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1116    System.exit(USAGE_EXIT_CODE);
1117  }
1118
1119  Sink getSink(Configuration configuration, Class clazz) {
    // In a test context, this.sink might already be set; use it if non-null.
1121    return this.sink != null
1122      ? this.sink
1123      : (Sink) ReflectionUtils
1124        .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
1125  }
1126
1127  /**
1128   * Canary region mode-specific data structure which stores information about each region to be
1129   * scanned
1130   */
1131  public static class RegionTaskResult {
1132    private RegionInfo region;
1133    private TableName tableName;
1134    private ServerName serverName;
1135    private ColumnFamilyDescriptor column;
1136    private AtomicLong readLatency = null;
1137    private AtomicLong writeLatency = null;
1138    private boolean readSuccess = false;
1139    private boolean writeSuccess = false;
1140
1141    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1142      ColumnFamilyDescriptor column) {
1143      this.region = region;
1144      this.tableName = tableName;
1145      this.serverName = serverName;
1146      this.column = column;
1147    }
1148
1149    public RegionInfo getRegionInfo() {
1150      return this.region;
1151    }
1152
1153    public String getRegionNameAsString() {
1154      return this.region.getRegionNameAsString();
1155    }
1156
1157    public TableName getTableName() {
1158      return this.tableName;
1159    }
1160
1161    public String getTableNameAsString() {
1162      return this.tableName.getNameAsString();
1163    }
1164
1165    public ServerName getServerName() {
1166      return this.serverName;
1167    }
1168
1169    public String getServerNameAsString() {
1170      return this.serverName.getServerName();
1171    }
1172
1173    public ColumnFamilyDescriptor getColumnFamily() {
1174      return this.column;
1175    }
1176
1177    public String getColumnFamilyNameAsString() {
1178      return this.column.getNameAsString();
1179    }
1180
1181    public long getReadLatency() {
1182      if (this.readLatency == null) {
1183        return -1;
1184      }
1185      return this.readLatency.get();
1186    }
1187
1188    public void setReadLatency(long readLatency) {
1189      if (this.readLatency != null) {
1190        this.readLatency.set(readLatency);
1191      } else {
1192        this.readLatency = new AtomicLong(readLatency);
1193      }
1194    }
1195
1196    public long getWriteLatency() {
1197      if (this.writeLatency == null) {
1198        return -1;
1199      }
1200      return this.writeLatency.get();
1201    }
1202
1203    public void setWriteLatency(long writeLatency) {
1204      if (this.writeLatency != null) {
1205        this.writeLatency.set(writeLatency);
1206      } else {
1207        this.writeLatency = new AtomicLong(writeLatency);
1208      }
1209    }
1210
1211    public boolean isReadSuccess() {
1212      return this.readSuccess;
1213    }
1214
1215    public void setReadSuccess() {
1216      this.readSuccess = true;
1217    }
1218
1219    public boolean isWriteSuccess() {
1220      return this.writeSuccess;
1221    }
1222
1223    public void setWriteSuccess() {
1224      this.writeSuccess = true;
1225    }
1226  }
1227
1228  /**
1229   * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a
1230   * RegionMonitor.
1231   * @return a Monitor instance
1232   */
1233  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1234    Monitor monitor;
1235    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1236    boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1237    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1238    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1239    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1240    String writeTableName =
1241      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1242    long configuredWriteTableTimeout =
1243      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1244
1245    if (this.regionServerMode) {
1246      monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp,
1247        getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor,
1248        regionServerAllRegions, failOnError, permittedFailures);
1249
1250    } else if (this.zookeeperMode) {
1251      monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1252        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor,
1253        failOnError, permittedFailures);
1254    } else {
1255      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
1256        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
1257        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1258        configuredWriteTableTimeout, permittedFailures);
1259    }
1260    return monitor;
1261  }
1262
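  /**
   * Parses the -readTableTimeouts argument: a comma-separated list of
   * {@code <tableName>=<read timeout in milliseconds>} entries with no spaces, e.g.
   * {@code table1=60000,table2=30000}, into {@link #configuredReadTableTimeouts}.
   */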
1263  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1264    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1265    for (String tT : tableTimeouts) {
1266      String[] nameTimeout = tT.split("=");
1267      if (nameTimeout.length < 2) {
1268        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form "
1269          + "<tableName>=<read timeout> (without spaces).");
1270      }
1271      long timeoutVal;
1272      try {
1273        timeoutVal = Long.parseLong(nameTimeout[1]);
1274      } catch (NumberFormatException e) {
1275        throw new IllegalArgumentException(
1276          "-readTableTimeouts read timeout for each table" + " must be a numeric value argument.");
1277      }
1278      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1279    }
1280  }
1281
1282  /**
   * A Monitor super-class that can be extended by users.
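   * <p>
   * A minimal, illustrative sketch of a user-supplied monitor (the class below is hypothetical,
   * not part of this tool):
   *
   * <pre>{@code
   * class NoopMonitor extends CanaryTool.Monitor {
   *   NoopMonitor(Connection connection, String[] targets, boolean useRegExp, Sink sink,
   *       ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
   *     super(connection, targets, useRegExp, sink, executor, treatFailureAsError,
   *         allowedFailures);
   *   }
   *
   *   public void run() { // Override of Monitor#run()
   *     if (initAdmin()) {
   *       // probe the cluster here, updating 'sink' and 'errorCode' as needed
   *       done = true;
   *     }
   *   }
   * }
   * }</pre>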
1284   */
1285  public static abstract class Monitor implements Runnable, Closeable {
1286    protected Connection connection;
1287    protected Admin admin;
1288    /**
1289     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the
1290     * command-line as arguments.
1291     */
1292    protected String[] targets;
1293    protected boolean useRegExp;
1294    protected boolean treatFailureAsError;
1295    protected boolean initialized = false;
1296
1297    protected boolean done = false;
1298    protected int errorCode = 0;
1299    protected long allowedFailures = 0;
1300    protected Sink sink;
1301    protected ExecutorService executor;
1302
1303    public boolean isDone() {
1304      return done;
1305    }
1306
1307    public boolean hasError() {
1308      return errorCode != 0;
1309    }
1310
1311    public boolean finalCheckForErrors() {
1312      if (errorCode != 0) {
1313        return true;
1314      }
1315      if (
1316        treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
1317          || sink.getWriteFailureCount() > allowedFailures)
1318      ) {
1319        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1320        errorCode = FAILURE_EXIT_CODE;
1321        return true;
1322      }
1323      return false;
1324    }
1325
1326    @Override
1327    public void close() throws IOException {
1328      if (this.admin != null) {
1329        this.admin.close();
1330      }
1331    }
1332
1333    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1334      ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1335      if (null == connection) {
1336        throw new IllegalArgumentException("connection shall not be null");
1337      }
1338
1339      this.connection = connection;
1340      this.targets = monitorTargets;
1341      this.useRegExp = useRegExp;
1342      this.treatFailureAsError = treatFailureAsError;
1343      this.sink = sink;
1344      this.executor = executor;
1345      this.allowedFailures = allowedFailures;
1346    }
1347
1348    @Override
1349    public abstract void run();
1350
1351    protected boolean initAdmin() {
1352      if (null == this.admin) {
1353        try {
1354          this.admin = this.connection.getAdmin();
1355        } catch (Exception e) {
          LOG.error("Initializing HBase Admin failed...", e);
1357          this.errorCode = INIT_ERROR_EXIT_CODE;
1358        }
1359      } else if (admin.isAborted()) {
1360        LOG.error("HBaseAdmin aborted");
1361        this.errorCode = INIT_ERROR_EXIT_CODE;
1362      }
1363      return !this.hasError();
1364    }
1365  }
1366
1367  /**
1368   * A monitor for region mode.
1369   */
1370  private static class RegionMonitor extends Monitor {
1371    // 10 minutes
1372    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // 1 day
1374    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1375
1376    private long lastCheckTime = -1;
1377    private boolean writeSniffing;
1378    private TableName writeTableName;
1379    private int writeDataTTL;
1380    private float regionsLowerLimit;
1381    private float regionsUpperLimit;
1382    private int checkPeriod;
1383    private boolean rawScanEnabled;
1384    private boolean readAllCF;
1385
1386    /**
     * Per-table read timeout. If the aggregated time to read every region of the table takes
     * longer than what is configured here, we log an ERROR rather than just an INFO.
1389     */
1390    private HashMap<String, Long> configuredReadTableTimeouts;
1391
1392    private long configuredWriteTableTimeout;
1393
1394    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1395      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1396      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1397      long configuredWriteTableTimeout, long allowedFailures) {
1398      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1399        allowedFailures);
1400      Configuration conf = connection.getConfiguration();
1401      this.writeSniffing = writeSniffing;
1402      this.writeTableName = writeTableName;
1403      this.writeDataTTL =
1404        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1405      this.regionsLowerLimit =
1406        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1407      this.regionsUpperLimit =
1408        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1409      this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1410        DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1411      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1412      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1413      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1414      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
1415    }
1416
1417    private RegionStdOutSink getSink() {
1418      if (!(sink instanceof RegionStdOutSink)) {
1419        throw new RuntimeException("Can only write to Region sink");
1420      }
1421      return ((RegionStdOutSink) sink);
1422    }
1423
1424    @Override
1425    public void run() {
1426      if (this.initAdmin()) {
1427        try {
1428          List<Future<Void>> taskFutures = new LinkedList<>();
1429          RegionStdOutSink regionSink = this.getSink();
1430          regionSink.resetFailuresCountDetails();
1431          if (this.targets != null && this.targets.length > 0) {
1432            String[] tables = generateMonitorTables(this.targets);
1433            // Check to see that each table name passed in the -readTableTimeouts argument is also
1434            // passed as a monitor target.
1435            if (
1436              !new HashSet<>(Arrays.asList(tables))
1437                .containsAll(this.configuredReadTableTimeouts.keySet())
1438            ) {
1439              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
1440                + "passed via command line.");
1441              this.errorCode = USAGE_EXIT_CODE;
1442              return;
1443            }
1444            this.initialized = true;
1445            for (String table : tables) {
1446              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1447              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1448                this.rawScanEnabled, readLatency, readAllCF));
1449            }
1450          } else {
1451            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1452          }
1453
1454          if (writeSniffing) {
1455            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1456              try {
1457                checkWriteTableDistribution();
1458              } catch (IOException e) {
                LOG.error("Checking canary write table distribution failed!", e);
1460              }
1461              lastCheckTime = EnvironmentEdgeManager.currentTime();
1462            }
1463            // sniff canary table with write operation
1464            regionSink.initializeWriteLatency();
1465            LongAdder writeTableLatency = regionSink.getWriteLatency();
1466            taskFutures
1467              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1468                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
1469          }
1470
1471          for (Future<Void> future : taskFutures) {
1472            try {
1473              future.get();
1474            } catch (ExecutionException e) {
1475              LOG.error("Sniff region failed!", e);
1476            }
1477          }
1478          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1479          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1480            String tableName = entry.getKey();
1481            if (actualReadTableLatency.containsKey(tableName)) {
1482              Long actual = actualReadTableLatency.get(tableName).longValue();
1483              Long configured = entry.getValue();
1484              if (actual > configured) {
                LOG.error("Read operation for {} took {}ms, exceeding the configured read timeout "
                  + "of {}ms.", tableName, actual, configured);
1487              } else {
                LOG.info("Read operation for {} took {}ms (configured read timeout {}ms).",
                  tableName, actual, configured);
1490              }
1491            } else {
1492              LOG.error("Read operation for {} failed!", tableName);
1493            }
1494          }
1495          if (this.writeSniffing) {
1496            String writeTableStringName = this.writeTableName.getNameAsString();
1497            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1498            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1499              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1500            // Check that the writeTable write operation latency does not exceed the configured
1501            // timeout.
1502            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1503              LOG.error("Write operation for {} exceeded the configured write timeout.",
1504                writeTableStringName);
1505            }
1506          }
1507        } catch (Exception e) {
1508          LOG.error("Run regionMonitor failed", e);
1509          this.errorCode = ERROR_EXIT_CODE;
1510        } finally {
1511          this.done = true;
1512        }
1513      }
1514      this.done = true;
1515    }
1516
    /**
     * @return The list of table names to check. When useRegExp is set, each monitor target is
     *         treated as a regular expression and matched against the names of all tables in the
     *         cluster; otherwise the monitor targets are returned as-is.
     */
1520    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1521      String[] returnTables = null;
1522
1523      if (this.useRegExp) {
1524        Pattern pattern = null;
1525        List<TableDescriptor> tds = null;
1526        Set<String> tmpTables = new TreeSet<>();
1527        try {
          LOG.debug("Reading list of tables");
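          // pattern is still null here, so this lists every table descriptor; the regular
          // expressions are applied client-side in the loop below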
1529          tds = this.admin.listTableDescriptors(pattern);
1530          if (tds == null) {
1531            tds = Collections.emptyList();
1532          }
1533          for (String monitorTarget : monitorTargets) {
1534            pattern = Pattern.compile(monitorTarget);
1535            for (TableDescriptor td : tds) {
1536              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1537                tmpTables.add(td.getTableName().getNameAsString());
1538              }
1539            }
1540          }
1541        } catch (IOException e) {
1542          LOG.error("Communicate with admin failed", e);
1543          throw e;
1544        }
1545
        if (!tmpTables.isEmpty()) {
          returnTables = tmpTables.toArray(new String[0]);
1548        } else {
          String msg = "No table found, tablePattern: " + Arrays.toString(monitorTargets);
1550          LOG.error(msg);
1551          this.errorCode = INIT_ERROR_EXIT_CODE;
1552          throw new TableNotFoundException(msg);
1553        }
1554      } else {
1555        returnTables = monitorTargets;
1556      }
1557
1558      return returnTables;
1559    }
1560
    /*
     * Canary entry point to monitor every enabled table in the cluster, except the canary write
     * table.
     */
1564    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1565      throws Exception {
1566      LOG.debug("Reading list of tables");
1567      List<Future<Void>> taskFutures = new LinkedList<>();
1568      for (TableDescriptor td : admin.listTableDescriptors()) {
1569        if (
1570          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
1571            && (!td.getTableName().equals(writeTableName))
1572        ) {
1573          LongAdder readLatency =
1574            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1575          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
1576            this.rawScanEnabled, readLatency, readAllCF));
1577        }
1578      }
1579      return taskFutures;
1580    }
1581
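    /**
     * Ensures the canary write table exists and is spread across the cluster: creates it if
     * missing, enables it if disabled, recreates it if its region count falls outside
     * [servers * lowerLimit, servers * upperLimit], and asks for a balance if some live region
     * servers host none of its regions.
     */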
1582    private void checkWriteTableDistribution() throws IOException {
1583      if (!admin.tableExists(writeTableName)) {
1584        int numberOfServers = admin.getRegionServers().size();
1585        if (numberOfServers == 0) {
1586          throw new IllegalStateException("No live regionservers");
1587        }
1588        createWriteTable(numberOfServers);
1589      }
1590
1591      if (!admin.isTableEnabled(writeTableName)) {
1592        admin.enableTable(writeTableName);
1593      }
1594
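      // If the master is also listed among the region servers, exclude it from the server count
      // used to size and validate the canary write table below.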
1595      ClusterMetrics status =
1596        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
1597      int numberOfServers = status.getServersName().size();
1598      if (status.getServersName().contains(status.getMasterName())) {
1599        numberOfServers -= 1;
1600      }
1601
1602      List<Pair<RegionInfo, ServerName>> pairs =
1603        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1604      int numberOfRegions = pairs.size();
1605      if (
1606        numberOfRegions < numberOfServers * regionsLowerLimit
1607          || numberOfRegions > numberOfServers * regionsUpperLimit
1608      ) {
1609        admin.disableTable(writeTableName);
1610        admin.deleteTable(writeTableName);
1611        createWriteTable(numberOfServers);
1612      }
1613      HashSet<ServerName> serverSet = new HashSet<>();
1614      for (Pair<RegionInfo, ServerName> pair : pairs) {
1615        serverSet.add(pair.getSecond());
1616      }
1617      int numberOfCoveredServers = serverSet.size();
1618      if (numberOfCoveredServers < numberOfServers) {
1619        admin.balance();
1620      }
1621    }
1622
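    /**
     * Creates the canary write table with a single column family (one version, TTL of
     * writeDataTTL seconds), pre-split into numberOfServers * regionsLowerLimit regions using
     * HexStringSplit.
     */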
1623    private void createWriteTable(int numberOfServers) throws IOException {
1624      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
1625      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
1626        + "(current lower limit of regions per server is {} and you can change it with config {}).",
1627        numberOfServers, numberOfRegions, regionsLowerLimit,
1628        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1629      ColumnFamilyDescriptor family =
1630        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
1631          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
1632      TableDescriptor desc =
1633        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
1634      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1635      admin.createTable(desc, splits);
1636    }
1637  }
1638
  /**
   * Canary entry point for a specified table. Returns an empty list of futures when the table is
   * not enabled.
   * @throws Exception if sniffing the table fails
   */
1643  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1644    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
1645    boolean readAllCF) throws Exception {
1646    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1647    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1648      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1649        executor, taskType, rawScanEnabled, readLatency, readAllCF);
1650    } else {
1651      LOG.warn("Table {} is not enabled", tableName);
1652    }
1653    return new LinkedList<>();
1654  }
1655
  /*
   * Loops over the regions of the table, submitting one read or write task per region, and
   * returns the resulting futures.
   */
1659  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1660    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
1661    LongAdder rwLatency, boolean readAllCF) throws Exception {
1662    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1663    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1664      List<RegionTask> tasks = new ArrayList<>();
1665      try (RegionLocator regionLocator =
1666        admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1667        for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1668          if (location == null) {
1669            LOG.warn("Null location");
1670            continue;
1671          }
1672          ServerName rs = location.getServerName();
1673          RegionInfo region = location.getRegion();
1674          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
1675            taskType, rawScanEnabled, rwLatency, readAllCF));
1676          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1677          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1678        }
1679        return executor.invokeAll(tasks);
1680      }
1681    } catch (TableNotFoundException e) {
      return Collections.emptyList();
1683    }
1684  }
1685
  /**
   * A monitor for ZooKeeper mode: checks that each member of the ZooKeeper ensemble can serve
   * requests for the configured root znode.
   */
1687  private static class ZookeeperMonitor extends Monitor {
1688    private List<String> hosts;
1689    private final String znode;
1690    private final int timeout;
1691
1692    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1693      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1694      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1695        allowedFailures);
1696      Configuration configuration = connection.getConfiguration();
1697      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1698      timeout =
1699        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1700      ConnectStringParser parser =
1701        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1702      hosts = Lists.newArrayList();
1703      for (InetSocketAddress server : parser.getServerAddresses()) {
1704        hosts.add(inetSocketAddress2String(server));
1705      }
1706      if (allowedFailures > (hosts.size() - 1) / 2) {
        LOG.warn(
          "Allowing {} ZooKeeper node failures in an ensemble of {} would already mean loss of "
            + "quorum; please confirm this setting.",
          allowedFailures, hosts.size());
1711      }
1712    }
1713
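    /**
     * Submits one ZookeeperTask per quorum member and waits for them all; any task failure or an
     * interruption sets the error exit code.
     */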
1714    @Override
1715    public void run() {
1716      List<ZookeeperTask> tasks = Lists.newArrayList();
1717      ZookeeperStdOutSink zkSink = null;
1718      try {
1719        zkSink = this.getSink();
1720      } catch (RuntimeException e) {
1721        LOG.error("Run ZooKeeperMonitor failed!", e);
1722        this.errorCode = ERROR_EXIT_CODE;
1723      }
1724      this.initialized = true;
1725      for (final String host : hosts) {
1726        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1727      }
1728      try {
1729        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1730          try {
1731            future.get();
1732          } catch (ExecutionException e) {
1733            LOG.error("Sniff zookeeper failed!", e);
1734            this.errorCode = ERROR_EXIT_CODE;
1735          }
1736        }
1737      } catch (InterruptedException e) {
1738        this.errorCode = ERROR_EXIT_CODE;
1739        Thread.currentThread().interrupt();
1740        LOG.error("Sniff zookeeper interrupted!", e);
1741      }
1742      this.done = true;
1743    }
1744
1745    private ZookeeperStdOutSink getSink() {
1746      if (!(sink instanceof ZookeeperStdOutSink)) {
1747        throw new RuntimeException("Can only write to zookeeper sink");
1748      }
1749      return ((ZookeeperStdOutSink) sink);
1750    }
1751  }
1752
1753  /**
1754   * A monitor for regionserver mode
1755   */
1756  private static class RegionServerMonitor extends Monitor {
1757    private boolean allRegions;
1758
1759    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1760      Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError,
1761      long allowedFailures) {
1762      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1763        allowedFailures);
1764      this.allRegions = allRegions;
1765    }
1766
1767    private RegionServerStdOutSink getSink() {
1768      if (!(sink instanceof RegionServerStdOutSink)) {
1769        throw new RuntimeException("Can only write to regionserver sink");
1770      }
1771      return ((RegionServerStdOutSink) sink);
1772    }
1773
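    /**
     * Resolves the region servers to check, optionally filtered by the command-line targets, and
     * then reads from one region per server, or from every region when allRegions is set.
     */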
1774    @Override
1775    public void run() {
1776      if (this.initAdmin() && this.checkNoTableNames()) {
1777        RegionServerStdOutSink regionServerSink = null;
1778        try {
1779          regionServerSink = this.getSink();
1780        } catch (RuntimeException e) {
1781          LOG.error("Run RegionServerMonitor failed!", e);
1782          this.errorCode = ERROR_EXIT_CODE;
1783        }
1784        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1785        this.initialized = true;
1786        this.monitorRegionServers(rsAndRMap, regionServerSink);
1787      }
1788      this.done = true;
1789    }
1790
1791    private boolean checkNoTableNames() {
1792      List<String> foundTableNames = new ArrayList<>();
1793      TableName[] tableNames = null;
1794      LOG.debug("Reading list of tables");
1795      try {
1796        tableNames = this.admin.listTableNames();
1797      } catch (IOException e) {
1798        LOG.error("Get listTableNames failed", e);
1799        this.errorCode = INIT_ERROR_EXIT_CODE;
1800        return false;
1801      }
1802
1803      if (this.targets == null || this.targets.length == 0) {
1804        return true;
1805      }
1806
1807      for (String target : this.targets) {
1808        for (TableName tableName : tableNames) {
1809          if (target.equals(tableName.getNameAsString())) {
1810            foundTableNames.add(target);
1811          }
1812        }
1813      }
1814
      if (!foundTableNames.isEmpty()) {
        System.err.println("Cannot pass a table name when using the -regionserver "
          + "option, table names: " + foundTableNames);
1818        this.errorCode = USAGE_EXIT_CODE;
1819      }
1820      return foundTableNames.isEmpty();
1821    }
1822
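    /**
     * Schedules a RegionServerTask per region when allRegions is set, or for a single randomly
     * chosen region per server otherwise, then waits for the tasks and, in allRegions mode, logs
     * per-server success counts.
     */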
1823    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
1824      RegionServerStdOutSink regionServerSink) {
1825      List<RegionServerTask> tasks = new ArrayList<>();
1826      Map<String, AtomicLong> successMap = new HashMap<>();
1827      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1828        String serverName = entry.getKey();
1829        AtomicLong successes = new AtomicLong(0);
1830        successMap.put(serverName, successes);
1831        if (entry.getValue().isEmpty()) {
1832          LOG.error("Regionserver not serving any regions - {}", serverName);
1833        } else if (this.allRegions) {
1834          for (RegionInfo region : entry.getValue()) {
1835            tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
1836              successes));
1837          }
1838        } else {
          // randomly select a single region when not checking all regions
1840          RegionInfo region =
1841            entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size()));
1842          tasks.add(
1843            new RegionServerTask(this.connection, serverName, region, regionServerSink, successes));
1844        }
1845      }
1846      try {
1847        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1848          try {
1849            future.get();
1850          } catch (ExecutionException e) {
1851            LOG.error("Sniff regionserver failed!", e);
1852            this.errorCode = ERROR_EXIT_CODE;
1853          }
1854        }
1855        if (this.allRegions) {
1856          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1857            String serverName = entry.getKey();
1858            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1859              successMap.get(serverName), entry.getValue().size(), serverName);
1860          }
1861        }
1862      } catch (InterruptedException e) {
1863        this.errorCode = ERROR_EXIT_CODE;
1864        LOG.error("Sniff regionserver interrupted!", e);
1865      }
1866    }
1867
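    /** Builds the full hostname-to-regions map and then filters it by the monitor targets. */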
1868    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1869      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1870      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1871      return regionServerAndRegionsMap;
1872    }
1873
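    /**
     * Builds a hostname-to-regions map by walking the region locations of every table, then adds
     * an empty entry for each live region server that is not hosting any region.
     */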
1874    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1875      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1876      try {
1877        LOG.debug("Reading list of tables and locations");
1878        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1879        List<RegionInfo> regions = null;
1880        for (TableDescriptor tableDesc : tableDescs) {
1881          try (RegionLocator regionLocator =
1882            this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1883            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1884              if (location == null) {
1885                LOG.warn("Null location");
1886                continue;
1887              }
1888              ServerName rs = location.getServerName();
1889              String rsName = rs.getHostname();
1890              RegionInfo r = location.getRegion();
1891              if (rsAndRMap.containsKey(rsName)) {
1892                regions = rsAndRMap.get(rsName);
1893              } else {
1894                regions = new ArrayList<>();
1895                rsAndRMap.put(rsName, regions);
1896              }
1897              regions.add(r);
1898            }
1899          }
1900        }
1901
1902        // get any live regionservers not serving any regions
1903        for (ServerName rs : this.admin.getRegionServers()) {
1904          String rsName = rs.getHostname();
1905          if (!rsAndRMap.containsKey(rsName)) {
1906            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1907          }
1908        }
1909      } catch (IOException e) {
1910        LOG.error("Get HTables info failed", e);
1911        this.errorCode = INIT_ERROR_EXIT_CODE;
1912      }
1913      return rsAndRMap;
1914    }
1915
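    /**
     * Filters the full map by the command-line targets: each target is treated as a regular
     * expression when useRegExp is set, or as an exact hostname otherwise.
     */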
1916    private Map<String, List<RegionInfo>>
1917      doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) {
1918
1919      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1920
1921      if (this.targets != null && this.targets.length > 0) {
1922        filteredRsAndRMap = new HashMap<>();
1923        Pattern pattern = null;
1924        Matcher matcher = null;
1925        boolean regExpFound = false;
1926        for (String rsName : this.targets) {
1927          if (this.useRegExp) {
1928            regExpFound = false;
1929            pattern = Pattern.compile(rsName);
1930            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1931              matcher = pattern.matcher(entry.getKey());
1932              if (matcher.matches()) {
1933                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1934                regExpFound = true;
1935              }
1936            }
1937            if (!regExpFound) {
              LOG.info("No region server matched regionServerPattern {}", rsName);
1939            }
1940          } else {
1941            if (fullRsAndRMap.containsKey(rsName)) {
1942              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1943            } else {
              LOG.info("No region server found with name {}", rsName);
1945            }
1946          }
1947        }
1948      } else {
1949        filteredRsAndRMap = fullRsAndRMap;
1950      }
1951      return filteredRsAndRMap;
1952    }
1953  }
1954
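  /**
   * Command-line entry point. A minimal invocation sketch; the -regionserver and
   * -readTableTimeouts flags appear elsewhere in this tool, while the launcher command and the
   * key=value timeout format shown here are assumptions:
   *
   * <pre>
   * # Region mode against two tables, with per-table read timeouts in milliseconds.
   * hbase org.apache.hadoop.hbase.tool.CanaryTool -readTableTimeouts t1=60000,t2=60000 t1 t2
   *
   * # Regionserver mode, reading one random region from every live region server.
   * hbase org.apache.hadoop.hbase.tool.CanaryTool -regionserver
   * </pre>
   */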
1955  public static void main(String[] args) throws Exception {
1956    final Configuration conf = HBaseConfiguration.create();
1957
1958    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1959    LOG.info("Execution thread count={}", numThreads);
1960
1961    int exitCode;
1962    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1963    try {
1964      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
1965    } finally {
1966      executor.shutdown();
1967    }
1968    System.exit(exitCode);
1969  }
1970}