001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.net.BindException;
027import java.net.InetSocketAddress;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.Callable;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.ScheduledThreadPoolExecutor;
046import java.util.concurrent.ThreadLocalRandom;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.concurrent.atomic.LongAdder;
049import java.util.regex.Matcher;
050import java.util.regex.Pattern;
051import org.apache.commons.lang3.time.StopWatch;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.ChoreService;
055import org.apache.hadoop.hbase.ClusterMetrics;
056import org.apache.hadoop.hbase.ClusterMetrics.Option;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseConfiguration;
059import org.apache.hadoop.hbase.HBaseInterfaceAudience;
060import org.apache.hadoop.hbase.HConstants;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.NamespaceDescriptor;
064import org.apache.hadoop.hbase.ScheduledChore;
065import org.apache.hadoop.hbase.ServerName;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
072import org.apache.hadoop.hbase.client.Connection;
073import org.apache.hadoop.hbase.client.ConnectionFactory;
074import org.apache.hadoop.hbase.client.Get;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionInfo;
077import org.apache.hadoop.hbase.client.RegionLocator;
078import org.apache.hadoop.hbase.client.ResultScanner;
079import org.apache.hadoop.hbase.client.Scan;
080import org.apache.hadoop.hbase.client.Table;
081import org.apache.hadoop.hbase.client.TableDescriptor;
082import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
084import org.apache.hadoop.hbase.http.InfoServer;
085import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.Pair;
089import org.apache.hadoop.hbase.util.ReflectionUtils;
090import org.apache.hadoop.hbase.util.RegionSplitter;
091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
092import org.apache.hadoop.hbase.zookeeper.ZKConfig;
093import org.apache.hadoop.util.Tool;
094import org.apache.hadoop.util.ToolRunner;
095import org.apache.yetus.audience.InterfaceAudience;
096import org.apache.zookeeper.KeeperException;
097import org.apache.zookeeper.ZooKeeper;
098import org.apache.zookeeper.client.ConnectStringParser;
099import org.apache.zookeeper.data.Stat;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
104
105/**
106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
107 * <ol>
108 * <li>region mode (Default): For each region, try to get one row per column family outputting
109 * information on failure (ERROR) or else the latency.</li>
110 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly
111 * outputting information on failure (ERROR) or else the latency.</li>
112 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
113 * failure (ERROR) or else the latency.</li>
114 * </ol>
115 */
116@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
117public class CanaryTool implements Tool, Canary {
  /**
   * Configuration key for the port of the Canary info (web UI) server. {@code putUpWebUI} reads it
   * with a default of -1 and does not start the server when the value is negative.
   */
  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
  /**
   * Configuration key for the address the Canary info server binds to. {@code putUpWebUI} falls
   * back to "0.0.0.0" when it is not set.
   */
  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";
120
121  private void putUpWebUI() throws IOException {
122    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
123    // -1 is for disabling info server
124    if (port < 0) {
125      return;
126    }
127    if (zookeeperMode) {
128      LOG.info("WebUI is not supported in Zookeeper mode");
129    } else if (regionServerMode) {
130      LOG.info("WebUI is not supported in RegionServer mode");
131    } else {
132      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
133      try {
134        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
135        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
136        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
137        infoServer.start();
138        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
139      } catch (BindException e) {
140        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
141      }
142    }
143  }
144
145  @Override
146  public int checkRegions(String[] targets) throws Exception {
147    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
148    try {
149      LOG.info("Canary tool is running in Region mode");
150      if (configuredReadTableTimeoutsStr != null) {
151        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
152      }
153    } catch (IllegalArgumentException e) {
154      LOG.error("Constructing read table timeouts map failed ", e);
155      return USAGE_EXIT_CODE;
156    }
157    return runMonitor(targets);
158  }
159
  /**
   * Switches this tool into RegionServer mode and runs the monitor against the given targets.
   * @param targets targets handed unchanged to {@code runMonitor}
   * @return the value returned by {@code runMonitor}
   */
  @Override
  public int checkRegionServers(String[] targets) throws Exception {
    // The flag stays set on this instance after the call returns.
    regionServerMode = true;
    LOG.info("Canary tool is running in RegionServer mode");
    return runMonitor(targets);
  }
166
  /**
   * Switches this tool into ZooKeeper mode and runs the monitor. No targets are passed in this
   * mode.
   * @return the value returned by {@code runMonitor}
   */
  @Override
  public int checkZooKeeper() throws Exception {
    // The flag stays set on this instance after the call returns.
    zookeeperMode = true;
    LOG.info("Canary tool is running in ZooKeeper mode");
    return runMonitor(null);
  }
173
  /**
   * Sink interface used by the canary to output information. It exposes counters for read/write
   * successes and failures plus maps of the individual read and write failures.
   */
  public interface Sink {
    /** Returns the current read failure count. */
    long getReadFailureCount();

    /** Increments the read failure count; {@code StdOutSink} returns the incremented value. */
    long incReadFailureCount();

    /** Returns the failures recorded via {@link #updateReadFailures(String, String)}. */
    Map<String, String> getReadFailures();

    /** Records a read failure for the given region name together with the given server name. */
    void updateReadFailures(String regionName, String serverName);

    /** Returns the current write failure count. */
    long getWriteFailureCount();

    /** Increments the write failure count; {@code StdOutSink} returns the incremented value. */
    long incWriteFailureCount();

    /** Returns the failures recorded via {@link #updateWriteFailures(String, String)}. */
    Map<String, String> getWriteFailures();

    /** Records a write failure for the given region name together with the given server name. */
    void updateWriteFailures(String regionName, String serverName);

    /** Returns the current read success count. */
    long getReadSuccessCount();

    /** Increments the read success count; {@code StdOutSink} returns the incremented value. */
    long incReadSuccessCount();

    /** Returns the current write success count. */
    long getWriteSuccessCount();

    /** Increments the write success count; {@code StdOutSink} returns the incremented value. */
    long incWriteSuccessCount();
  }
202
203  /**
204   * Simple implementation of canary sink that allows plotting to a file or standard output.
205   */
206  public static class StdOutSink implements Sink {
207    private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0),
208        readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
209    private Map<String, String> readFailures = new ConcurrentHashMap<>();
210    private Map<String, String> writeFailures = new ConcurrentHashMap<>();
211
212    @Override
213    public long getReadFailureCount() {
214      return readFailureCount.get();
215    }
216
217    @Override
218    public long incReadFailureCount() {
219      return readFailureCount.incrementAndGet();
220    }
221
222    @Override
223    public Map<String, String> getReadFailures() {
224      return readFailures;
225    }
226
227    @Override
228    public void updateReadFailures(String regionName, String serverName) {
229      readFailures.put(regionName, serverName);
230    }
231
232    @Override
233    public long getWriteFailureCount() {
234      return writeFailureCount.get();
235    }
236
237    @Override
238    public long incWriteFailureCount() {
239      return writeFailureCount.incrementAndGet();
240    }
241
242    @Override
243    public Map<String, String> getWriteFailures() {
244      return writeFailures;
245    }
246
247    @Override
248    public void updateWriteFailures(String regionName, String serverName) {
249      writeFailures.put(regionName, serverName);
250    }
251
252    @Override
253    public long getReadSuccessCount() {
254      return readSuccessCount.get();
255    }
256
257    @Override
258    public long incReadSuccessCount() {
259      return readSuccessCount.incrementAndGet();
260    }
261
262    @Override
263    public long getWriteSuccessCount() {
264      return writeSuccessCount.get();
265    }
266
267    @Override
268    public long incWriteSuccessCount() {
269      return writeSuccessCount.incrementAndGet();
270    }
271  }
272
  /**
   * By RegionServer, for 'regionserver' mode. Reports results through the logger and the
   * inherited read failure counter.
   */
  public static class RegionServerStdOutSink extends StdOutSink {
    /** Counts one read failure and logs the table and server at ERROR level. */
    public void publishReadFailure(String table, String server) {
      incReadFailureCount();
      LOG.error("Read from {} on {}", table, server);
    }

    /** Logs a successful read with its latency in milliseconds; no counter is changed here. */
    public void publishReadTiming(String table, String server, long msTime) {
      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
    }
  }
