001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.net.BindException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.Callable;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.concurrent.atomic.LongAdder;
049import java.util.regex.Matcher;
050import java.util.regex.Pattern;
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.ResultScanner;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.http.InfoServer;
085import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.hadoop.hbase.util.ReflectionUtils;
090import org.apache.hadoop.hbase.util.RegionSplitter;
091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
092import org.apache.hadoop.hbase.zookeeper.ZKConfig;
093import org.apache.hadoop.util.Tool;
094import org.apache.hadoop.util.ToolRunner;
095import org.apache.yetus.audience.InterfaceAudience;
096import org.apache.zookeeper.KeeperException;
097import org.apache.zookeeper.ZooKeeper;
098import org.apache.zookeeper.client.ConnectStringParser;
099import org.apache.zookeeper.data.Stat;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
107 * <ol>
108 * <li>region mode (Default): For each region, try to get one row per column family outputting
109 * information on failure (ERROR) or else the latency.</li>
110 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly
111 * outputting information on failure (ERROR) or else the latency.</li>
112 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
113 * failure (ERROR) or else the latency.</li>
114 * </ol>
115 */
116@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
117public class CanaryTool implements Tool, Canary {
  /**
   * Configuration key for the port of the Canary http info server. When the key is unset (the
   * lookup defaults to -1) or holds any negative value, the info server is not started.
   */
  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
  /**
   * Configuration key for the address the Canary http info server binds to. "0.0.0.0" is used
   * when the key is unset.
   */
  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";
120
121  private void putUpWebUI() throws IOException {
122    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
123    // -1 is for disabling info server
124    if (port < 0) {
125      return;
126    }
127    if (zookeeperMode) {
128      LOG.info("WebUI is not supported in Zookeeper mode");
129    } else if (regionServerMode) {
130      LOG.info("WebUI is not supported in RegionServer mode");
131    } else {
132      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
133      try {
134        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
135        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
136        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
137        infoServer.start();
138        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
139      } catch (BindException e) {
140        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
141      }
142    }
143  }
144
145  @Override
146  public int checkRegions(String[] targets) throws Exception {
147    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
148    try {
149      LOG.info("Canary tool is running in Region mode");
150      if (configuredReadTableTimeoutsStr != null) {
151        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
152      }
153    } catch (IllegalArgumentException e) {
154      LOG.error("Constructing read table timeouts map failed ", e);
155      return USAGE_EXIT_CODE;
156    }
157    return runMonitor(targets);
158  }
159
160  @Override
161  public int checkRegionServers(String[] targets) throws Exception {
162    regionServerMode = true;
163    LOG.info("Canary tool is running in RegionServer mode");
164    return runMonitor(targets);
165  }
166
167  @Override
168  public int checkZooKeeper() throws Exception {
169    zookeeperMode = true;
170    LOG.info("Canary tool is running in ZooKeeper mode");
171    return runMonitor(null);
172  }
173
174  /**
175   * Sink interface used by the canary to output information
176   */
177  public interface Sink {
178    long getReadFailureCount();
179
180    long incReadFailureCount();
181
182    Map<String, String> getReadFailures();
183
184    void updateReadFailures(String regionName, String serverName);
185
186    long getWriteFailureCount();
187
188    long incWriteFailureCount();
189
190    Map<String, String> getWriteFailures();
191
192    void updateWriteFailures(String regionName, String serverName);
193
194    long getReadSuccessCount();
195
196    long incReadSuccessCount();
197
198    long getWriteSuccessCount();
199
200    long incWriteSuccessCount();
201
202    void stop();
203
204    boolean isStopped();
205  }
206
207  /**
208   * Simple implementation of canary sink that allows plotting to a file or standard output.
209   */
210  public static class StdOutSink implements Sink {
211    private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0),
212        readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
213    private Map<String, String> readFailures = new ConcurrentHashMap<>();
214    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
215    private volatile boolean stopped = false;
216
217    @Override
218    public long getReadFailureCount() {
219      return readFailureCount.get();
220    }
221
222    @Override
223    public long incReadFailureCount() {
224      return readFailureCount.incrementAndGet();
225    }
226
227    @Override
228    public Map<String, String> getReadFailures() {
229      return readFailures;
230    }
231
232    @Override
233    public void updateReadFailures(String regionName, String serverName) {
234      readFailures.put(regionName, serverName);
235    }
236
237    @Override
238    public long getWriteFailureCount() {
239      return writeFailureCount.get();
240    }
241
242    @Override
243    public long incWriteFailureCount() {
244      return writeFailureCount.incrementAndGet();
245    }
246
247    @Override
248    public Map<String, String> getWriteFailures() {
249      return writeFailures;
250    }
251
252    @Override
253    public void updateWriteFailures(String regionName, String serverName) {
254      writeFailures.put(regionName, serverName);
255    }
256
257    @Override
258    public long getReadSuccessCount() {
259      return readSuccessCount.get();
260    }
261
262    @Override
263    public long incReadSuccessCount() {
264      return readSuccessCount.incrementAndGet();
265    }
266
267    @Override
268    public long getWriteSuccessCount() {
269      return writeSuccessCount.get();
270    }
271
272    @Override
273    public long incWriteSuccessCount() {
274      return writeSuccessCount.incrementAndGet();
275    }
276
277    public void stop() {
278      stopped = true;
279    }
280
281    @Override
282    public boolean isStopped() {
283      return stopped;
284    }
285  }
286
287  /**
288   * By RegionServer, for 'regionserver' mode.
289   */
290  public static class RegionServerStdOutSink extends StdOutSink {
291    public void publishReadFailure(String table, String server) {
292      incReadFailureCount();
293      LOG.error("Read from {} on {}", table, server);
294    }
295
296    public void publishReadTiming(String table, String server, long msTime) {
297      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
298    }
299  }
300
301  /**
302   * Output for 'zookeeper' mode.
303   */
304  public static class ZookeeperStdOutSink extends StdOutSink {
305    public void publishReadFailure(String znode, String server) {
306      incReadFailureCount();
307      LOG.error("Read from {} on {}", znode, server);
308    }
309
310    public void publishReadTiming(String znode, String server, long msTime) {
311      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
312    }
313  }
314
315  /**
316   * By Region, for 'region' mode.
317   */
318  public static class RegionStdOutSink extends StdOutSink {
319    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
320    private LongAdder writeLatency = new LongAdder();
321    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
322      new ConcurrentHashMap<>();
323    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>();
324    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();
325
326    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
327      return perServerFailuresCount;
328    }
329
330    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
331      return perTableFailuresCount;
332    }
333
334    public void resetFailuresCountDetails() {
335      perServerFailuresCount.clear();
336      perTableFailuresCount.clear();
337    }
338
339    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
340      if (serverName != null) {
341        perServerFailuresCount.compute(serverName, (server, count) -> {
342          if (count == null) {
343            count = new LongAdder();
344          }
345          count.increment();
346          return count;
347        });
348      }
349      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
350        if (count == null) {
351          count = new LongAdder();
352        }
353        count.increment();
354        return count;
355      });
356    }
357
358    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
359      LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName,
360        e);
361      incReadFailureCount();
362      incFailuresCountDetails(serverName, region);
363    }
364
365    public void publishReadFailure(ServerName serverName, RegionInfo region,
366      ColumnFamilyDescriptor column, Exception e) {
367      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
368        region.getRegionNameAsString(), serverName, column.getNameAsString(), e);
369      incReadFailureCount();
370      incFailuresCountDetails(serverName, region);
371    }
372
373    public void publishReadTiming(ServerName serverName, RegionInfo region,
374      ColumnFamilyDescriptor column, long msTime) {
375      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
376      rtr.setReadSuccess();
377      rtr.setReadLatency(msTime);
378      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
379      rtrs.add(rtr);
380      // Note that read success count will be equal to total column family read successes.
381      incReadSuccessCount();
382      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
383        column.getNameAsString(), msTime);
384    }
385
386    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
387      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
388      incWriteFailureCount();
389      incFailuresCountDetails(serverName, region);
390    }
391
392    public void publishWriteFailure(ServerName serverName, RegionInfo region,
393      ColumnFamilyDescriptor column, Exception e) {
394      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
395        column.getNameAsString(), e);
396      incWriteFailureCount();
397      incFailuresCountDetails(serverName, region);
398    }
399
400    public void publishWriteTiming(ServerName serverName, RegionInfo region,
401      ColumnFamilyDescriptor column, long msTime) {
402      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
403      rtr.setWriteSuccess();
404      rtr.setWriteLatency(msTime);
405      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
406      rtrs.add(rtr);
407      // Note that write success count will be equal to total column family write successes.
408      incWriteSuccessCount();
409      LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
410        column.getNameAsString(), msTime);
411    }
412
413    public Map<String, LongAdder> getReadLatencyMap() {
414      return this.perTableReadLatency;
415    }
416
417    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
418      LongAdder initLatency = new LongAdder();
419      this.perTableReadLatency.put(tableName, initLatency);
420      return initLatency;
421    }
422
423    public void initializeWriteLatency() {
424      this.writeLatency.reset();
425    }
426
427    public LongAdder getWriteLatency() {
428      return this.writeLatency;
429    }
430
431    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
432      return this.regionMap;
433    }
434
435    public int getTotalExpectedRegions() {
436      return this.regionMap.size();
437    }
438  }
439
440  /**
441   * Run a single zookeeper Task and then exit.
442   */
443  static class ZookeeperTask implements Callable<Void> {
444    private final Connection connection;
445    private final String host;
446    private String znode;
447    private final int timeout;
448    private ZookeeperStdOutSink sink;
449
450    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
451      ZookeeperStdOutSink sink) {
452      this.connection = connection;
453      this.host = host;
454      this.znode = znode;
455      this.timeout = timeout;
456      this.sink = sink;
457    }
458
459    @Override
460    public Void call() throws Exception {
461      if (this.sink.isStopped()) {
462        return null;
463      }
464      ZooKeeper zooKeeper = null;
465      try {
466        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
467        Stat exists = zooKeeper.exists(znode, false);
468        StopWatch stopwatch = new StopWatch();
469        stopwatch.start();
470        zooKeeper.getData(znode, false, exists);
471        stopwatch.stop();
472        sink.publishReadTiming(znode, host, stopwatch.getTime());
473      } catch (KeeperException | InterruptedException e) {
474        sink.publishReadFailure(znode, host);
475      } finally {
476        if (zooKeeper != null) {
477          zooKeeper.close();
478        }
479      }
480      return null;
481    }
482  }
483
484  /**
485   * Run a single Region Task and then exit. For each column family of the Region, get one row and
486   * output latency or failure.
487   */
488  static class RegionTask implements Callable<Void> {
489    public enum TaskType {
490      READ,
491      WRITE
492    }
493
494    private Connection connection;
495    private RegionInfo region;
496    private RegionStdOutSink sink;
497    private TaskType taskType;
498    private boolean rawScanEnabled;
499    private ServerName serverName;
500    private LongAdder readWriteLatency;
501    private boolean readAllCF;
502
503    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
504      RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
505      boolean readAllCF) {
506      this.connection = connection;
507      this.region = region;
508      this.serverName = serverName;
509      this.sink = sink;
510      this.taskType = taskType;
511      this.rawScanEnabled = rawScanEnabled;
512      this.readWriteLatency = rwLatency;
513      this.readAllCF = readAllCF;
514    }
515
516    @Override
517    public Void call() {
518      if (this.sink.isStopped()) {
519        return null;
520      }
521      switch (taskType) {
522        case READ:
523          return read();
524        case WRITE:
525          return write();
526        default:
527          return read();
528      }
529    }
530
531    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
532      byte[] startKey = null;
533      Scan scan = null;
534      ResultScanner rs = null;
535      StopWatch stopWatch = new StopWatch();
536      startKey = region.getStartKey();
537      // Can't do a get on empty start row so do a Scan of first element if any instead.
538      if (startKey.length > 0) {
539        Get get = new Get(startKey);
540        get.setCacheBlocks(false);
541        get.setFilter(new FirstKeyOnlyFilter());
542        get.addFamily(column.getName());
543        // Converting get object to scan to enable RAW SCAN.
544        // This will work for all the regions of the HBase tables except first region of the table.
545        scan = new Scan(get);
546        scan.setRaw(rawScanEnabled);
547      } else {
548        scan = new Scan();
549        // In case of first region of the HBase Table, we do not have start-key for the region.
550        // For Region Canary, we only need to scan a single row/cell in the region to make sure that
551        // region is accessible.
552        //
553        // When HBase table has more than 1 empty regions at start of the row-key space, Canary will
554        // create multiple scan object to find first available row in the table by scanning all the
555        // regions in sequence until it can find first available row.
556        //
557        // This could result in multiple millions of scans based on the size of table and number of
558        // empty regions in sequence. In test environment, A table with no data and 1100 empty
559        // regions, Single canary run was creating close to half million to 1 million scans to
560        // successfully do canary run for the table.
561        //
562        // Since First region of the table doesn't have any start key, We should set End Key as
563        // stop row and set inclusive=false to limit scan to single region only.
564        //
565        // TODO : In future, we can streamline Canary behaviour for all the regions by doing scan
566        // with startRow inclusive and stopRow exclusive instead of different behaviour for First
567        // Region of the table and rest of the region of the table. This way implementation is
568        // simplified. As of now this change has been kept minimal to avoid any unnecessary
569        // perf impact.
570        scan.withStopRow(region.getEndKey(), false);
571        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
572        scan.setRaw(rawScanEnabled);
573        scan.setCaching(1);
574        scan.setCacheBlocks(false);
575        scan.setFilter(new FirstKeyOnlyFilter());
576        scan.addFamily(column.getName());
577        scan.setMaxResultSize(1L);
578        scan.setOneRowLimit();
579      }
580      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
581        column.getNameAsString(), Bytes.toStringBinary(startKey));
582      try {
583        stopWatch.start();
584        rs = table.getScanner(scan);
585        rs.next();
586        stopWatch.stop();
587        this.readWriteLatency.add(stopWatch.getTime());
588        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
589      } catch (Exception e) {
590        sink.publishReadFailure(serverName, region, column, e);
591        sink.updateReadFailures(region.getRegionNameAsString(),
592          serverName == null ? "NULL" : serverName.getHostname());
593      } finally {
594        if (rs != null) {
595          rs.close();
596        }
597      }
598      return null;
599    }
600
601    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
602      int size = cfs.length;
603      return cfs[ThreadLocalRandom.current().nextInt(size)];
604
605    }
606
607    public Void read() {
608      Table table = null;
609      TableDescriptor tableDesc = null;
610      try {
611        LOG.debug("Reading table descriptor for table {}", region.getTable());
612        table = connection.getTable(region.getTable());
613        tableDesc = table.getDescriptor();
614      } catch (IOException e) {
615        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
616        sink.publishReadFailure(serverName, region, e);
617        if (table != null) {
618          try {
619            table.close();
620          } catch (IOException ioe) {
621            LOG.error("Close table failed", e);
622          }
623        }
624        return null;
625      }
626
627      if (readAllCF) {
628        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
629          readColumnFamily(table, column);
630        }
631      } else {
632        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
633      }
634      try {
635        table.close();
636      } catch (IOException e) {
637        LOG.error("Close table failed", e);
638      }
639      return null;
640    }
641
642    /**
643     * Check writes for the canary table
644     */
645    private Void write() {
646      Table table = null;
647      TableDescriptor tableDesc = null;
648      try {
649        table = connection.getTable(region.getTable());
650        tableDesc = table.getDescriptor();
651        byte[] rowToCheck = region.getStartKey();
652        if (rowToCheck.length == 0) {
653          rowToCheck = new byte[] { 0x0 };
654        }
655        int writeValueSize =
656          connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
657        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
658          Put put = new Put(rowToCheck);
659          byte[] value = new byte[writeValueSize];
660          Bytes.random(value);
661          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
662          LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(),
663            region.getRegionNameAsString(), column.getNameAsString(),
664            Bytes.toStringBinary(rowToCheck));
665          try {
666            long startTime = EnvironmentEdgeManager.currentTime();
667            table.put(put);
668            long time = EnvironmentEdgeManager.currentTime() - startTime;
669            this.readWriteLatency.add(time);
670            sink.publishWriteTiming(serverName, region, column, time);
671          } catch (Exception e) {
672            sink.publishWriteFailure(serverName, region, column, e);
673          }
674        }
675        table.close();
676      } catch (IOException e) {
677        sink.publishWriteFailure(serverName, region, e);
678        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
679      }
680      return null;
681    }
682  }
683
684  /**
685   * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and
686   * output latency or the failure.
687   */
688  static class RegionServerTask implements Callable<Void> {
689    private Connection connection;
690    private String serverName;
691    private RegionInfo region;
692    private RegionServerStdOutSink sink;
693    private Boolean rawScanEnabled;
694    private AtomicLong successes;
695
696    RegionServerTask(Connection connection, String serverName, RegionInfo region,
697      RegionServerStdOutSink sink, Boolean rawScanEnabled, AtomicLong successes) {
698      this.connection = connection;
699      this.serverName = serverName;
700      this.region = region;
701      this.sink = sink;
702      this.rawScanEnabled = rawScanEnabled;
703      this.successes = successes;
704    }
705
706    @Override
707    public Void call() {
708      if (this.sink.isStopped()) {
709        return null;
710      }
711      TableName tableName = null;
712      Table table = null;
713      Get get = null;
714      byte[] startKey = null;
715      Scan scan = null;
716      StopWatch stopWatch = new StopWatch();
717      // monitor one region on every region server
718      stopWatch.reset();
719      try {
720        tableName = region.getTable();
721        table = connection.getTable(tableName);
722        startKey = region.getStartKey();
723        // Can't do a get on empty start row so do a Scan of first element if any instead.
724        LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(),
725          region.getRegionNameAsString(), Bytes.toStringBinary(startKey));
726        if (startKey.length > 0) {
727          get = new Get(startKey);
728          get.setCacheBlocks(false);
729          get.setFilter(new FirstKeyOnlyFilter());
730          // Converting get object to scan to enable RAW SCAN.
731          // This will work for all the regions of the HBase tables except first region.
732          scan = new Scan(get);
733
734        } else {
735          scan = new Scan();
736          // In case of first region of the HBase Table, we do not have start-key for the region.
737          // For Region Canary, we only need scan a single row/cell in the region to make sure that
738          // region is accessible.
739          //
740          // When HBase table has more than 1 empty regions at start of the row-key space, Canary
741          // will create multiple scan object to find first available row in the table by scanning
742          // all the regions in sequence until it can find first available row.
743          //
744          // Since First region of the table doesn't have any start key, We should set End Key as
745          // stop row and set inclusive=false to limit scan to first region only.
746          scan.withStopRow(region.getEndKey(), false);
747          scan.setCacheBlocks(false);
748          scan.setFilter(new FirstKeyOnlyFilter());
749          scan.setCaching(1);
750          scan.setMaxResultSize(1L);
751          scan.setOneRowLimit();
752        }
753        scan.setRaw(rawScanEnabled);
754        stopWatch.start();
755        ResultScanner s = table.getScanner(scan);
756        s.next();
757        s.close();
758        stopWatch.stop();
759        successes.incrementAndGet();
760        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
761      } catch (TableNotFoundException tnfe) {
762        LOG.error("Table may be deleted", tnfe);
763        // This is ignored because it doesn't imply that the regionserver is dead
764      } catch (TableNotEnabledException tnee) {
765        // This is considered a success since we got a response.
766        successes.incrementAndGet();
767        LOG.debug("The targeted table was disabled.  Assuming success.");
768      } catch (DoNotRetryIOException dnrioe) {
769        sink.publishReadFailure(tableName.getNameAsString(), serverName);
770        LOG.error(dnrioe.toString(), dnrioe);
771      } catch (IOException e) {
772        sink.publishReadFailure(tableName.getNameAsString(), serverName);
773        LOG.error(e.toString(), e);
774      } finally {
775        if (table != null) {
776          try {
777            table.close();
778          } catch (IOException e) {/* DO NOTHING */
779            LOG.error("Close table failed", e);
780          }
781        }
782        scan = null;
783        get = null;
784        startKey = null;
785      }
786      return null;
787    }
788  }
789
  // Exit codes of the tool. USAGE_EXIT_CODE is what checkRegions returns when the configured read
  // table timeouts cannot be parsed.
  // NOTE(review): the other codes are used outside this section; their meaning is presumably what
  // the names say -- confirm at the call sites.
  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  // NOTE(review): presumably milliseconds (i.e. 60 seconds), like DEFAULT_TIMEOUT -- confirm.
  private static final long DEFAULT_INTERVAL = 60000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  // The logger is registered under Canary.class, not CanaryTool.class.
  // NOTE(review): presumably deliberate, to keep the logger name stable -- confirm before changing.
  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

  /** Table named "canary" in the system namespace. */
  public static final TableName DEFAULT_WRITE_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  private long interval = 0;
  private Sink sink = null;

  /**
   * True if we are to run in 'regionServer' mode.
   */
  private boolean regionServerMode = false;

  /**
   * True if we are to run in zookeeper 'mode'.
   */
  private boolean zookeeperMode = false;

  /**
   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. we
   * aggregate time to fetch each region and it needs to be less than this value else we log an
   * ERROR.
   */
  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();

  // Configuration keys.
  // NOTE(review): apart from the read table timeout key, these are consumed outside this section;
  // see their usages for the exact semantics.
  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
    "hbase.canary.regionserver_all_regions";

  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
    "hbase.canary.region.write.sniffing";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
    "hbase.canary.region.write.table.timeout";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
    "hbase.canary.region.write.table.name";
  /**
   * Read by checkRegions; when set, its value is used to populate the read table timeouts map.
   */
  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
    "hbase.canary.region.read.table.timeout";

  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
    "hbase.canary.zookeeper.permitted.failures";

  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";

  private ExecutorService executor; // threads to retrieve data from regionservers