286
  /**
   * Output for 'zookeeper' mode. Reports results through the logger and the inherited read
   * failure counter.
   */
  public static class ZookeeperStdOutSink extends StdOutSink {
    /** Counts one read failure and logs the znode and server at ERROR level. */
    public void publishReadFailure(String znode, String server) {
      incReadFailureCount();
      LOG.error("Read from {} on {}", znode, server);
    }

    /** Logs a successful read with its latency in milliseconds; no counter is changed here. */
    public void publishReadTiming(String znode, String server, long msTime) {
      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
    }
  }
300
301  /**
302   * By Region, for 'region' mode.
303   */
304  public static class RegionStdOutSink extends StdOutSink {
305    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
306    private LongAdder writeLatency = new LongAdder();
307    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
308      new ConcurrentHashMap<>();
309    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>();
310    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();
311
312    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
313      return perServerFailuresCount;
314    }
315
316    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
317      return perTableFailuresCount;
318    }
319
320    public void resetFailuresCountDetails() {
321      perServerFailuresCount.clear();
322      perTableFailuresCount.clear();
323    }
324
325    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
326      if (serverName != null) {
327        perServerFailuresCount.compute(serverName, (server, count) -> {
328          if (count == null) {
329            count = new LongAdder();
330          }
331          count.increment();
332          return count;
333        });
334      }
335      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
336        if (count == null) {
337          count = new LongAdder();
338        }
339        count.increment();
340        return count;
341      });
342    }
343
344    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
345      LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName,
346        e);
347      incReadFailureCount();
348      incFailuresCountDetails(serverName, region);
349    }
350
351    public void publishReadFailure(ServerName serverName, RegionInfo region,
352      ColumnFamilyDescriptor column, Exception e) {
353      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
354        region.getRegionNameAsString(), serverName, column.getNameAsString(), e);
355      incReadFailureCount();
356      incFailuresCountDetails(serverName, region);
357    }
358
359    public void publishReadTiming(ServerName serverName, RegionInfo region,
360      ColumnFamilyDescriptor column, long msTime) {
361      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
362      rtr.setReadSuccess();
363      rtr.setReadLatency(msTime);
364      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
365      rtrs.add(rtr);
366      // Note that read success count will be equal to total column family read successes.
367      incReadSuccessCount();
368      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
369        column.getNameAsString(), msTime);
370    }
371
372    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
373      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
374      incWriteFailureCount();
375      incFailuresCountDetails(serverName, region);
376    }
377
378    public void publishWriteFailure(ServerName serverName, RegionInfo region,
379      ColumnFamilyDescriptor column, Exception e) {
380      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
381        column.getNameAsString(), e);
382      incWriteFailureCount();
383      incFailuresCountDetails(serverName, region);
384    }
385
386    public void publishWriteTiming(ServerName serverName, RegionInfo region,
387      ColumnFamilyDescriptor column, long msTime) {
388      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
389      rtr.setWriteSuccess();
390      rtr.setWriteLatency(msTime);
391      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
392      rtrs.add(rtr);
393      // Note that write success count will be equal to total column family write successes.
394      incWriteSuccessCount();
395      LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
396        column.getNameAsString(), msTime);
397    }
398
399    public Map<String, LongAdder> getReadLatencyMap() {
400      return this.perTableReadLatency;
401    }
402
403    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
404      LongAdder initLatency = new LongAdder();
405      this.perTableReadLatency.put(tableName, initLatency);
406      return initLatency;
407    }
408
409    public void initializeWriteLatency() {
410      this.writeLatency.reset();
411    }
412
413    public LongAdder getWriteLatency() {
414      return this.writeLatency;
415    }
416
417    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
418      return this.regionMap;
419    }
420
421    public int getTotalExpectedRegions() {
422      return this.regionMap.size();
423    }
424  }
425
426  /**
427   * Run a single zookeeper Task and then exit.
428   */
429  static class ZookeeperTask implements Callable<Void> {
430    private final Connection connection;
431    private final String host;
432    private String znode;
433    private final int timeout;
434    private ZookeeperStdOutSink sink;
435
436    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
437      ZookeeperStdOutSink sink) {
438      this.connection = connection;
439      this.host = host;
440      this.znode = znode;
441      this.timeout = timeout;
442      this.sink = sink;
443    }
444
445    @Override
446    public Void call() throws Exception {
447      ZooKeeper zooKeeper = null;
448      try {
449        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
450        Stat exists = zooKeeper.exists(znode, false);
451        StopWatch stopwatch = new StopWatch();
452        stopwatch.start();
453        zooKeeper.getData(znode, false, exists);
454        stopwatch.stop();
455        sink.publishReadTiming(znode, host, stopwatch.getTime());
456      } catch (KeeperException | InterruptedException e) {
457        sink.publishReadFailure(znode, host);
458      } finally {
459        if (zooKeeper != null) {
460          zooKeeper.close();
461        }
462      }
463      return null;
464    }
465  }
466
467  /**
468   * Run a single Region Task and then exit. For each column family of the Region, get one row and
469   * output latency or failure.
470   */
471  static class RegionTask implements Callable<Void> {
472    public enum TaskType {
473      READ,
474      WRITE
475    }
476
477    private Connection connection;
478    private RegionInfo region;
479    private RegionStdOutSink sink;
480    private TaskType taskType;
481    private boolean rawScanEnabled;
482    private ServerName serverName;
483    private LongAdder readWriteLatency;
484    private boolean readAllCF;
485
486    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
487      RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
488      boolean readAllCF) {
489      this.connection = connection;
490      this.region = region;
491      this.serverName = serverName;
492      this.sink = sink;
493      this.taskType = taskType;
494      this.rawScanEnabled = rawScanEnabled;
495      this.readWriteLatency = rwLatency;
496      this.readAllCF = readAllCF;
497    }
498
499    @Override
500    public Void call() {
501      switch (taskType) {
502        case READ:
503          return read();
504        case WRITE:
505          return write();
506        default:
507          return read();
508      }
509    }
510
511    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
512      byte[] startKey = null;
513      Scan scan = null;
514      ResultScanner rs = null;
515      StopWatch stopWatch = new StopWatch();
516      startKey = region.getStartKey();
517      // Can't do a get on empty start row so do a Scan of first element if any instead.
518      if (startKey.length > 0) {
519        Get get = new Get(startKey);
520        get.setCacheBlocks(false);
521        get.setFilter(new FirstKeyOnlyFilter());
522        get.addFamily(column.getName());
523        // Converting get object to scan to enable RAW SCAN.
524        // This will work for all the regions of the HBase tables except first region of the table.
525        scan = new Scan(get);
526        scan.setRaw(rawScanEnabled);
527      } else {
528        scan = new Scan();
529        // In case of first region of the HBase Table, we do not have start-key for the region.
530        // For Region Canary, we only need to scan a single row/cell in the region to make sure that
531        // region is accessible.
532        //
533        // When HBase table has more than 1 empty regions at start of the row-key space, Canary will
534        // create multiple scan object to find first available row in the table by scanning all the
535        // regions in sequence until it can find first available row.
536        //
537        // This could result in multiple millions of scans based on the size of table and number of
538        // empty regions in sequence. In test environment, A table with no data and 1100 empty
539        // regions, Single canary run was creating close to half million to 1 million scans to
540        // successfully do canary run for the table.
541        //
542        // Since First region of the table doesn't have any start key, We should set End Key as
543        // stop row and set inclusive=false to limit scan to single region only.
544        //
545        // TODO : In future, we can streamline Canary behaviour for all the regions by doing scan
546        // with startRow inclusive and stopRow exclusive instead of different behaviour for First
547        // Region of the table and rest of the region of the table. This way implementation is
548        // simplified. As of now this change has been kept minimal to avoid any unnecessary
549        // perf impact.
550        scan.withStopRow(region.getEndKey(), false);
551        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
552        scan.setRaw(rawScanEnabled);
553        scan.setCaching(1);
554        scan.setCacheBlocks(false);
555        scan.setFilter(new FirstKeyOnlyFilter());
556        scan.addFamily(column.getName());
557        scan.setMaxResultSize(1L);
558        scan.setOneRowLimit();
559      }
560      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
561        column.getNameAsString(), Bytes.toStringBinary(startKey));
562      try {
563        stopWatch.start();
564        rs = table.getScanner(scan);
565        rs.next();
566        stopWatch.stop();
567        this.readWriteLatency.add(stopWatch.getTime());
568        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
569      } catch (Exception e) {
570        sink.publishReadFailure(serverName, region, column, e);
571        sink.updateReadFailures(region.getRegionNameAsString(),
572          serverName == null ? "NULL" : serverName.getHostname());
573      } finally {
574        if (rs != null) {
575          rs.close();
576        }
577      }
578      return null;
579    }
580
581    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
582      int size = cfs.length;
583      return cfs[ThreadLocalRandom.current().nextInt(size)];
584
585    }
586
587    public Void read() {
588      Table table = null;
589      TableDescriptor tableDesc = null;
590      try {
591        LOG.debug("Reading table descriptor for table {}", region.getTable());
592        table = connection.getTable(region.getTable());
593        tableDesc = table.getDescriptor();
594      } catch (IOException e) {
595        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e);
596        sink.publishReadFailure(serverName, region, e);
597        if (table != null) {
598          try {
599            table.close();
600          } catch (IOException ioe) {
601            LOG.error("Close table failed", e);
602          }
603        }
604        return null;
605      }
606
607      if (readAllCF) {
608        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
609          readColumnFamily(table, column);
610        }
611      } else {
612        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
613      }
614      try {
615        table.close();
616      } catch (IOException e) {
617        LOG.error("Close table failed", e);
618      }
619      return null;
620    }
621
622    /**
623     * Check writes for the canary table
624     */
625    private Void write() {
626      Table table = null;
627      TableDescriptor tableDesc = null;
628      try {
629        table = connection.getTable(region.getTable());
630        tableDesc = table.getDescriptor();
631        byte[] rowToCheck = region.getStartKey();
632        if (rowToCheck.length == 0) {
633          rowToCheck = new byte[] { 0x0 };
634        }
635        int writeValueSize =
636          connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
637        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
638          Put put = new Put(rowToCheck);
639          byte[] value = new byte[writeValueSize];
640          Bytes.random(value);
641          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
642          LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(),
643            region.getRegionNameAsString(), column.getNameAsString(),
644            Bytes.toStringBinary(rowToCheck));
645          try {
646            long startTime = EnvironmentEdgeManager.currentTime();
647            table.put(put);
648            long time = EnvironmentEdgeManager.currentTime() - startTime;
649            this.readWriteLatency.add(time);
650            sink.publishWriteTiming(serverName, region, column, time);
651          } catch (Exception e) {
652            sink.publishWriteFailure(serverName, region, column, e);
653          }
654        }
655        table.close();
656      } catch (IOException e) {
657        sink.publishWriteFailure(serverName, region, e);
658        sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname());
659      }
660      return null;
661    }
662  }
663
664  /**
665   * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and
666   * output latency or the failure.
667   */
668  static class RegionServerTask implements Callable<Void> {
669    private Connection connection;
670    private String serverName;
671    private RegionInfo region;
672    private RegionServerStdOutSink sink;
673    private Boolean rawScanEnabled;
674    private AtomicLong successes;
675
676    RegionServerTask(Connection connection, String serverName, RegionInfo region,
677      RegionServerStdOutSink sink, Boolean rawScanEnabled, AtomicLong successes) {
678      this.connection = connection;
679      this.serverName = serverName;
680      this.region = region;
681      this.sink = sink;
682      this.rawScanEnabled = rawScanEnabled;
683      this.successes = successes;
684    }
685
686    @Override
687    public Void call() {
688      TableName tableName = null;
689      Table table = null;
690      Get get = null;
691      byte[] startKey = null;
692      Scan scan = null;
693      StopWatch stopWatch = new StopWatch();
694      // monitor one region on every region server
695      stopWatch.reset();
696      try {
697        tableName = region.getTable();
698        table = connection.getTable(tableName);
699        startKey = region.getStartKey();
700        // Can't do a get on empty start row so do a Scan of first element if any instead.
701        LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(),
702          region.getRegionNameAsString(), Bytes.toStringBinary(startKey));
703        if (startKey.length > 0) {
704          get = new Get(startKey);
705          get.setCacheBlocks(false);
706          get.setFilter(new FirstKeyOnlyFilter());
707          // Converting get object to scan to enable RAW SCAN.
708          // This will work for all the regions of the HBase tables except first region.
709          scan = new Scan(get);
710
711        } else {
712          scan = new Scan();
713          // In case of first region of the HBase Table, we do not have start-key for the region.
714          // For Region Canary, we only need scan a single row/cell in the region to make sure that
715          // region is accessible.
716          //
717          // When HBase table has more than 1 empty regions at start of the row-key space, Canary
718          // will create multiple scan object to find first available row in the table by scanning
719          // all the regions in sequence until it can find first available row.
720          //
721          // Since First region of the table doesn't have any start key, We should set End Key as
722          // stop row and set inclusive=false to limit scan to first region only.
723          scan.withStopRow(region.getEndKey(), false);
724          scan.setCacheBlocks(false);
725          scan.setFilter(new FirstKeyOnlyFilter());
726          scan.setCaching(1);
727          scan.setMaxResultSize(1L);
728          scan.setOneRowLimit();
729        }
730        scan.setRaw(rawScanEnabled);
731        stopWatch.start();
732        ResultScanner s = table.getScanner(scan);
733        s.next();
734        s.close();
735        stopWatch.stop();
736        successes.incrementAndGet();
737        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
738      } catch (TableNotFoundException tnfe) {
739        LOG.error("Table may be deleted", tnfe);
740        // This is ignored because it doesn't imply that the regionserver is dead
741      } catch (TableNotEnabledException tnee) {
742        // This is considered a success since we got a response.
743        successes.incrementAndGet();
744        LOG.debug("The targeted table was disabled.  Assuming success.");
745      } catch (DoNotRetryIOException dnrioe) {
746        sink.publishReadFailure(tableName.getNameAsString(), serverName);
747        LOG.error(dnrioe.toString(), dnrioe);
748      } catch (IOException e) {
749        sink.publishReadFailure(tableName.getNameAsString(), serverName);
750        LOG.error(e.toString(), e);
751      } finally {
752        if (table != null) {
753          try {
754            table.close();
755          } catch (IOException e) {/* DO NOTHING */
756            LOG.error("Close table failed", e);
757          }
758        }
759        scan = null;
760        get = null;
761        startKey = null;
762      }
763      return null;
764    }
765  }
766
  // Exit codes returned by the tool. USAGE_EXIT_CODE is what checkRegions returns when the
  // configured read table timeouts cannot be parsed.
  // NOTE(review): the remaining codes are used outside the visible part of this file.
  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  // presumably milliseconds (i.e. one minute) -- TODO confirm against the code that consumes it
  private static final long DEFAULT_INTERVAL = 60000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  // Note: the logger is named after the Canary interface, not after CanaryTool.
  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

  /** Table named "canary" in the system namespace. */
  public static final TableName DEFAULT_WRITE_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  // Set through setConf(Configuration).
  private Configuration conf = null;
  private long interval = 0;
  // Optional sink injected through the constructors; null when none was supplied.
  private Sink sink = null;

  /**
   * True if we are to run in 'regionServer' mode.
   */
  private boolean regionServerMode = false;

  /**
   * True if we are to run in zookeeper 'mode'.
   */
  private boolean zookeeperMode = false;

  /**
   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. we
   * aggregate time to fetch each region and it needs to be less than this value else we log an
   * ERROR.
   */
  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();

  // Configuration keys understood by the canary.
  // NOTE(review): except for the read table timeout key (read in checkRegions), the code that
  // consumes these keys is outside the visible part of this file.
  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
    "hbase.canary.regionserver_all_regions";

  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
    "hbase.canary.region.write.sniffing";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
    "hbase.canary.region.write.table.timeout";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
    "hbase.canary.region.write.table.name";
  // Read by checkRegions and handed to populateReadTableTimeoutsMap when set.
  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
    "hbase.canary.region.read.table.timeout";

  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
    "hbase.canary.zookeeper.permitted.failures";

  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";

  private ExecutorService executor; // threads to retrieve data from regionservers