849
  /**
   * Creates a tool that runs its tasks on a new {@link ScheduledThreadPoolExecutor} with a core
   * pool size of one and has no pre-set sink.
   */
  public CanaryTool() {
    this(new ScheduledThreadPoolExecutor(1));
  }

  /**
   * Creates a tool that runs its tasks on the given executor and has no pre-set sink.
   * @param executor executor handed to the monitors
   */
  public CanaryTool(ExecutorService executor) {
    this(executor, null);
  }

  /**
   * Creates a tool with an explicit executor and sink. A non-null sink is used as-is by
   * {@code getSink(Configuration, Class)} instead of instantiating one from configuration.
   * @param executor executor handed to the monitors
   * @param sink     sink to publish results to, or null to have one created lazily
   */
  @InterfaceAudience.Private
  CanaryTool(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }

  /**
   * Creates a tool with an explicit configuration and executor and no pre-set sink.
   * @param conf     configuration to use; null is replaced by a default one in setConf
   * @param executor executor handed to the monitors
   */
  CanaryTool(Configuration conf, ExecutorService executor) {
    this(conf, executor, null);
  }

  /**
   * Creates a tool with an explicit configuration, executor and sink.
   * @param conf     configuration to use; null is replaced by a default one in setConf
   * @param executor executor handed to the monitors
   * @param sink     sink to publish results to, or null to have one created lazily
   */
  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
    this(executor, sink);
    setConf(conf);
  }