826
827  public CanaryTool() {
828    this(new ScheduledThreadPoolExecutor(1));
829  }
830
831  public CanaryTool(ExecutorService executor) {
832    this(executor, null);
833  }
834
835  @InterfaceAudience.Private
836  CanaryTool(ExecutorService executor, Sink sink) {
837    this.executor = executor;
838    this.sink = sink;
839  }
840
841  CanaryTool(Configuration conf, ExecutorService executor) {
842    this(conf, executor, null);
843  }
844
845  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
846    this(executor, sink);
847    setConf(conf);
848  }
849
850  @Override
851  public Configuration getConf() {
852    return conf;
853  }
854
855  @Override
856  public void setConf(Configuration conf) {
857    if (conf == null) {
858      conf = HBaseConfiguration.create();
859    }
860    this.conf = conf;
861  }
862
863  private int parseArgs(String[] args) {
864    int index = -1;
865    long permittedFailures = 0;
866    boolean regionServerAllRegions = false, writeSniffing = false;
867    String readTableTimeoutsStr = null;
868    // Process command line args
869    for (int i = 0; i < args.length; i++) {
870      String cmd = args[i];
871      if (cmd.startsWith("-")) {
872        if (index >= 0) {
873          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
874          System.err.println("Invalid command line options");
875          printUsageAndExit();
876        }
877        if (cmd.equals("-help") || cmd.equals("-h")) {
878          // user asked for help, print the help and quit.
879          printUsageAndExit();
880        } else if (cmd.equals("-daemon") && interval == 0) {
881          // user asked for daemon mode, set a default interval between checks
882          interval = DEFAULT_INTERVAL;
883        } else if (cmd.equals("-interval")) {
884          // user has specified an interval for canary breaths (-interval N)
885          i++;
886
887          if (i == args.length) {
888            System.err.println("-interval takes a numeric seconds value argument.");
889            printUsageAndExit();
890          }
891          try {
892            interval = Long.parseLong(args[i]) * 1000;
893          } catch (NumberFormatException e) {
894            System.err.println("-interval needs a numeric value argument.");
895            printUsageAndExit();
896          }
897        } else if (cmd.equals("-zookeeper")) {
898          this.zookeeperMode = true;
899        } else if (cmd.equals("-regionserver")) {
900          this.regionServerMode = true;
901        } else if (cmd.equals("-allRegions")) {
902          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
903          regionServerAllRegions = true;
904        } else if (cmd.equals("-writeSniffing")) {
905          writeSniffing = true;
906          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
907        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
908          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
909        } else if (cmd.equals("-e")) {
910          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
911        } else if (cmd.equals("-t")) {
912          i++;
913
914          if (i == args.length) {
915            System.err.println("-t takes a numeric milliseconds value argument.");
916            printUsageAndExit();
917          }
918          long timeout = 0;
919          try {
920            timeout = Long.parseLong(args[i]);
921          } catch (NumberFormatException e) {
922            System.err.println("-t takes a numeric milliseconds value argument.");
923            printUsageAndExit();
924          }
925          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
926        } else if (cmd.equals("-writeTableTimeout")) {
927          i++;
928
929          if (i == args.length) {
930            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
931            printUsageAndExit();
932          }
933          long configuredWriteTableTimeout = 0;
934          try {
935            configuredWriteTableTimeout = Long.parseLong(args[i]);
936          } catch (NumberFormatException e) {
937            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
938            printUsageAndExit();
939          }
940          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
941        } else if (cmd.equals("-writeTable")) {
942          i++;
943
944          if (i == args.length) {
945            System.err.println("-writeTable takes a string tablename value argument.");
946            printUsageAndExit();
947          }
948          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
949        } else if (cmd.equals("-f")) {
950          i++;
951          if (i == args.length) {
952            System.err.println("-f needs a boolean value argument (true|false).");
953            printUsageAndExit();
954          }
955
956          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
957        } else if (cmd.equals("-readTableTimeouts")) {
958          i++;
959          if (i == args.length) {
960            System.err.println("-readTableTimeouts needs a comma-separated list of read "
961              + "millisecond timeouts per table (without spaces).");
962            printUsageAndExit();
963          }
964          readTableTimeoutsStr = args[i];
965          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
966        } else if (cmd.equals("-permittedZookeeperFailures")) {
967          i++;
968
969          if (i == args.length) {
970            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
971            printUsageAndExit();
972          }
973          try {
974            permittedFailures = Long.parseLong(args[i]);
975          } catch (NumberFormatException e) {
976            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
977            printUsageAndExit();
978          }
979          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
980        } else {
981          // no options match
982          System.err.println(cmd + " options is invalid.");
983          printUsageAndExit();
984        }
985      } else if (index < 0) {
986        // keep track of first table name specified by the user
987        index = i;
988      }
989    }
990    if (regionServerAllRegions && !this.regionServerMode) {
991      System.err.println("-allRegions can only be specified in regionserver mode.");
992      printUsageAndExit();
993    }
994    if (this.zookeeperMode) {
995      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
996        System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes.");
997        printUsageAndExit();
998      }
999    }
1000    if (permittedFailures != 0 && !this.zookeeperMode) {
1001      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
1002      printUsageAndExit();
1003    }
1004    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
1005      System.err.println("-readTableTimeouts can only be configured in region mode.");
1006      printUsageAndExit();
1007    }
1008    return index;
1009  }
1010
1011  @Override
1012  public int run(String[] args) throws Exception {
1013    int index = parseArgs(args);
1014    String[] monitorTargets = null;
1015
1016    if (index >= 0) {
1017      int length = args.length - index;
1018      monitorTargets = new String[length];
1019      System.arraycopy(args, index, monitorTargets, 0, length);
1020    }
1021    if (interval > 0) {
1022      // Only show the web page in daemon mode
1023      putUpWebUI();
1024    }
1025    if (zookeeperMode) {
1026      return checkZooKeeper();
1027    } else if (regionServerMode) {
1028      return checkRegionServers(monitorTargets);
1029    } else {
1030      return checkRegions(monitorTargets);
1031    }
1032  }
1033
1034  private int runMonitor(String[] monitorTargets) throws Exception {
1035    ChoreService choreService = null;
1036
1037    // Launches chore for refreshing kerberos credentials if security is enabled.
1038    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
1039    // for more details.
1040    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1041    if (authChore != null) {
1042      choreService = new ChoreService("CANARY_TOOL");
1043      choreService.scheduleChore(authChore);
1044    }
1045
1046    // Start to prepare the stuffs
1047    Monitor monitor = null;
1048    Thread monitorThread;
1049    long startTime = 0;
1050    long currentTimeLength = 0;
1051    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1052    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
1053    // Get a connection to use in below.
1054    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
1055      do {
1056        // Do monitor !!
1057        try {
1058          monitor = this.newMonitor(connection, monitorTargets);
1059          startTime = EnvironmentEdgeManager.currentTime();
1060          monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime);
1061          monitorThread.start();
1062          while (!monitor.isDone()) {
1063            // wait for 1 sec
1064            Thread.sleep(1000);
1065            // exit if any error occurs
1066            if (failOnError && monitor.hasError()) {
1067              monitorThread.interrupt();
1068              if (monitor.initialized) {
1069                return monitor.errorCode;
1070              } else {
1071                return INIT_ERROR_EXIT_CODE;
1072              }
1073            }
1074            currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime;
1075            if (currentTimeLength > timeout) {
1076              LOG.error("The monitor is running too long (" + currentTimeLength
1077                + ") after timeout limit:" + timeout + " will be killed itself !!");
1078              if (monitor.initialized) {
1079                return TIMEOUT_ERROR_EXIT_CODE;
1080              } else {
1081                return INIT_ERROR_EXIT_CODE;
1082              }
1083            }
1084          }
1085
1086          if (failOnError && monitor.finalCheckForErrors()) {
1087            monitorThread.interrupt();
1088            return monitor.errorCode;
1089          }
1090        } finally {
1091          if (monitor != null) {
1092            monitor.close();
1093          }
1094        }
1095
1096        Thread.sleep(interval);
1097      } while (interval > 0);
1098    } // try-with-resources close
1099
1100    if (choreService != null) {
1101      choreService.shutdown();
1102    }
1103    return monitor.errorCode;
1104  }
1105
1106  @Override
1107  public Map<String, String> getReadFailures() {
1108    return sink.getReadFailures();
1109  }
1110
1111  @Override
1112  public Map<String, String> getWriteFailures() {
1113    return sink.getWriteFailures();
1114  }
1115
1116  private void printUsageAndExit() {
1117    System.err.println(
1118      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
1119    System.err.println("Where [OPTIONS] are:");
1120    System.err.println(" -h,-help        show this help and exit.");
1121    System.err.println(
1122      " -regionserver   set 'regionserver mode'; gets row from random region on " + "server");
1123    System.err.println(
1124      " -allRegions     get from ALL regions when 'regionserver mode', not just " + "random one.");
1125    System.err.println(" -zookeeper      set 'zookeeper mode'; grab zookeeper.znode.parent on "
1126      + "each ensemble member");
1127    System.err.println(" -daemon         continuous check at defined intervals.");
1128    System.err.println(" -interval <N>   interval between checks in seconds");
1129    System.err
1130      .println(" -e              consider table/regionserver argument as regular " + "expression");
1131    System.err.println(" -f <B>          exit on first error; default=true");
1132    System.err.println(" -failureAsError treat read/write failure as error");
1133    System.err.println(" -t <N>          timeout for canary-test run; default=600000ms");
1134    System.err.println(" -writeSniffing  enable write sniffing");
1135    System.err.println(" -writeTable     the table used for write sniffing; default=hbase:canary");
1136    System.err.println(" -writeTableTimeout <N>  timeout for writeTable; default=600000ms");
1137    System.err.println(
1138      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
1139    System.err
1140      .println("                comma-separated list of table read timeouts " + "(no spaces);");
1141    System.err.println("                logs 'ERROR' if takes longer. default=600000ms");
1142    System.err.println(" -permittedZookeeperFailures <N>  Ignore first N failures attempting to ");
1143    System.err.println("                connect to individual zookeeper nodes in ensemble");
1144    System.err.println("");
1145    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
1146    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
1147      + "raw scan; default=false");
1148    System.err.println(
1149      " -Dhbase.canary.info.port=PORT_NUMBER  Set for a Canary UI; " + "default=-1 (None)");
1150    System.err.println("");
1151    System.err.println(
1152      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
1153    System.err.println("To sniff/probe all regions, pass no arguments.");
1154    System.err.println("To sniff/probe all regions of a table, pass tablename.");
1155    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
1156    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
1157    System.exit(USAGE_EXIT_CODE);
1158  }
1159
1160  Sink getSink(Configuration configuration, Class clazz) {
1161    // In test context, this.sink might be set. Use it if non-null. For testing.
1162    return this.sink != null
1163      ? this.sink
1164      : (Sink) ReflectionUtils
1165        .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
1166  }
1167
1168  /**
1169   * Canary region mode-specific data structure which stores information about each region to be
1170   * scanned
1171   */
1172  public static class RegionTaskResult {
1173    private RegionInfo region;
1174    private TableName tableName;
1175    private ServerName serverName;
1176    private ColumnFamilyDescriptor column;
1177    private AtomicLong readLatency = null;
1178    private AtomicLong writeLatency = null;
1179    private boolean readSuccess = false;
1180    private boolean writeSuccess = false;
1181
1182    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
1183      ColumnFamilyDescriptor column) {
1184      this.region = region;
1185      this.tableName = tableName;
1186      this.serverName = serverName;
1187      this.column = column;
1188    }
1189
1190    public RegionInfo getRegionInfo() {
1191      return this.region;
1192    }
1193
1194    public String getRegionNameAsString() {
1195      return this.region.getRegionNameAsString();
1196    }
1197
1198    public TableName getTableName() {
1199      return this.tableName;
1200    }
1201
1202    public String getTableNameAsString() {
1203      return this.tableName.getNameAsString();
1204    }
1205
1206    public ServerName getServerName() {
1207      return this.serverName;
1208    }
1209
1210    public String getServerNameAsString() {
1211      return this.serverName.getServerName();
1212    }
1213
1214    public ColumnFamilyDescriptor getColumnFamily() {
1215      return this.column;
1216    }
1217
1218    public String getColumnFamilyNameAsString() {
1219      return this.column.getNameAsString();
1220    }
1221
1222    public long getReadLatency() {
1223      if (this.readLatency == null) {
1224        return -1;
1225      }
1226      return this.readLatency.get();
1227    }
1228
1229    public void setReadLatency(long readLatency) {
1230      if (this.readLatency != null) {
1231        this.readLatency.set(readLatency);
1232      } else {
1233        this.readLatency = new AtomicLong(readLatency);
1234      }
1235    }
1236
1237    public long getWriteLatency() {
1238      if (this.writeLatency == null) {
1239        return -1;
1240      }
1241      return this.writeLatency.get();
1242    }
1243
1244    public void setWriteLatency(long writeLatency) {
1245      if (this.writeLatency != null) {
1246        this.writeLatency.set(writeLatency);
1247      } else {
1248        this.writeLatency = new AtomicLong(writeLatency);
1249      }
1250    }
1251
1252    public boolean isReadSuccess() {
1253      return this.readSuccess;
1254    }
1255
1256    public void setReadSuccess() {
1257      this.readSuccess = true;
1258    }
1259
1260    public boolean isWriteSuccess() {
1261      return this.writeSuccess;
1262    }
1263
1264    public void setWriteSuccess() {
1265      this.writeSuccess = true;
1266    }
1267  }
1268
1269  /**
1270   * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a
1271   * RegionMonitor.
1272   * @return a Monitor instance
1273   */
1274  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
1275    Monitor monitor;
1276    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
1277    boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
1278    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
1279    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
1280    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
1281    String writeTableName =
1282      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
1283    long configuredWriteTableTimeout =
1284      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);
1285
1286    if (this.regionServerMode) {
1287      monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp,
1288        getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor,
1289        regionServerAllRegions, failOnError, permittedFailures);
1290
1291    } else if (this.zookeeperMode) {
1292      monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp,
1293        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor,
1294        failOnError, permittedFailures);
1295    } else {
1296      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
1297        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
1298        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
1299        configuredWriteTableTimeout, permittedFailures);
1300    }
1301    return monitor;
1302  }
1303
1304  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
1305    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
1306    for (String tT : tableTimeouts) {
1307      String[] nameTimeout = tT.split("=");
1308      if (nameTimeout.length < 2) {
1309        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form "
1310          + "<tableName>=<read timeout> (without spaces).");
1311      }
1312      long timeoutVal;
1313      try {
1314        timeoutVal = Long.parseLong(nameTimeout[1]);
1315      } catch (NumberFormatException e) {
1316        throw new IllegalArgumentException(
1317          "-readTableTimeouts read timeout for each table" + " must be a numeric value argument.");
1318      }
1319      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
1320    }
1321  }
1322
1323  /**
1324   * A Monitor super-class can be extended by users
1325   */
1326  public static abstract class Monitor implements Runnable, Closeable {
1327    protected Connection connection;
1328    protected Admin admin;
1329    /**
1330     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the
1331     * command-line as arguments.
1332     */
1333    protected String[] targets;
1334    protected boolean useRegExp;
1335    protected boolean treatFailureAsError;
1336    protected boolean initialized = false;
1337
1338    protected boolean done = false;
1339    protected int errorCode = 0;
1340    protected long allowedFailures = 0;
1341    protected Sink sink;
1342    protected ExecutorService executor;
1343
1344    public boolean isDone() {
1345      return done;
1346    }
1347
1348    public boolean hasError() {
1349      return errorCode != 0;
1350    }
1351
1352    public boolean finalCheckForErrors() {
1353      if (errorCode != 0) {
1354        return true;
1355      }
1356      if (
1357        treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
1358          || sink.getWriteFailureCount() > allowedFailures)
1359      ) {
1360        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
1361        errorCode = FAILURE_EXIT_CODE;
1362        return true;
1363      }
1364      return false;
1365    }
1366
1367    @Override
1368    public void close() throws IOException {
1369      if (this.admin != null) {
1370        this.admin.close();
1371      }
1372    }
1373
1374    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
1375      ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1376      if (null == connection) {
1377        throw new IllegalArgumentException("connection shall not be null");
1378      }
1379
1380      this.connection = connection;
1381      this.targets = monitorTargets;
1382      this.useRegExp = useRegExp;
1383      this.treatFailureAsError = treatFailureAsError;
1384      this.sink = sink;
1385      this.executor = executor;
1386      this.allowedFailures = allowedFailures;
1387    }
1388
1389    @Override
1390    public abstract void run();
1391
1392    protected boolean initAdmin() {
1393      if (null == this.admin) {
1394        try {
1395          this.admin = this.connection.getAdmin();
1396        } catch (Exception e) {
1397          LOG.error("Initial HBaseAdmin failed...", e);
1398          this.errorCode = INIT_ERROR_EXIT_CODE;
1399        }
1400      } else if (admin.isAborted()) {
1401        LOG.error("HBaseAdmin aborted");
1402        this.errorCode = INIT_ERROR_EXIT_CODE;
1403      }
1404      return !this.hasError();
1405    }
1406  }
1407
1408  /**
1409   * A monitor for region mode.
1410   */
1411  private static class RegionMonitor extends Monitor {
1412    // 10 minutes
1413    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
1414    // 1 days
1415    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
1416
1417    private long lastCheckTime = -1;
1418    private boolean writeSniffing;
1419    private TableName writeTableName;
1420    private int writeDataTTL;
1421    private float regionsLowerLimit;
1422    private float regionsUpperLimit;
1423    private int checkPeriod;
1424    private boolean rawScanEnabled;
1425    private boolean readAllCF;
1426
1427    /**
1428     * This is a timeout per table. If read of each region in the table aggregated takes longer than
1429     * what is configured here, we log an ERROR rather than just an INFO.
1430     */
1431    private HashMap<String, Long> configuredReadTableTimeouts;
1432
1433    private long configuredWriteTableTimeout;
1434
1435    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1436      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
1437      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
1438      long configuredWriteTableTimeout, long allowedFailures) {
1439      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1440        allowedFailures);
1441      Configuration conf = connection.getConfiguration();
1442      this.writeSniffing = writeSniffing;
1443      this.writeTableName = writeTableName;
1444      this.writeDataTTL =
1445        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
1446      this.regionsLowerLimit =
1447        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
1448      this.regionsUpperLimit =
1449        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
1450      this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
1451        DEFAULT_WRITE_TABLE_CHECK_PERIOD);
1452      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1453      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
1454      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
1455      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
1456    }
1457
1458    private RegionStdOutSink getSink() {
1459      if (!(sink instanceof RegionStdOutSink)) {
1460        throw new RuntimeException("Can only write to Region sink");
1461      }
1462      return ((RegionStdOutSink) sink);
1463    }
1464
1465    @Override
1466    public void run() {
1467      if (this.initAdmin()) {
1468        try {
1469          List<Future<Void>> taskFutures = new LinkedList<>();
1470          RegionStdOutSink regionSink = this.getSink();
1471          regionSink.resetFailuresCountDetails();
1472          if (this.targets != null && this.targets.length > 0) {
1473            String[] tables = generateMonitorTables(this.targets);
1474            // Check to see that each table name passed in the -readTableTimeouts argument is also
1475            // passed as a monitor target.
1476            if (
1477              !new HashSet<>(Arrays.asList(tables))
1478                .containsAll(this.configuredReadTableTimeouts.keySet())
1479            ) {
1480              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
1481                + "passed via command line.");
1482              this.errorCode = USAGE_EXIT_CODE;
1483              return;
1484            }
1485            this.initialized = true;
1486            for (String table : tables) {
1487              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
1488              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
1489                this.rawScanEnabled, readLatency, readAllCF));
1490            }
1491          } else {
1492            taskFutures.addAll(sniff(TaskType.READ, regionSink));
1493          }
1494
1495          if (writeSniffing) {
1496            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
1497              try {
1498                checkWriteTableDistribution();
1499              } catch (IOException e) {
1500                LOG.error("Check canary table distribution failed!", e);
1501              }
1502              lastCheckTime = EnvironmentEdgeManager.currentTime();
1503            }
1504            // sniff canary table with write operation
1505            regionSink.initializeWriteLatency();
1506            LongAdder writeTableLatency = regionSink.getWriteLatency();
1507            taskFutures
1508              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
1509                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
1510          }
1511
1512          for (Future<Void> future : taskFutures) {
1513            try {
1514              future.get();
1515            } catch (ExecutionException e) {
1516              LOG.error("Sniff region failed!", e);
1517            }
1518          }
1519          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
1520          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
1521            String tableName = entry.getKey();
1522            if (actualReadTableLatency.containsKey(tableName)) {
1523              Long actual = actualReadTableLatency.get(tableName).longValue();
1524              Long configured = entry.getValue();
1525              if (actual > configured) {
1526                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout."
1527                  + "(Configured read timeout {}ms.", tableName, actual, configured);
1528              } else {
1529                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
1530                  tableName, actual, configured);
1531              }
1532            } else {
1533              LOG.error("Read operation for {} failed!", tableName);
1534            }
1535          }
1536          if (this.writeSniffing) {
1537            String writeTableStringName = this.writeTableName.getNameAsString();
1538            long actualWriteLatency = regionSink.getWriteLatency().longValue();
1539            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
1540              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
1541            // Check that the writeTable write operation latency does not exceed the configured
1542            // timeout.
1543            if (actualWriteLatency > this.configuredWriteTableTimeout) {
1544              LOG.error("Write operation for {} exceeded the configured write timeout.",
1545                writeTableStringName);
1546            }
1547          }
1548        } catch (Exception e) {
1549          LOG.error("Run regionMonitor failed", e);
1550          this.errorCode = ERROR_EXIT_CODE;
1551        } finally {
1552          this.done = true;
1553        }
1554      }
1555      this.done = true;
1556    }
1557
1558    /** Returns List of tables to use in test. */
1559    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
1560      String[] returnTables = null;
1561
1562      if (this.useRegExp) {
1563        Pattern pattern = null;
1564        List<TableDescriptor> tds = null;
1565        Set<String> tmpTables = new TreeSet<>();
1566        try {
1567          LOG.debug(String.format("reading list of tables"));
1568          tds = this.admin.listTableDescriptors(pattern);
1569          if (tds == null) {
1570            tds = Collections.emptyList();
1571          }
1572          for (String monitorTarget : monitorTargets) {
1573            pattern = Pattern.compile(monitorTarget);
1574            for (TableDescriptor td : tds) {
1575              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
1576                tmpTables.add(td.getTableName().getNameAsString());
1577              }
1578            }
1579          }
1580        } catch (IOException e) {
1581          LOG.error("Communicate with admin failed", e);
1582          throw e;
1583        }
1584
1585        if (tmpTables.size() > 0) {
1586          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
1587        } else {
1588          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
1589          LOG.error(msg);
1590          this.errorCode = INIT_ERROR_EXIT_CODE;
1591          throw new TableNotFoundException(msg);
1592        }
1593      } else {
1594        returnTables = monitorTargets;
1595      }
1596
1597      return returnTables;
1598    }
1599
1600    /*
1601     * Canary entry point to monitor all the tables.
1602     */
1603    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
1604      throws Exception {
1605      LOG.debug("Reading list of tables");
1606      List<Future<Void>> taskFutures = new LinkedList<>();
1607      for (TableDescriptor td : admin.listTableDescriptors()) {
1608        if (
1609          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
1610            && (!td.getTableName().equals(writeTableName))
1611        ) {
1612          LongAdder readLatency =
1613            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
1614          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
1615            this.rawScanEnabled, readLatency, readAllCF));
1616        }
1617      }
1618      return taskFutures;
1619    }
1620
1621    private void checkWriteTableDistribution() throws IOException {
1622      if (!admin.tableExists(writeTableName)) {
1623        int numberOfServers = admin.getRegionServers().size();
1624        if (numberOfServers == 0) {
1625          throw new IllegalStateException("No live regionservers");
1626        }
1627        createWriteTable(numberOfServers);
1628      }
1629
1630      if (!admin.isTableEnabled(writeTableName)) {
1631        admin.enableTable(writeTableName);
1632      }
1633
1634      ClusterMetrics status =
1635        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
1636      int numberOfServers = status.getServersName().size();
1637      if (status.getServersName().contains(status.getMasterName())) {
1638        numberOfServers -= 1;
1639      }
1640
1641      List<Pair<RegionInfo, ServerName>> pairs =
1642        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
1643      int numberOfRegions = pairs.size();
1644      if (
1645        numberOfRegions < numberOfServers * regionsLowerLimit
1646          || numberOfRegions > numberOfServers * regionsUpperLimit
1647      ) {
1648        admin.disableTable(writeTableName);
1649        admin.deleteTable(writeTableName);
1650        createWriteTable(numberOfServers);
1651      }
1652      HashSet<ServerName> serverSet = new HashSet<>();
1653      for (Pair<RegionInfo, ServerName> pair : pairs) {
1654        serverSet.add(pair.getSecond());
1655      }
1656      int numberOfCoveredServers = serverSet.size();
1657      if (numberOfCoveredServers < numberOfServers) {
1658        admin.balance();
1659      }
1660    }
1661
1662    private void createWriteTable(int numberOfServers) throws IOException {
1663      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
1664      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
1665        + "(current lower limit of regions per server is {} and you can change it with config {}).",
1666        numberOfServers, numberOfRegions, regionsLowerLimit,
1667        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
1668      ColumnFamilyDescriptor family =
1669        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
1670          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
1671      TableDescriptor desc =
1672        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
1673      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
1674      admin.createTable(desc, splits);
1675    }
1676  }
1677
1678  /**
1679   * Canary entry point for specified table.
1680   * @throws Exception exception
1681   */
1682  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
1683    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
1684    boolean readAllCF) throws Exception {
1685    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
1686    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
1687      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
1688        executor, taskType, rawScanEnabled, readLatency, readAllCF);
1689    } else {
1690      LOG.warn("Table {} is not enabled", tableName);
1691    }
1692    return new LinkedList<>();
1693  }
1694
1695  /*
1696   * Loops over regions of this table, and outputs information about the state.
1697   */
1698  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
1699    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
1700    LongAdder rwLatency, boolean readAllCF) throws Exception {
1701    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
1702    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
1703      List<RegionTask> tasks = new ArrayList<>();
1704      try (RegionLocator regionLocator =
1705        admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1706        for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1707          if (location == null) {
1708            LOG.warn("Null location");
1709            continue;
1710          }
1711          ServerName rs = location.getServerName();
1712          RegionInfo region = location.getRegion();
1713          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
1714            taskType, rawScanEnabled, rwLatency, readAllCF));
1715          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
1716          regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
1717        }
1718        return executor.invokeAll(tasks);
1719      }
1720    } catch (TableNotFoundException e) {
1721      return Collections.EMPTY_LIST;
1722    }
1723  }
1724
1725  // monitor for zookeeper mode
1726  private static class ZookeeperMonitor extends Monitor {
1727    private List<String> hosts;
1728    private final String znode;
1729    private final int timeout;
1730
1731    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1732      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
1733      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1734        allowedFailures);
1735      Configuration configuration = connection.getConfiguration();
1736      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1737      timeout =
1738        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1739      ConnectStringParser parser =
1740        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
1741      hosts = Lists.newArrayList();
1742      for (InetSocketAddress server : parser.getServerAddresses()) {
1743        hosts.add(inetSocketAddress2String(server));
1744      }
1745      if (allowedFailures > (hosts.size() - 1) / 2) {
1746        LOG.warn(
1747          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
1748            + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
1749          allowedFailures, hosts.size());
1750      }
1751    }
1752
1753    @Override
1754    public void run() {
1755      List<ZookeeperTask> tasks = Lists.newArrayList();
1756      ZookeeperStdOutSink zkSink = null;
1757      try {
1758        zkSink = this.getSink();
1759      } catch (RuntimeException e) {
1760        LOG.error("Run ZooKeeperMonitor failed!", e);
1761        this.errorCode = ERROR_EXIT_CODE;
1762      }
1763      this.initialized = true;
1764      for (final String host : hosts) {
1765        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
1766      }
1767      try {
1768        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1769          try {
1770            future.get();
1771          } catch (ExecutionException e) {
1772            LOG.error("Sniff zookeeper failed!", e);
1773            this.errorCode = ERROR_EXIT_CODE;
1774          }
1775        }
1776      } catch (InterruptedException e) {
1777        this.errorCode = ERROR_EXIT_CODE;
1778        Thread.currentThread().interrupt();
1779        LOG.error("Sniff zookeeper interrupted!", e);
1780      }
1781      this.done = true;
1782    }
1783
1784    private ZookeeperStdOutSink getSink() {
1785      if (!(sink instanceof ZookeeperStdOutSink)) {
1786        throw new RuntimeException("Can only write to zookeeper sink");
1787      }
1788      return ((ZookeeperStdOutSink) sink);
1789    }
1790  }
1791
1792  /**
1793   * A monitor for regionserver mode
1794   */
1795  private static class RegionServerMonitor extends Monitor {
1796    private boolean rawScanEnabled;
1797    private boolean allRegions;
1798
1799    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1800      Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError,
1801      long allowedFailures) {
1802      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
1803        allowedFailures);
1804      Configuration conf = connection.getConfiguration();
1805      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
1806      this.allRegions = allRegions;
1807    }
1808
1809    private RegionServerStdOutSink getSink() {
1810      if (!(sink instanceof RegionServerStdOutSink)) {
1811        throw new RuntimeException("Can only write to regionserver sink");
1812      }
1813      return ((RegionServerStdOutSink) sink);
1814    }
1815
1816    @Override
1817    public void run() {
1818      if (this.initAdmin() && this.checkNoTableNames()) {
1819        RegionServerStdOutSink regionServerSink = null;
1820        try {
1821          regionServerSink = this.getSink();
1822        } catch (RuntimeException e) {
1823          LOG.error("Run RegionServerMonitor failed!", e);
1824          this.errorCode = ERROR_EXIT_CODE;
1825        }
1826        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
1827        this.initialized = true;
1828        this.monitorRegionServers(rsAndRMap, regionServerSink);
1829      }
1830      this.done = true;
1831    }
1832
1833    private boolean checkNoTableNames() {
1834      List<String> foundTableNames = new ArrayList<>();
1835      TableName[] tableNames = null;
1836      LOG.debug("Reading list of tables");
1837      try {
1838        tableNames = this.admin.listTableNames();
1839      } catch (IOException e) {
1840        LOG.error("Get listTableNames failed", e);
1841        this.errorCode = INIT_ERROR_EXIT_CODE;
1842        return false;
1843      }
1844
1845      if (this.targets == null || this.targets.length == 0) {
1846        return true;
1847      }
1848
1849      for (String target : this.targets) {
1850        for (TableName tableName : tableNames) {
1851          if (target.equals(tableName.getNameAsString())) {
1852            foundTableNames.add(target);
1853          }
1854        }
1855      }
1856
1857      if (foundTableNames.size() > 0) {
1858        System.err.println("Cannot pass a tablename when using the -regionserver "
1859          + "option, tablenames:" + foundTableNames.toString());
1860        this.errorCode = USAGE_EXIT_CODE;
1861      }
1862      return foundTableNames.isEmpty();
1863    }
1864
1865    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
1866      RegionServerStdOutSink regionServerSink) {
1867      List<RegionServerTask> tasks = new ArrayList<>();
1868      Map<String, AtomicLong> successMap = new HashMap<>();
1869      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1870        String serverName = entry.getKey();
1871        AtomicLong successes = new AtomicLong(0);
1872        successMap.put(serverName, successes);
1873        if (entry.getValue().isEmpty()) {
1874          LOG.error("Regionserver not serving any regions - {}", serverName);
1875        } else if (this.allRegions) {
1876          for (RegionInfo region : entry.getValue()) {
1877            tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
1878              this.rawScanEnabled, successes));
1879          }
1880        } else {
1881          // random select a region if flag not set
1882          RegionInfo region =
1883            entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size()));
1884          tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
1885            this.rawScanEnabled, successes));
1886        }
1887      }
1888      try {
1889        for (Future<Void> future : this.executor.invokeAll(tasks)) {
1890          try {
1891            future.get();
1892          } catch (ExecutionException e) {
1893            LOG.error("Sniff regionserver failed!", e);
1894            this.errorCode = ERROR_EXIT_CODE;
1895          }
1896        }
1897        if (this.allRegions) {
1898          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
1899            String serverName = entry.getKey();
1900            LOG.info("Successfully read {} regions out of {} on regionserver {}",
1901              successMap.get(serverName), entry.getValue().size(), serverName);
1902          }
1903        }
1904      } catch (InterruptedException e) {
1905        this.errorCode = ERROR_EXIT_CODE;
1906        LOG.error("Sniff regionserver interrupted!", e);
1907      }
1908    }
1909
1910    private Map<String, List<RegionInfo>> filterRegionServerByName() {
1911      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1912      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1913      return regionServerAndRegionsMap;
1914    }
1915
1916    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
1917      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
1918      try {
1919        LOG.debug("Reading list of tables and locations");
1920        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
1921        List<RegionInfo> regions = null;
1922        for (TableDescriptor tableDesc : tableDescs) {
1923          try (RegionLocator regionLocator =
1924            this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
1925            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1926              if (location == null) {
1927                LOG.warn("Null location");
1928                continue;
1929              }
1930              ServerName rs = location.getServerName();
1931              String rsName = rs.getHostname();
1932              RegionInfo r = location.getRegion();
1933              if (rsAndRMap.containsKey(rsName)) {
1934                regions = rsAndRMap.get(rsName);
1935              } else {
1936                regions = new ArrayList<>();
1937                rsAndRMap.put(rsName, regions);
1938              }
1939              regions.add(r);
1940            }
1941          }
1942        }
1943
1944        // get any live regionservers not serving any regions
1945        for (ServerName rs : this.admin.getRegionServers()) {
1946          String rsName = rs.getHostname();
1947          if (!rsAndRMap.containsKey(rsName)) {
1948            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
1949          }
1950        }
1951      } catch (IOException e) {
1952        LOG.error("Get HTables info failed", e);
1953        this.errorCode = INIT_ERROR_EXIT_CODE;
1954      }
1955      return rsAndRMap;
1956    }
1957
1958    private Map<String, List<RegionInfo>>
1959      doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) {
1960
1961      Map<String, List<RegionInfo>> filteredRsAndRMap = null;
1962
1963      if (this.targets != null && this.targets.length > 0) {
1964        filteredRsAndRMap = new HashMap<>();
1965        Pattern pattern = null;
1966        Matcher matcher = null;
1967        boolean regExpFound = false;
1968        for (String rsName : this.targets) {
1969          if (this.useRegExp) {
1970            regExpFound = false;
1971            pattern = Pattern.compile(rsName);
1972            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
1973              matcher = pattern.matcher(entry.getKey());
1974              if (matcher.matches()) {
1975                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1976                regExpFound = true;
1977              }
1978            }
1979            if (!regExpFound) {
1980              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
1981            }
1982          } else {
1983            if (fullRsAndRMap.containsKey(rsName)) {
1984              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1985            } else {
1986              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
1987            }
1988          }
1989        }
1990      } else {
1991        filteredRsAndRMap = fullRsAndRMap;
1992      }
1993      return filteredRsAndRMap;
1994    }
1995  }
1996
1997  public static void main(String[] args) throws Exception {
1998    final Configuration conf = HBaseConfiguration.create();
1999
2000    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
2001    LOG.info("Execution thread count={}", numThreads);
2002
2003    int exitCode;
2004    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
2005    try {
2006      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
2007    } finally {
2008      executor.shutdown();
2009    }
2010    System.exit(exitCode);
2011  }
2012}