872
873  @Override
874  public Configuration getConf() {
875    return conf;
876  }
877
878  @Override
879  public void setConf(Configuration conf) {
880    if (conf == null) {
881      conf = HBaseConfiguration.create();
882    }
883    this.conf = conf;
884  }
885
886  private int parseArgs(String[] args) {
887    int index = -1;
888    long permittedFailures = 0;
889    boolean regionServerAllRegions = false, writeSniffing = false;
890    String readTableTimeoutsStr = null;
891    // Process command line args
892    for (int i = 0; i < args.length; i++) {
893      String cmd = args[i];
894      if (cmd.startsWith("-")) {
895        if (index >= 0) {
896          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
897          System.err.println("Invalid command line options");
898          printUsageAndExit();
899        }
900        if (cmd.equals("-help") || cmd.equals("-h")) {
901          // user asked for help, print the help and quit.
902          printUsageAndExit();
903        } else if (cmd.equals("-daemon") && interval == 0) {
904          // user asked for daemon mode, set a default interval between checks
905          interval = DEFAULT_INTERVAL;
906        } else if (cmd.equals("-interval")) {
907          // user has specified an interval for canary breaths (-interval N)
908          i++;
909
910          if (i == args.length) {
911            System.err.println("-interval takes a numeric seconds value argument.");
912            printUsageAndExit();
913          }
914          try {
915            interval = Long.parseLong(args[i]) * 1000;
916          } catch (NumberFormatException e) {
917            System.err.println("-interval needs a numeric value argument.");
918            printUsageAndExit();
919          }
920        } else if (cmd.equals("-zookeeper")) {
921          this.zookeeperMode = true;
922        } else if (cmd.equals("-regionserver")) {
923          this.regionServerMode = true;
924        } else if (cmd.equals("-allRegions")) {
925          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
926          regionServerAllRegions = true;
927        } else if (cmd.equals("-writeSniffing")) {
928          writeSniffing = true;
929          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
930        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
931          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
932        } else if (cmd.equals("-e")) {
933          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
934        } else if (cmd.equals("-t")) {
935          i++;
936
937          if (i == args.length) {
938            System.err.println("-t takes a numeric milliseconds value argument.");
939            printUsageAndExit();
940          }
941          long timeout = 0;
942          try {
943            timeout = Long.parseLong(args[i]);
944          } catch (NumberFormatException e) {
945            System.err.println("-t takes a numeric milliseconds value argument.");
946            printUsageAndExit();
947          }
948          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
949        } else if (cmd.equals("-writeTableTimeout")) {
950          i++;
951
952          if (i == args.length) {
953            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
954            printUsageAndExit();
955          }
956          long configuredWriteTableTimeout = 0;
957          try {
958            configuredWriteTableTimeout = Long.parseLong(args[i]);
959          } catch (NumberFormatException e) {
960            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
961            printUsageAndExit();
962          }
963          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
964        } else if (cmd.equals("-writeTable")) {
965          i++;
966
967          if (i == args.length) {
968            System.err.println("-writeTable takes a string tablename value argument.");
969            printUsageAndExit();
970          }
971          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
972        } else if (cmd.equals("-f")) {
973          i++;
974          if (i == args.length) {
975            System.err.println("-f needs a boolean value argument (true|false).");
976            printUsageAndExit();
977          }
978
979          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
980        } else if (cmd.equals("-readTableTimeouts")) {
981          i++;
982          if (i == args.length) {
983            System.err.println("-readTableTimeouts needs a comma-separated list of read "
984              + "millisecond timeouts per table (without spaces).");
985            printUsageAndExit();
986          }
987          readTableTimeoutsStr = args[i];
988          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
989        } else if (cmd.equals("-permittedZookeeperFailures")) {
990          i++;
991
992          if (i == args.length) {
993            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
994            printUsageAndExit();
995          }
996          try {
997            permittedFailures = Long.parseLong(args[i]);
998          } catch (NumberFormatException e) {
999            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
1000            printUsageAndExit();
1001          }
1002          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
1003        } else {
1004          // no options match
1005          System.err.println(cmd + " options is invalid.");
1006          printUsageAndExit();
1007        }
1008      } else if (index < 0) {
1009        // keep track of first table name specified by the user
1010        index = i;
1011      }
1012    }
1013    if (regionServerAllRegions && !this.regionServerMode) {
1014      System.err.println("-allRegions can only be specified in regionserver mode.");
1015      printUsageAndExit();
1016    }
1017    if (this.zookeeperMode) {
1018      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
1019        System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes.");
1020        printUsageAndExit();
1021      }
1022    }
1023    if (permittedFailures != 0 && !this.zookeeperMode) {
1024      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
1025      printUsageAndExit();
1026    }
1027    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
1028      System.err.println("-readTableTimeouts can only be configured in region mode.");
1029      printUsageAndExit();
1030    }
1031    return index;
1032  }
1033
1034  @Override
1035  public int run(String[] args) throws Exception {
1036    int index = parseArgs(args);
1037    String[] monitorTargets = null;
1038
1039    if (index >= 0) {
1040      int length = args.length - index;
1041      monitorTargets = new String[length];
1042      System.arraycopy(args, index, monitorTargets, 0, length);
1043    }
1044    if (interval > 0) {
1045      // Only show the web page in daemon mode
1046      putUpWebUI();
1047    }
1048    if (zookeeperMode) {
1049      return checkZooKeeper();
1050    } else if (regionServerMode) {
1051      return checkRegionServers(monitorTargets);
1052    } else {
1053      return checkRegions(monitorTargets);
1054    }
1055  }
1056
1057  private int runMonitor(String[] monitorTargets) throws Exception {
1058    ChoreService choreService = null;
1059
1060    // Launches chore for refreshing kerberos credentials if security is enabled.
1061    // Please see
1062    // https://hbase.apache.org/docs/operational-management/tools#running-canary-in-a-kerberos-enabled-cluster
1063    // for more details.
1064    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1065    if (authChore != null) {
1066      choreService = new ChoreService("CANARY_TOOL");
1067      choreService.scheduleChore(authChore);
1068    }
1069
1070    // Start to prepare the stuffs
1071    Monitor monitor = null;
1072    Thread monitorThread;
1073    long startTime = 0;
1074    long currentTimeLength = 0;
1075    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1076    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
1077    // Get a connection to use in below.
1078    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
1079      do {
1080        // Do monitor !!
1081        try {
1082          monitor = this.newMonitor(connection, monitorTargets);
1083          startTime = EnvironmentEdgeManager.currentTime();
1084          monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime);
1085          monitorThread.start();
1086          while (!monitor.isDone()) {
1087            // wait for 1 sec
1088            Thread.sleep(1000);
1089            // exit if any error occurs
1090            if (failOnError && monitor.hasError()) {
1091              monitorThread.interrupt();
1092              if (monitor.initialized) {
1093                return monitor.errorCode;
1094              } else {
1095                return INIT_ERROR_EXIT_CODE;
1096              }
1097            }
1098            currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime;
1099            if (currentTimeLength > timeout) {
1100              LOG.error("The monitor is running too long (" + currentTimeLength
1101                + ") after timeout limit:" + timeout + " will be killed itself !!");
1102              monitorThread.interrupt();
1103              if (monitor.initialized) {
1104                return TIMEOUT_ERROR_EXIT_CODE;
1105              } else {
1106                return INIT_ERROR_EXIT_CODE;
1107              }
1108            }
1109          }
1110
1111          if (failOnError && monitor.finalCheckForErrors()) {
1112            monitorThread.interrupt();
1113            return monitor.errorCode;
1114          }
1115        } finally {
1116          if (monitor != null) {
1117            monitor.close();
1118          }
1119        }
1120
1121        Thread.sleep(interval);
1122      } while (interval > 0);
1123    } // try-with-resources close
1124
1125    if (choreService != null) {
1126      choreService.shutdown();
1127    }
1128    return monitor.errorCode;
1129  }
1130
1131  @Override
1132  public Map<String, String> getReadFailures() {
1133    return sink.getReadFailures();
1134  }
1135
1136  @Override
1137  public Map<String, String> getWriteFailures() {
1138    return sink.getWriteFailures();
1139  }
1140
1141  /**
1142   * Return a CanaryTool.Sink object containing the detailed results of the canary run. The Sink may
1143   * not have been created if a Monitor thread is not yet running.
1144   * @return the active Sink if one exists, null otherwise.
1145   */
1146  public Sink getActiveSink() {
1147    return sink;
1148  }
1149
1150  private void printUsageAndExit() {
1151    System.err.println(
1152      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1153    System.err.println("Where [OPTIONS] are:");
1154    System.err.println(" -h,-help        show this help and exit.");
1155    System.err.println(
1156      " -regionserver   set 'regionserver mode'; gets row from random region on " + "server");
1157    System.err.println(
1158      " -allRegions     get from ALL regions when 'regionserver mode', not just " + "random one.");
1159    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on "
1160      + "each ensemble member");
1161    System.err.println(" -daemon         continuous check at defined intervals.");
1162    System.err.println(" -interval <N>   interval between checks in seconds");
1163    System.err
1164      .println(" -e              consider table/regionserver argument as regular " + "expression");
1165    System.err.println(" -f <B>          exit on first error; default=true");
1166    System.err.println(" -failureAsError treat read/write failure as error");
1167    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1168    System.err.println(" -writeSniffing  enable write sniffing");
1169    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1170    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1171    System.err.println(
1172      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
1173    System.err
1174      .println("                comma-separated list of table read timeouts " + "(no spaces);");
1175    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1176    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1177    System.err.println("                connect to individual zookeeper nodes in ensemble");
1178    System.err.println("");
1179    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1180    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
1181      + "raw scan; default=false");
1182    System.err.println(
1183      " -Dhbase.canary.info.port=PORT_NUMBER  Set for a Canary UI; " + "default=-1 (None)");
1184    System.err.println("");
1185    System.err.println(
1186      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
1187    System.err.println("To sniff/probe all regions, pass no arguments.");
1188    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1189    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1190    System.err.println(
1191      "See https://hbase.apache.org/docs/operational-management/tools#canary for Canary documentation.");
1192    System.exit(USAGE_EXIT_CODE);
1193  }
1194
1195  Sink getSink(Configuration configuration, Class clazz) {
1196    // In test context, this.sink might be set. Use it if non-null. For testing.
1197    if (this.sink == null) {
1198      this.sink = (Sink) ReflectionUtils
1199        .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
1200    }
1201    return this.sink;
1202  }
1203
1204  /**
1205   * Canary region mode-specific data structure which stores information about each region to be
1206   * scanned
1207   */
1208  public static class RegionTaskResult {
1209    private RegionInfo region;
1210    private TableName tableName;
1211    private ServerName serverName;
1212    private ColumnFamilyDescriptor column;
1213    private AtomicLong readLatency = null;
1214    private AtomicLong writeLatency = null;
1215    private boolean readSuccess = false;
1216    private boolean writeSuccess = false;
1217
1218    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1219      ColumnFamilyDescriptor column) {
1220      this.region = region;
1221      this.tableName = tableName;
1222      this.serverName = serverName;
1223      this.column = column;
1224    }
1225
1226    public RegionInfo getRegionInfo() {
1227      return this.region;
1228    }
1229
1230    public String getRegionNameAsString() {
1231      return this.region.getRegionNameAsString();
1232    }
1233
1234    public TableName getTableName() {
1235      return this.tableName;
1236    }
1237
1238    public String getTableNameAsString() {
1239      return this.tableName.getNameAsString();
1240    }
1241
1242    public ServerName getServerName() {
1243      return this.serverName;
1244    }
1245
1246    public String getServerNameAsString() {
1247      return this.serverName.getServerName();
1248    }
1249
1250    public ColumnFamilyDescriptor getColumnFamily() {
1251      return this.column;
1252    }
1253
1254    public String getColumnFamilyNameAsString() {
1255      return this.column.getNameAsString();
1256    }
1257
1258    public long getReadLatency() {
1259      if (this.readLatency == null) {
1260        return -1;
1261      }
1262      return this.readLatency.get();
1263    }
1264
1265    public void setReadLatency(long readLatency) {
1266      if (this.readLatency != null) {
1267        this.readLatency.set(readLatency);
1268      } else {
1269        this.readLatency = new AtomicLong(readLatency);
1270      }
1271    }
1272
1273    public long getWriteLatency() {
1274      if (this.writeLatency == null) {
1275        return -1;
1276      }
1277      return this.writeLatency.get();
1278    }
1279
1280    public void setWriteLatency(long writeLatency) {
1281      if (this.writeLatency != null) {
1282        this.writeLatency.set(writeLatency);
1283      } else {
1284        this.writeLatency = new AtomicLong(writeLatency);
1285      }
1286    }
1287
1288    public boolean isReadSuccess() {
1289      return this.readSuccess;
1290    }
1291
1292    public void setReadSuccess() {
1293      this.readSuccess = true;
1294    }
1295
1296    public boolean isWriteSuccess() {
1297      return this.writeSuccess;
1298    }
1299
1300    public void setWriteSuccess() {
1301      this.writeSuccess = true;
1302    }
1303  }
1304
1305  /**
1306   * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a
1307   * RegionMonitor.
1308   * @return a Monitor instance
1309   */
1310  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1311    Monitor monitor;
1312    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1313    boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1314    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1315    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1316    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1317    String writeTableName =
1318      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1319    long configuredWriteTableTimeout =
1320      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1321
1322    if (this.regionServerMode) {
1323      monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp,
1324        getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor,
1325        regionServerAllRegions, failOnError, permittedFailures);
1326
1327    } else if (this.zookeeperMode) {
1328      monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1329        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor,
1330        failOnError, permittedFailures);
1331    } else {
1332      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
1333        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
1334        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1335        configuredWriteTableTimeout, permittedFailures);
1336    }
1337    return monitor;
1338  }
1339
1340  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1341    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1342    for (String tT : tableTimeouts) {
1343      String[] nameTimeout = tT.split("=");
1344      if (nameTimeout.length < 2) {
1345        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form "
1346          + "<tableName>=<read timeout> (without spaces).");
1347      }
1348      long timeoutVal;
1349      try {
1350        timeoutVal = Long.parseLong(nameTimeout[1]);
1351      } catch (NumberFormatException e) {
1352        throw new IllegalArgumentException(
1353          "-readTableTimeouts read timeout for each table" + " must be a numeric value argument.");
1354      }
1355      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1356    }
1357  }
1358
1359  /**
1360   * A Monitor super-class can be extended by users
1361   */
1362  public static abstract class Monitor implements Runnable, Closeable {
1363    protected Connection connection;
1364    protected Admin admin;
1365    /**
1366     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the
1367     * command-line as arguments.
1368     */
1369    protected String[] targets;
1370    protected boolean useRegExp;
1371    protected boolean treatFailureAsError;
1372    protected boolean initialized = false;
1373
1374    protected boolean done = false;
1375    protected int errorCode = 0;
1376    protected long allowedFailures = 0;
1377    protected Sink sink;
1378    protected ExecutorService executor;
1379
1380    public boolean isDone() {
1381      return done;
1382    }
1383
1384    public boolean hasError() {
1385      return errorCode != 0;
1386    }
1387
1388    public boolean finalCheckForErrors() {
1389      if (errorCode != 0) {
1390        return true;
1391      }
1392      if (
1393        treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
1394          || sink.getWriteFailureCount() > allowedFailures)
1395      ) {
1396        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1397        errorCode = FAILURE_EXIT_CODE;
1398        return true;
1399      }
1400      return false;
1401    }
1402
1403    @Override
1404    public void close() throws IOException {
1405      this.sink.stop();
1406      if (this.admin != null) {
1407        this.admin.close();
1408      }
1409    }
1410
1411    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1412      ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1413      if (null == connection) {
1414        throw new IllegalArgumentException("connection shall not be null");
1415      }
1416
1417      this.connection = connection;
1418      this.targets = monitorTargets;
1419      this.useRegExp = useRegExp;
1420      this.treatFailureAsError = treatFailureAsError;
1421      this.sink = sink;
1422      this.executor = executor;
1423      this.allowedFailures = allowedFailures;
1424    }
1425
1426    @Override
1427    public abstract void run();
1428
1429    protected boolean initAdmin() {
1430      if (null == this.admin) {
1431        try {
1432          this.admin = this.connection.getAdmin();
1433        } catch (Exception e) {
1434          LOG.error("Initial HBaseAdmin failed...", e);
1435          this.errorCode = INIT_ERROR_EXIT_CODE;
1436        }
1437      } else if (admin.isAborted()) {
1438        LOG.error("HBaseAdmin aborted");
1439        this.errorCode = INIT_ERROR_EXIT_CODE;
1440      }
1441      return !this.hasError();
1442    }
1443  }
1444
1445  /**
1446   * A monitor for region mode.
1447   */
1448  private static class RegionMonitor extends Monitor {
    // 10 minutes, in milliseconds: default for how often run() re-checks the write table
    // distribution.
    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // 1 day (presumably in seconds -- confirm against where writeDataTTL is applied)
    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;

    // Time of the last write table distribution check; -1 until the first check has run.
    private long lastCheckTime = -1;
    // Whether run() also sniffs the write table with write operations.
    private boolean writeSniffing;
    // Table targeted by write sniffing.
    private TableName writeTableName;
    // Read from HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY.
    private int writeDataTTL;
    // Read from HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY (default 1.0).
    private float regionsLowerLimit;
    // Read from HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY (default 1.5).
    private float regionsUpperLimit;
    // Minimum time between two write table distribution checks; compared against differences of
    // EnvironmentEdgeManager.currentTime().
    private int checkPeriod;
    // Read from HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY (default false); passed to the sniff
    // tasks.
    private boolean rawScanEnabled;
    // Read from HConstants.HBASE_CANARY_READ_ALL_CF (default true); passed to the sniff tasks.
    private boolean readAllCF;

    /**
     * This is a timeout per table. If read of each region in the table aggregated takes longer than
     * what is configured here, we log an ERROR rather than just an INFO.
     */
    private HashMap<String, Long> configuredReadTableTimeouts;

    // Write latency above this value is logged as an ERROR by run().
    private long configuredWriteTableTimeout;
1471
1472    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1473      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1474      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1475      long configuredWriteTableTimeout, long allowedFailures) {
1476      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1477        allowedFailures);
1478      Configuration conf = connection.getConfiguration();
1479      this.writeSniffing = writeSniffing;
1480      this.writeTableName = writeTableName;
1481      this.writeDataTTL =
1482        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1483      this.regionsLowerLimit =
1484        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1485      this.regionsUpperLimit =
1486        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1487      this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1488        DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1489      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1490      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1491      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1492      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
1493    }
1494
1495    private RegionStdOutSink getSink() {
1496      if (!(sink instanceof RegionStdOutSink)) {
1497        throw new RuntimeException("Can only write to Region sink");
1498      }
1499      return ((RegionStdOutSink) sink);
1500    }
1501
1502    @Override
1503    public void run() {
1504      if (this.initAdmin()) {
1505        try {
1506          List<Future<Void>> taskFutures = new LinkedList<>();
1507          RegionStdOutSink regionSink = this.getSink();
1508          regionSink.resetFailuresCountDetails();
1509          if (this.targets != null && this.targets.length > 0) {
1510            String[] tables = generateMonitorTables(this.targets);
1511            // Check to see that each table name passed in the -readTableTimeouts argument is also
1512            // passed as a monitor target.
1513            if (
1514              !new HashSet<>(Arrays.asList(tables))
1515                .containsAll(this.configuredReadTableTimeouts.keySet())
1516            ) {
1517              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
1518                + "passed via command line.");
1519              this.errorCode = USAGE_EXIT_CODE;
1520              return;
1521            }
1522            this.initialized = true;
1523            for (String table : tables) {
1524              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1525              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1526                this.rawScanEnabled, readLatency, readAllCF));
1527            }
1528          } else {
1529            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1530          }
1531
1532          if (writeSniffing) {
1533            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1534              try {
1535                checkWriteTableDistribution();
1536              } catch (IOException e) {
1537                LOG.error("Check canary table distribution failed!", e);
1538              }
1539              lastCheckTime = EnvironmentEdgeManager.currentTime();
1540            }
1541            // sniff canary table with write operation
1542            regionSink.initializeWriteLatency();
1543            LongAdder writeTableLatency = regionSink.getWriteLatency();
1544            taskFutures
1545              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1546                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
1547          }
1548
1549          for (Future<Void> future : taskFutures) {
1550            try {
1551              future.get();
1552            } catch (ExecutionException e) {
1553              LOG.error("Sniff region failed!", e);
1554            }
1555          }
1556          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1557          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1558            String tableName = entry.getKey();
1559            if (actualReadTableLatency.containsKey(tableName)) {
1560              Long actual = actualReadTableLatency.get(tableName).longValue();
1561              Long configured = entry.getValue();
1562              if (actual > configured) {
1563                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout."
1564                  + "(Configured read timeout {}ms.", tableName, actual, configured);
1565              } else {
1566                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1567                  tableName, actual, configured);
1568              }
1569            } else {
1570              LOG.error("Read operation for {} failed!", tableName);
1571            }
1572          }
1573          if (this.writeSniffing) {
1574            String writeTableStringName = this.writeTableName.getNameAsString();
1575            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1576            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1577              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1578            // Check that the writeTable write operation latency does not exceed the configured
1579            // timeout.
1580            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1581              LOG.error("Write operation for {} exceeded the configured write timeout.",
1582                writeTableStringName);
1583            }
1584          }
1585        } catch (Exception e) {
1586          LOG.error("Run regionMonitor failed", e);
1587          this.errorCode = ERROR_EXIT_CODE;
1588        } finally {
1589          this.done = true;
1590        }
1591      }
1592      this.done = true;
1593    }
1594
1595    /** Returns List of tables to use in test. */
1596    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1597      String[] returnTables = null;
1598
1599      if (this.useRegExp) {
1600        Pattern pattern = null;
1601        List<TableDescriptor> tds = null;
1602        Set<String> tmpTables = new TreeSet<>();
1603        try {
1604          LOG.debug(String.format("reading list of tables"));
1605          tds = this.admin.listTableDescriptors(pattern);
1606          if (tds == null) {
1607            tds = Collections.emptyList();
1608          }
1609          for (String monitorTarget : monitorTargets) {
1610            pattern = Pattern.compile(monitorTarget);
1611            for (TableDescriptor td : tds) {
1612              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1613                tmpTables.add(td.getTableName().getNameAsString());
1614              }
1615            }
1616          }
1617        } catch (IOException e) {
1618          LOG.error("Communicate with admin failed", e);
1619          throw e;
1620        }
1621
1622        if (tmpTables.size() > 0) {
1623          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1624        } else {
1625          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1626          LOG.error(msg);
1627          this.errorCode = INIT_ERROR_EXIT_CODE;
1628          throw new TableNotFoundException(msg);
1629        }
1630      } else {
1631        returnTables = monitorTargets;
1632      }
1633
1634      return returnTables;
1635    }
1636
1637    /*
1638     * Canary entry point to monitor all the tables.
1639     */
1640    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1641      throws Exception {
1642      LOG.debug("Reading list of tables");
1643      List<Future<Void>> taskFutures = new LinkedList<>();
1644      for (TableDescriptor td : admin.listTableDescriptors()) {
1645        if (
1646          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
1647            && (!td.getTableName().equals(writeTableName))
1648        ) {
1649          LongAdder readLatency =
1650            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1651          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
1652            this.rawScanEnabled, readLatency, readAllCF));
1653        }
1654      }
1655      return taskFutures;
1656    }
1657
1658    private void checkWriteTableDistribution() throws IOException {
1659      if (!admin.tableExists(writeTableName)) {
1660        int numberOfServers = admin.getRegionServers().size();
1661        if (numberOfServers == 0) {
1662          throw new IllegalStateException("No live regionservers");
1663        }
1664        createWriteTable(numberOfServers);
1665      }
1666
1667      if (!admin.isTableEnabled(writeTableName)) {
1668        admin.enableTable(writeTableName);
1669      }
1670
1671      ClusterMetrics status =
1672        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
1673      int numberOfServers = status.getServersName().size();
1674      if (status.getServersName().contains(status.getMasterName())) {
1675        numberOfServers -= 1;
1676      }
1677
1678      List<Pair<RegionInfo, ServerName>> pairs =
1679        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1680      int numberOfRegions = pairs.size();
1681      if (
1682        numberOfRegions < numberOfServers * regionsLowerLimit
1683          || numberOfRegions > numberOfServers * regionsUpperLimit
1684      ) {
1685        admin.disableTable(writeTableName);
1686        admin.deleteTable(writeTableName);
1687        createWriteTable(numberOfServers);
1688      }
1689      HashSet<ServerName> serverSet = new HashSet<>();
1690      for (Pair<RegionInfo, ServerName> pair : pairs) {
1691        serverSet.add(pair.getSecond());
1692      }
1693      int numberOfCoveredServers = serverSet.size();
1694      if (numberOfCoveredServers < numberOfServers) {
1695        admin.balance();
1696      }
1697    }
1698
1699    private void createWriteTable(int numberOfServers) throws IOException {
1700      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
1701      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
1702        + "(current lower limit of regions per server is {} and you can change it with config {}).",
1703        numberOfServers, numberOfRegions, regionsLowerLimit,
1704        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1705      ColumnFamilyDescriptor family =
1706        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
1707          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
1708      TableDescriptor desc =
1709        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
1710      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1711      admin.createTable(desc, splits);
1712    }
1713  }
1714
1715  /**
1716   * Canary entry point for specified table.
1717   * @throws Exception exception
1718   */
1719  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1720    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
1721    boolean readAllCF) throws Exception {
1722    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1723    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1724      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1725        executor, taskType, rawScanEnabled, readLatency, readAllCF);
1726    } else {
1727      LOG.warn("Table {} is not enabled", tableName);
1728    }
1729    return new LinkedList<>();
1730  }
1731
1732  /*
1733   * Loops over regions of this table, and outputs information about the state.
1734   */
1735  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1736    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
1737    LongAdder rwLatency, boolean readAllCF) throws Exception {
1738    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1739    List<RegionTask> tasks = new ArrayList<>();
1740    try (RegionLocator regionLocator =
1741      admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1742      for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1743        if (location == null) {
1744          LOG.warn("Null location for table {}", tableDesc.getTableName());
1745          continue;
1746        }
1747        ServerName rs = location.getServerName();
1748        RegionInfo region = location.getRegion();
1749        tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
1750          taskType, rawScanEnabled, rwLatency, readAllCF));
1751        Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1752        regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1753      }
1754      return executor.invokeAll(tasks);
1755    }
1756  }
1757
1758  // monitor for zookeeper mode
1759  private static class ZookeeperMonitor extends Monitor {
1760    private List<String> hosts;
1761    private final String znode;
1762    private final int timeout;
1763
1764    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1765      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1766      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1767        allowedFailures);
1768      Configuration configuration = connection.getConfiguration();
1769      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1770      timeout =
1771        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1772      ConnectStringParser parser =
1773        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1774      hosts = Lists.newArrayList();
1775      for (InetSocketAddress server : parser.getServerAddresses()) {
1776        hosts.add(inetSocketAddress2String(server));
1777      }
1778      if (allowedFailures > (hosts.size() - 1) / 2) {
1779        LOG.warn(
1780          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
1781            + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1782          allowedFailures, hosts.size());
1783      }
1784    }
1785
1786    @Override
1787    public void run() {
1788      List<ZookeeperTask> tasks = Lists.newArrayList();
1789      ZookeeperStdOutSink zkSink = null;
1790      try {
1791        zkSink = this.getSink();
1792      } catch (RuntimeException e) {
1793        LOG.error("Run ZooKeeperMonitor failed!", e);
1794        this.errorCode = ERROR_EXIT_CODE;
1795      }
1796      this.initialized = true;
1797      for (final String host : hosts) {
1798        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1799      }
1800      try {
1801        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1802          try {
1803            future.get();
1804          } catch (ExecutionException e) {
1805            LOG.error("Sniff zookeeper failed!", e);
1806            this.errorCode = ERROR_EXIT_CODE;
1807          }
1808        }
1809      } catch (InterruptedException e) {
1810        this.errorCode = ERROR_EXIT_CODE;
1811        Thread.currentThread().interrupt();
1812        LOG.error("Sniff zookeeper interrupted!", e);
1813      }
1814      this.done = true;
1815    }
1816
1817    private ZookeeperStdOutSink getSink() {
1818      if (!(sink instanceof ZookeeperStdOutSink)) {
1819        throw new RuntimeException("Can only write to zookeeper sink");
1820      }
1821      return ((ZookeeperStdOutSink) sink);
1822    }
1823  }
1824
1825  /**
1826   * A monitor for regionserver mode
1827   */
1828  private static class RegionServerMonitor extends Monitor {
1829    private boolean rawScanEnabled;
1830    private boolean allRegions;
1831
1832    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1833      Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError,
1834      long allowedFailures) {
1835      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1836        allowedFailures);
1837      Configuration conf = connection.getConfiguration();
1838      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1839      this.allRegions = allRegions;
1840    }
1841
1842    private RegionServerStdOutSink getSink() {
1843      if (!(sink instanceof RegionServerStdOutSink)) {
1844        throw new RuntimeException("Can only write to regionserver sink");
1845      }
1846      return ((RegionServerStdOutSink) sink);
1847    }
1848
1849    @Override
1850    public void run() {
1851      if (this.initAdmin() && this.checkNoTableNames()) {
1852        RegionServerStdOutSink regionServerSink = null;
1853        try {
1854          regionServerSink = this.getSink();
1855        } catch (RuntimeException e) {
1856          LOG.error("Run RegionServerMonitor failed!", e);
1857          this.errorCode = ERROR_EXIT_CODE;
1858        }
1859        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1860        this.initialized = true;
1861        this.monitorRegionServers(rsAndRMap, regionServerSink);
1862      }
1863      this.done = true;
1864    }
1865
1866    private boolean checkNoTableNames() {
1867      List<String> foundTableNames = new ArrayList<>();
1868      TableName[] tableNames = null;
1869      LOG.debug("Reading list of tables");
1870      try {
1871        tableNames = this.admin.listTableNames();
1872      } catch (IOException e) {
1873        LOG.error("Get listTableNames failed", e);
1874        this.errorCode = INIT_ERROR_EXIT_CODE;
1875        return false;
1876      }
1877
1878      if (this.targets == null || this.targets.length == 0) {
1879        return true;
1880      }
1881
1882      for (String target : this.targets) {
1883        for (TableName tableName : tableNames) {
1884          if (target.equals(tableName.getNameAsString())) {
1885            foundTableNames.add(target);
1886          }
1887        }
1888      }
1889
1890      if (foundTableNames.size() > 0) {
1891        System.err.println("Cannot pass a tablename when using the -regionserver "
1892          + "option, tablenames:" + foundTableNames.toString());
1893        this.errorCode = USAGE_EXIT_CODE;
1894      }
1895      return foundTableNames.isEmpty();
1896    }
1897
1898    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
1899      RegionServerStdOutSink regionServerSink) {
1900      List<RegionServerTask> tasks = new ArrayList<>();
1901      Map<String, AtomicLong> successMap = new HashMap<>();
1902      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1903        String serverName = entry.getKey();
1904        AtomicLong successes = new AtomicLong(0);
1905        successMap.put(serverName, successes);
1906        if (entry.getValue().isEmpty()) {
1907          LOG.error("Regionserver not serving any regions - {}", serverName);
1908        } else if (this.allRegions) {
1909          for (RegionInfo region : entry.getValue()) {
1910            tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
1911              this.rawScanEnabled, successes));
1912          }
1913        } else {
1914          // random select a region if flag not set
1915          RegionInfo region =
1916            entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size()));
1917          tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
1918            this.rawScanEnabled, successes));
1919        }
1920      }
1921      try {
1922        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1923          try {
1924            future.get();
1925          } catch (ExecutionException e) {
1926            LOG.error("Sniff regionserver failed!", e);
1927            this.errorCode = ERROR_EXIT_CODE;
1928          }
1929        }
1930        if (this.allRegions) {
1931          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1932            String serverName = entry.getKey();
1933            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1934              successMap.get(serverName), entry.getValue().size(), serverName);
1935          }
1936        }
1937      } catch (InterruptedException e) {
1938        this.errorCode = ERROR_EXIT_CODE;
1939        LOG.error("Sniff regionserver interrupted!", e);
1940      }
1941    }
1942
1943    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1944      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1945      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1946      return regionServerAndRegionsMap;
1947    }
1948
1949    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1950      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1951      try {
1952        LOG.debug("Reading list of tables and locations");
1953        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1954        List<RegionInfo> regions = null;
1955        for (TableDescriptor tableDesc : tableDescs) {
1956          try (RegionLocator regionLocator =
1957            this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1958            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1959              if (location == null) {
1960                LOG.warn("Null location for table {}", tableDesc.getTableName());
1961                continue;
1962              }
1963              ServerName rs = location.getServerName();
1964              String rsName = rs.getHostname();
1965              RegionInfo r = location.getRegion();
1966              if (rsAndRMap.containsKey(rsName)) {
1967                regions = rsAndRMap.get(rsName);
1968              } else {
1969                regions = new ArrayList<>();
1970                rsAndRMap.put(rsName, regions);
1971              }
1972              regions.add(r);
1973            }
1974          }
1975        }
1976
1977        // get any live regionservers not serving any regions
1978        for (ServerName rs : this.admin.getRegionServers()) {
1979          String rsName = rs.getHostname();
1980          if (!rsAndRMap.containsKey(rsName)) {
1981            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1982          }
1983        }
1984      } catch (IOException e) {
1985        LOG.error("Get HTables info failed", e);
1986        this.errorCode = INIT_ERROR_EXIT_CODE;
1987      }
1988      return rsAndRMap;
1989    }
1990
1991    private Map<String, List<RegionInfo>>
1992      doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) {
1993
1994      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1995
1996      if (this.targets != null && this.targets.length > 0) {
1997        filteredRsAndRMap = new HashMap<>();
1998        Pattern pattern = null;
1999        Matcher matcher = null;
2000        boolean regExpFound = false;
2001        for (String rsName : this.targets) {
2002          if (this.useRegExp) {
2003            regExpFound = false;
2004            pattern = Pattern.compile(rsName);
2005            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
2006              matcher = pattern.matcher(entry.getKey());
2007              if (matcher.matches()) {
2008                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
2009                regExpFound = true;
2010              }
2011            }
2012            if (!regExpFound) {
2013              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
2014            }
2015          } else {
2016            if (fullRsAndRMap.containsKey(rsName)) {
2017              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
2018            } else {
2019              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
2020            }
2021          }
2022        }
2023      } else {
2024        filteredRsAndRMap = fullRsAndRMap;
2025      }
2026      return filteredRsAndRMap;
2027    }
2028  }
2029
2030  public static void main(String[] args) throws Exception {
2031    final Configuration conf = HBaseConfiguration.create();
2032
2033    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
2034    LOG.info("Execution thread count={}", numThreads);
2035
2036    int exitCode;
2037    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
2038    try {
2039      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
2040    } finally {
2041      executor.shutdown();
2042    }
2043    System.exit(exitCode);
2044  }
2045}