001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; 021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; 022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String; 023 024import java.io.Closeable; 025import java.io.IOException; 026import java.net.BindException; 027import java.net.InetSocketAddress; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.Callable; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ScheduledThreadPoolExecutor; 046import java.util.concurrent.ThreadLocalRandom; 047import 
java.util.concurrent.atomic.AtomicLong; 048import java.util.concurrent.atomic.LongAdder; 049import java.util.regex.Matcher; 050import java.util.regex.Pattern; 051import org.apache.commons.lang3.time.StopWatch; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.AuthUtil; 054import org.apache.hadoop.hbase.ChoreService; 055import org.apache.hadoop.hbase.ClusterMetrics; 056import org.apache.hadoop.hbase.ClusterMetrics.Option; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseInterfaceAudience; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.HRegionLocation; 062import org.apache.hadoop.hbase.MetaTableAccessor; 063import org.apache.hadoop.hbase.NamespaceDescriptor; 064import org.apache.hadoop.hbase.ScheduledChore; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotEnabledException; 068import org.apache.hadoop.hbase.TableNotFoundException; 069import org.apache.hadoop.hbase.client.Admin; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.Connection; 073import org.apache.hadoop.hbase.client.ConnectionFactory; 074import org.apache.hadoop.hbase.client.Get; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import org.apache.hadoop.hbase.client.RegionLocator; 078import org.apache.hadoop.hbase.client.ResultScanner; 079import org.apache.hadoop.hbase.client.Scan; 080import org.apache.hadoop.hbase.client.Table; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 084import org.apache.hadoop.hbase.http.InfoServer; 085import 
org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
 * <ol>
 * <li>region mode (Default): For each region, try to get one row per column family outputting
 * information on failure (ERROR) or else the latency.</li>
 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly
 * outputting information on failure (ERROR) or else the latency.</li>
 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on
 * failure (ERROR) or else the latency.</li>
 * </ol>
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CanaryTool implements Tool, Canary {
  /** Configuration key for the info-server port; a negative value disables the web UI. */
  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
  /** Configuration key for the info-server bind address (default "0.0.0.0"). */
  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";

  /**
   * Starts the embedded HTTP info server when a non-negative port is configured. The web UI is
   * only supported in region mode; in zookeeper or regionserver mode this only logs that the UI
   * is unavailable. A BindException (port already in use) is logged as a warning and swallowed so
   * the canary itself keeps running without a UI.
   * @throws IOException if the info server fails to start for a reason other than a bind failure
   */
  private void putUpWebUI() throws IOException {
    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
    // -1 is for disabling info server
    if (port < 0) {
      return;
    }
    if (zookeeperMode) {
      LOG.info("WebUI is not supported in Zookeeper mode");
    } else if (regionServerMode) {
      LOG.info("WebUI is not supported in RegionServer mode");
    } else {
      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
      try {
        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
        infoServer.start();
        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
      } catch (BindException e) {
        // Port busy is non-fatal: log and continue without a web UI.
        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
      }
    }
  }

  @Override
  public int checkRegions(String[] targets) throws Exception {
    // Optional per-table read timeouts. Parsed up front so that a malformed spec fails fast
    // with a usage error instead of failing mid-run; format is defined by
    // populateReadTableTimeoutsMap (presumably "table=millis" pairs -- confirm there).
    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
    try {
      LOG.info("Canary tool is running in Region mode");
      if (configuredReadTableTimeoutsStr != null) {
        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
      }
    } catch (IllegalArgumentException e) {
      LOG.error("Constructing read table timeouts map failed ", e);
      return USAGE_EXIT_CODE;
    }
    return runMonitor(targets);
  }

  @Override
  public int checkRegionServers(String[] targets) throws Exception {
    regionServerMode = true;
    LOG.info("Canary tool is running in RegionServer mode");
    return runMonitor(targets);
  }

  @Override
  public int checkZooKeeper() throws Exception {
    zookeeperMode = true;
    LOG.info("Canary tool is running in ZooKeeper mode");
    // ZooKeeper mode ignores monitor targets; every ensemble member is probed.
    return runMonitor(null);
  }

  /**
   * Sink interface used by the canary to output information. Implementations must be safe to
   * call from multiple worker threads.
   */
  public interface Sink {
    long getReadFailureCount();

    long incReadFailureCount();

    Map<String, String> getReadFailures();

    void updateReadFailures(String regionName, String
serverName); 185 186 long getWriteFailureCount(); 187 188 long incWriteFailureCount(); 189 190 Map<String, String> getWriteFailures(); 191 192 void updateWriteFailures(String regionName, String serverName); 193 194 long getReadSuccessCount(); 195 196 long incReadSuccessCount(); 197 198 long getWriteSuccessCount(); 199 200 long incWriteSuccessCount(); 201 202 void stop(); 203 204 boolean isStopped(); 205 } 206 207 /** 208 * Simple implementation of canary sink that allows plotting to a file or standard output. 209 */ 210 public static class StdOutSink implements Sink { 211 private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0), 212 readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0); 213 private Map<String, String> readFailures = new ConcurrentHashMap<>(); 214 private Map<String, String> writeFailures = new ConcurrentHashMap<>(); 215 private volatile boolean stopped = false; 216 217 @Override 218 public long getReadFailureCount() { 219 return readFailureCount.get(); 220 } 221 222 @Override 223 public long incReadFailureCount() { 224 return readFailureCount.incrementAndGet(); 225 } 226 227 @Override 228 public Map<String, String> getReadFailures() { 229 return readFailures; 230 } 231 232 @Override 233 public void updateReadFailures(String regionName, String serverName) { 234 readFailures.put(regionName, serverName); 235 } 236 237 @Override 238 public long getWriteFailureCount() { 239 return writeFailureCount.get(); 240 } 241 242 @Override 243 public long incWriteFailureCount() { 244 return writeFailureCount.incrementAndGet(); 245 } 246 247 @Override 248 public Map<String, String> getWriteFailures() { 249 return writeFailures; 250 } 251 252 @Override 253 public void updateWriteFailures(String regionName, String serverName) { 254 writeFailures.put(regionName, serverName); 255 } 256 257 @Override 258 public long getReadSuccessCount() { 259 return readSuccessCount.get(); 260 } 261 262 @Override 263 public 
long incReadSuccessCount() { 264 return readSuccessCount.incrementAndGet(); 265 } 266 267 @Override 268 public long getWriteSuccessCount() { 269 return writeSuccessCount.get(); 270 } 271 272 @Override 273 public long incWriteSuccessCount() { 274 return writeSuccessCount.incrementAndGet(); 275 } 276 277 public void stop() { 278 stopped = true; 279 } 280 281 @Override 282 public boolean isStopped() { 283 return stopped; 284 } 285 } 286 287 /** 288 * By RegionServer, for 'regionserver' mode. 289 */ 290 public static class RegionServerStdOutSink extends StdOutSink { 291 public void publishReadFailure(String table, String server) { 292 incReadFailureCount(); 293 LOG.error("Read from {} on {}", table, server); 294 } 295 296 public void publishReadTiming(String table, String server, long msTime) { 297 LOG.info("Read from {} on {} in {}ms", table, server, msTime); 298 } 299 } 300 301 /** 302 * Output for 'zookeeper' mode. 303 */ 304 public static class ZookeeperStdOutSink extends StdOutSink { 305 public void publishReadFailure(String znode, String server) { 306 incReadFailureCount(); 307 LOG.error("Read from {} on {}", znode, server); 308 } 309 310 public void publishReadTiming(String znode, String server, long msTime) { 311 LOG.info("Read from {} on {} in {}ms", znode, server, msTime); 312 } 313 } 314 315 /** 316 * By Region, for 'region' mode. 
*/
  public static class RegionStdOutSink extends StdOutSink {
    // table name -> aggregate read latency (ms); populated single-threaded via
    // initializeAndGetReadLatencyForTable before tasks run, then incremented by tasks.
    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
    // aggregate write latency (ms) across the canary write table
    private LongAdder writeLatency = new LongAdder();
    // region name -> per-column-family task results, used by the canary web UI
    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
      new ConcurrentHashMap<>();
    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>();
    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();

    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
      return perServerFailuresCount;
    }

    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
      return perTableFailuresCount;
    }

    /** Clears the per-server and per-table failure counters, e.g. between daemon-mode runs. */
    public void resetFailuresCountDetails() {
      perServerFailuresCount.clear();
      perTableFailuresCount.clear();
    }

    // Bumps the failure counter for the hosting server (when known) and for the region's table.
    // serverName may be null when the region's location could not be determined.
    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
      if (serverName != null) {
        perServerFailuresCount.compute(serverName, (server, count) -> {
          if (count == null) {
            count = new LongAdder();
          }
          count.increment();
          return count;
        });
      }
      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
        if (count == null) {
          count = new LongAdder();
        }
        count.increment();
        return count;
      });
    }

    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
      LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName,
        e);
      incReadFailureCount();
      incFailuresCountDetails(serverName, region);
    }

    public void publishReadFailure(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, Exception e) {
      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
        region.getRegionNameAsString(), serverName, column.getNameAsString(), e);
      incReadFailureCount();
      incFailuresCountDetails(serverName, region);
    }

    public void publishReadTiming(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, long msTime) {
      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
      rtr.setReadSuccess();
      rtr.setReadLatency(msTime);
      // NOTE(review): assumes the region's entry was pre-registered in regionMap by the monitor
      // before tasks run; a missing entry would NPE here -- confirm against the caller.
      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
      rtrs.add(rtr);
      // Note that read success count will be equal to total column family read successes.
      incReadSuccessCount();
      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
        column.getNameAsString(), msTime);
    }

    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
      incWriteFailureCount();
      incFailuresCountDetails(serverName, region);
    }

    public void publishWriteFailure(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, Exception e) {
      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
        column.getNameAsString(), e);
      incWriteFailureCount();
      incFailuresCountDetails(serverName, region);
    }

    public void publishWriteTiming(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, long msTime) {
      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
      rtr.setWriteSuccess();
      rtr.setWriteLatency(msTime);
      // NOTE(review): same pre-registration assumption as publishReadTiming above.
      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
      rtrs.add(rtr);
      // Note that write success count will be equal to total column family write successes.
      incWriteSuccessCount();
      LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
        column.getNameAsString(), msTime);
    }

    public Map<String, LongAdder> getReadLatencyMap() {
      return this.perTableReadLatency;
    }

    /** Installs (replacing any prior) a zeroed latency accumulator for the table and returns it. */
    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
      LongAdder initLatency = new LongAdder();
      this.perTableReadLatency.put(tableName, initLatency);
      return initLatency;
    }

    public void initializeWriteLatency() {
      this.writeLatency.reset();
    }

    public LongAdder getWriteLatency() {
      return this.writeLatency;
    }

    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
      return this.regionMap;
    }

    public int getTotalExpectedRegions() {
      return this.regionMap.size();
    }
  }

  /**
   * Run a single zookeeper Task and then exit.
   */
  static class ZookeeperTask implements Callable<Void> {
    private final Connection connection;
    private final String host;
    private String znode;
    private final int timeout;
    private ZookeeperStdOutSink sink;

    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
      ZookeeperStdOutSink sink) {
      this.connection = connection;
      this.host = host;
      this.znode = znode;
      this.timeout = timeout;
      this.sink = sink;
    }

    /**
     * Connects to a single ensemble member, reads the configured znode, and publishes the
     * latency; any KeeperException/InterruptedException is published as a read failure.
     */
    @Override
    public Void call() throws Exception {
      if (this.sink.isStopped()) {
        return null;
      }
      ZooKeeper zooKeeper = null;
      try {
        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
        Stat exists = zooKeeper.exists(znode, false);
        StopWatch stopwatch = new StopWatch();
        stopwatch.start();
        // Only the getData round-trip is timed; connection setup is excluded.
        zooKeeper.getData(znode, false, exists);
        stopwatch.stop();
        sink.publishReadTiming(znode, host, stopwatch.getTime());
      } catch (KeeperException | InterruptedException e) {
        sink.publishReadFailure(znode, host);
      } finally {
        if
(zooKeeper != null) { 477 zooKeeper.close(); 478 } 479 } 480 return null; 481 } 482 } 483 484 /** 485 * Run a single Region Task and then exit. For each column family of the Region, get one row and 486 * output latency or failure. 487 */ 488 static class RegionTask implements Callable<Void> { 489 public enum TaskType { 490 READ, 491 WRITE 492 } 493 494 private Connection connection; 495 private RegionInfo region; 496 private RegionStdOutSink sink; 497 private TaskType taskType; 498 private boolean rawScanEnabled; 499 private ServerName serverName; 500 private LongAdder readWriteLatency; 501 private boolean readAllCF; 502 503 RegionTask(Connection connection, RegionInfo region, ServerName serverName, 504 RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency, 505 boolean readAllCF) { 506 this.connection = connection; 507 this.region = region; 508 this.serverName = serverName; 509 this.sink = sink; 510 this.taskType = taskType; 511 this.rawScanEnabled = rawScanEnabled; 512 this.readWriteLatency = rwLatency; 513 this.readAllCF = readAllCF; 514 } 515 516 @Override 517 public Void call() { 518 if (this.sink.isStopped()) { 519 return null; 520 } 521 switch (taskType) { 522 case READ: 523 return read(); 524 case WRITE: 525 return write(); 526 default: 527 return read(); 528 } 529 } 530 531 private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) { 532 byte[] startKey = null; 533 Scan scan = null; 534 ResultScanner rs = null; 535 StopWatch stopWatch = new StopWatch(); 536 startKey = region.getStartKey(); 537 // Can't do a get on empty start row so do a Scan of first element if any instead. 538 if (startKey.length > 0) { 539 Get get = new Get(startKey); 540 get.setCacheBlocks(false); 541 get.setFilter(new FirstKeyOnlyFilter()); 542 get.addFamily(column.getName()); 543 // Converting get object to scan to enable RAW SCAN. 544 // This will work for all the regions of the HBase tables except first region of the table. 
545 scan = new Scan(get); 546 scan.setRaw(rawScanEnabled); 547 } else { 548 scan = new Scan(); 549 // In case of first region of the HBase Table, we do not have start-key for the region. 550 // For Region Canary, we only need to scan a single row/cell in the region to make sure that 551 // region is accessible. 552 // 553 // When HBase table has more than 1 empty regions at start of the row-key space, Canary will 554 // create multiple scan object to find first available row in the table by scanning all the 555 // regions in sequence until it can find first available row. 556 // 557 // This could result in multiple millions of scans based on the size of table and number of 558 // empty regions in sequence. In test environment, A table with no data and 1100 empty 559 // regions, Single canary run was creating close to half million to 1 million scans to 560 // successfully do canary run for the table. 561 // 562 // Since First region of the table doesn't have any start key, We should set End Key as 563 // stop row and set inclusive=false to limit scan to single region only. 564 // 565 // TODO : In future, we can streamline Canary behaviour for all the regions by doing scan 566 // with startRow inclusive and stopRow exclusive instead of different behaviour for First 567 // Region of the table and rest of the region of the table. This way implementation is 568 // simplified. As of now this change has been kept minimal to avoid any unnecessary 569 // perf impact. 
570 scan.withStopRow(region.getEndKey(), false); 571 LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable()); 572 scan.setRaw(rawScanEnabled); 573 scan.setCaching(1); 574 scan.setCacheBlocks(false); 575 scan.setFilter(new FirstKeyOnlyFilter()); 576 scan.addFamily(column.getName()); 577 scan.setMaxResultSize(1L); 578 scan.setOneRowLimit(); 579 } 580 LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(), 581 column.getNameAsString(), Bytes.toStringBinary(startKey)); 582 try { 583 stopWatch.start(); 584 rs = table.getScanner(scan); 585 rs.next(); 586 stopWatch.stop(); 587 this.readWriteLatency.add(stopWatch.getTime()); 588 sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); 589 } catch (Exception e) { 590 sink.publishReadFailure(serverName, region, column, e); 591 sink.updateReadFailures(region.getRegionNameAsString(), 592 serverName == null ? "NULL" : serverName.getHostname()); 593 } finally { 594 if (rs != null) { 595 rs.close(); 596 } 597 } 598 return null; 599 } 600 601 private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) { 602 int size = cfs.length; 603 return cfs[ThreadLocalRandom.current().nextInt(size)]; 604 605 } 606 607 public Void read() { 608 Table table = null; 609 TableDescriptor tableDesc = null; 610 try { 611 LOG.debug("Reading table descriptor for table {}", region.getTable()); 612 table = connection.getTable(region.getTable()); 613 tableDesc = table.getDescriptor(); 614 } catch (IOException e) { 615 LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); 616 sink.publishReadFailure(serverName, region, e); 617 if (table != null) { 618 try { 619 table.close(); 620 } catch (IOException ioe) { 621 LOG.error("Close table failed", e); 622 } 623 } 624 return null; 625 } 626 627 if (readAllCF) { 628 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 629 readColumnFamily(table, column); 630 } 631 } else { 632 
readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies())); 633 } 634 try { 635 table.close(); 636 } catch (IOException e) { 637 LOG.error("Close table failed", e); 638 } 639 return null; 640 } 641 642 /** 643 * Check writes for the canary table 644 */ 645 private Void write() { 646 Table table = null; 647 TableDescriptor tableDesc = null; 648 try { 649 table = connection.getTable(region.getTable()); 650 tableDesc = table.getDescriptor(); 651 byte[] rowToCheck = region.getStartKey(); 652 if (rowToCheck.length == 0) { 653 rowToCheck = new byte[] { 0x0 }; 654 } 655 int writeValueSize = 656 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); 657 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 658 Put put = new Put(rowToCheck); 659 byte[] value = new byte[writeValueSize]; 660 Bytes.random(value); 661 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); 662 LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(), 663 region.getRegionNameAsString(), column.getNameAsString(), 664 Bytes.toStringBinary(rowToCheck)); 665 try { 666 long startTime = EnvironmentEdgeManager.currentTime(); 667 table.put(put); 668 long time = EnvironmentEdgeManager.currentTime() - startTime; 669 this.readWriteLatency.add(time); 670 sink.publishWriteTiming(serverName, region, column, time); 671 } catch (Exception e) { 672 sink.publishWriteFailure(serverName, region, column, e); 673 } 674 } 675 table.close(); 676 } catch (IOException e) { 677 sink.publishWriteFailure(serverName, region, e); 678 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname()); 679 } 680 return null; 681 } 682 } 683 684 /** 685 * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and 686 * output latency or the failure. 
*/
  static class RegionServerTask implements Callable<Void> {
    private Connection connection;
    private String serverName;
    private RegionInfo region;
    private RegionServerStdOutSink sink;
    private Boolean rawScanEnabled;
    // Shared counter of successful regionserver probes across all tasks.
    private AtomicLong successes;

    RegionServerTask(Connection connection, String serverName, RegionInfo region,
      RegionServerStdOutSink sink, Boolean rawScanEnabled, AtomicLong successes) {
      this.connection = connection;
      this.serverName = serverName;
      this.region = region;
      this.sink = sink;
      this.rawScanEnabled = rawScanEnabled;
      this.successes = successes;
    }

    /**
     * Probes one region hosted by this regionserver with a single-row scan, timing the read and
     * publishing latency or failure. TableNotFound is ignored (does not mean the server is dead)
     * and TableNotEnabled counts as success since the server responded.
     */
    @Override
    public Void call() {
      if (this.sink.isStopped()) {
        return null;
      }
      TableName tableName = null;
      Table table = null;
      Get get = null;
      byte[] startKey = null;
      Scan scan = null;
      StopWatch stopWatch = new StopWatch();
      // monitor one region on every region server
      stopWatch.reset();
      try {
        tableName = region.getTable();
        table = connection.getTable(tableName);
        startKey = region.getStartKey();
        // Can't do a get on empty start row so do a Scan of first element if any instead.
        LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(),
          region.getRegionNameAsString(), Bytes.toStringBinary(startKey));
        if (startKey.length > 0) {
          get = new Get(startKey);
          get.setCacheBlocks(false);
          get.setFilter(new FirstKeyOnlyFilter());
          // Converting get object to scan to enable RAW SCAN.
          // This will work for all the regions of the HBase tables except first region.
          scan = new Scan(get);

        } else {
          scan = new Scan();
          // In case of first region of the HBase Table, we do not have start-key for the region.
          // For Region Canary, we only need scan a single row/cell in the region to make sure that
          // region is accessible.
          //
          // When HBase table has more than 1 empty regions at start of the row-key space, Canary
          // will create multiple scan object to find first available row in the table by scanning
          // all the regions in sequence until it can find first available row.
          //
          // Since First region of the table doesn't have any start key, We should set End Key as
          // stop row and set inclusive=false to limit scan to first region only.
          scan.withStopRow(region.getEndKey(), false);
          scan.setCacheBlocks(false);
          scan.setFilter(new FirstKeyOnlyFilter());
          scan.setCaching(1);
          scan.setMaxResultSize(1L);
          scan.setOneRowLimit();
        }
        scan.setRaw(rawScanEnabled);
        stopWatch.start();
        ResultScanner s = table.getScanner(scan);
        s.next();
        s.close();
        stopWatch.stop();
        successes.incrementAndGet();
        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
      } catch (TableNotFoundException tnfe) {
        LOG.error("Table may be deleted", tnfe);
        // This is ignored because it doesn't imply that the regionserver is dead
      } catch (TableNotEnabledException tnee) {
        // This is considered a success since we got a response.
        successes.incrementAndGet();
        LOG.debug("The targeted table was disabled. Assuming success.");
      } catch (DoNotRetryIOException dnrioe) {
        sink.publishReadFailure(tableName.getNameAsString(), serverName);
        LOG.error(dnrioe.toString(), dnrioe);
      } catch (IOException e) {
        sink.publishReadFailure(tableName.getNameAsString(), serverName);
        LOG.error(e.toString(), e);
      } finally {
        if (table != null) {
          try {
            table.close();
          } catch (IOException e) {/* best-effort close: failure is logged, not propagated */
            LOG.error("Close table failed", e);
          }
        }
        scan = null;
        get = null;
        startKey = null;
      }
      return null;
    }
  }

  // Exit codes: usage error, init failure, timed out, generic error, and probe failures.
  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  // Default pause between daemon-mode runs, in milliseconds.
  private static final long DEFAULT_INTERVAL = 60000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  // NOTE(review): logger is deliberately-or-accidentally named after Canary.class rather than
  // CanaryTool.class; confirm before changing, as it affects log category configuration.
  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

  public static final TableName DEFAULT_WRITE_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  // Pause between runs in ms; > 0 means daemon mode.
  private long interval = 0;
  private Sink sink = null;

  /**
   * True if we are to run in 'regionServer' mode.
   */
  private boolean regionServerMode = false;

  /**
   * True if we are to run in zookeeper 'mode'.
   */
  private boolean zookeeperMode = false;

  /**
   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
   * we aggregate time to fetch each region and it needs to be less than this value else we log an
   * ERROR.
*/
  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();

  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
    "hbase.canary.regionserver_all_regions";

  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
    "hbase.canary.region.write.sniffing";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
    "hbase.canary.region.write.table.timeout";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
    "hbase.canary.region.write.table.name";
  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
    "hbase.canary.region.read.table.timeout";

  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
    "hbase.canary.zookeeper.permitted.failures";

  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";

  private ExecutorService executor; // threads to retrieve data from regionservers

  public CanaryTool() {
    this(new ScheduledThreadPoolExecutor(1));
  }

  public CanaryTool(ExecutorService executor) {
    this(executor, null);
  }

  // Visible for testing: inject an executor and a sink.
  @InterfaceAudience.Private
  CanaryTool(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }

  CanaryTool(Configuration conf, ExecutorService executor) {
    this(conf, executor, null);
  }

  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
    this(executor, sink);
    setConf(conf);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    // A null conf is replaced with a fresh HBase configuration rather than rejected.
    if (conf == null) {
      conf = HBaseConfiguration.create();
    }
    this.conf = conf;
  }

  /**
   * Parses command line options, copying recognized flags into the configuration, and returns
   * the index of the first non-option argument (the first table name), or -1 if none was given.
   * Invalid combinations print usage and exit the process via printUsageAndExit().
   */
  private int parseArgs(String[] args) {
    int index = -1;
    long permittedFailures = 0;
    boolean regionServerAllRegions = false, writeSniffing = false;
    String readTableTimeoutsStr = null;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];
      if (cmd.startsWith("-")) {
        if (index >= 0) {
          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
          System.err.println("Invalid command line options");
          printUsageAndExit();
        }
        if (cmd.equals("-help") || cmd.equals("-h")) {
          // user asked for help, print the help and quit.
          printUsageAndExit();
        } else if (cmd.equals("-daemon") && interval == 0) {
          // user asked for daemon mode, set a default interval between checks
          interval = DEFAULT_INTERVAL;
        } else if (cmd.equals("-interval")) {
          // user has specified an interval for canary breaths (-interval N)
          i++;

          if (i == args.length) {
            System.err.println("-interval takes a numeric seconds value argument.");
            printUsageAndExit();
          }
          try {
            // CLI value is in seconds; stored internally as milliseconds.
            interval = Long.parseLong(args[i]) * 1000;
          } catch (NumberFormatException e) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-zookeeper")) {
          this.zookeeperMode = true;
        } else if (cmd.equals("-regionserver")) {
          this.regionServerMode = true;
        } else if (cmd.equals("-allRegions")) {
          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
          regionServerAllRegions = true;
        } else if (cmd.equals("-writeSniffing")) {
          writeSniffing = true;
          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
        } else if (cmd.equals("-e")) {
          // treat table name arguments as regular expressions
          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
        } else if (cmd.equals("-t")) {
          i++;

          if (i == args.length) {
            System.err.println("-t takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          long timeout = 0;
          try {
            timeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-t takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
        } else if (cmd.equals("-writeTableTimeout")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          long configuredWriteTableTimeout = 0;
          try {
            configuredWriteTableTimeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
        } else if (cmd.equals("-writeTable")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTable takes a string tablename value argument.");
            printUsageAndExit();
          }
          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
        } else if (cmd.equals("-f")) {
          i++;
          if (i == args.length) {
            System.err.println("-f needs a boolean value argument (true|false).");
            printUsageAndExit();
          }

          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
        } else if (cmd.equals("-readTableTimeouts")) {
          i++;
          if (i == args.length) {
            System.err.println("-readTableTimeouts needs a comma-separated list of read "
              + "millisecond timeouts per table (without spaces).");
            printUsageAndExit();
          }
          readTableTimeoutsStr = args[i];
          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
        } else if (cmd.equals("-permittedZookeeperFailures")) {
          i++;

          if (i == args.length) {
            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
            printUsageAndExit();
          }
          try {
            permittedFailures = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
        } else {
          // no options match
          System.err.println(cmd + " options is invalid.");
          printUsageAndExit();
        }
      } else if (index < 0) {
        // keep track of first table name specified by the user
        index = i;
      }
    }
    // Cross-option validation: reject combinations that mix incompatible modes.
    if (regionServerAllRegions && !this.regionServerMode) {
      System.err.println("-allRegions can only be specified in regionserver mode.");
      printUsageAndExit();
    }
    if (this.zookeeperMode) {
      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
        System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes.");
        printUsageAndExit();
      }
    }
    if (permittedFailures != 0 && !this.zookeeperMode) {
      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
      printUsageAndExit();
    }
    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
      System.err.println("-readTableTimeouts can only be configured in region mode.");
      printUsageAndExit();
    }
    return index;
  }

  @Override
  public int run(String[] args) throws Exception {
    int index = parseArgs(args);
    String[] monitorTargets = null;

    if (index >= 0) {
      // Everything from the first non-option argument onward is a monitor target (table name).
      int length = args.length - index;
      monitorTargets = new String[length];
      System.arraycopy(args, index, monitorTargets, 0, length);
    }
    if (interval > 0) {
      // Only show the web page in daemon mode
      putUpWebUI();
    }
    if (zookeeperMode) {
      return checkZooKeeper();
    } else if (regionServerMode) {
      return checkRegionServers(monitorTargets);
    } else {
      return
checkRegions(monitorTargets); 1054 } 1055 } 1056 1057 private int runMonitor(String[] monitorTargets) throws Exception { 1058 ChoreService choreService = null; 1059 1060 // Launches chore for refreshing kerberos credentials if security is enabled. 1061 // Please see 1062 // https://hbase.apache.org/docs/operational-management/tools#running-canary-in-a-kerberos-enabled-cluster 1063 // for more details. 1064 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); 1065 if (authChore != null) { 1066 choreService = new ChoreService("CANARY_TOOL"); 1067 choreService.scheduleChore(authChore); 1068 } 1069 1070 // Start to prepare the stuffs 1071 Monitor monitor = null; 1072 Thread monitorThread; 1073 long startTime = 0; 1074 long currentTimeLength = 0; 1075 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1076 long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT); 1077 // Get a connection to use in below. 1078 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { 1079 do { 1080 // Do monitor !! 
1081 try { 1082 monitor = this.newMonitor(connection, monitorTargets); 1083 startTime = EnvironmentEdgeManager.currentTime(); 1084 monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime); 1085 monitorThread.start(); 1086 while (!monitor.isDone()) { 1087 // wait for 1 sec 1088 Thread.sleep(1000); 1089 // exit if any error occurs 1090 if (failOnError && monitor.hasError()) { 1091 monitorThread.interrupt(); 1092 if (monitor.initialized) { 1093 return monitor.errorCode; 1094 } else { 1095 return INIT_ERROR_EXIT_CODE; 1096 } 1097 } 1098 currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime; 1099 if (currentTimeLength > timeout) { 1100 LOG.error("The monitor is running too long (" + currentTimeLength 1101 + ") after timeout limit:" + timeout + " will be killed itself !!"); 1102 monitorThread.interrupt(); 1103 if (monitor.initialized) { 1104 return TIMEOUT_ERROR_EXIT_CODE; 1105 } else { 1106 return INIT_ERROR_EXIT_CODE; 1107 } 1108 } 1109 } 1110 1111 if (failOnError && monitor.finalCheckForErrors()) { 1112 monitorThread.interrupt(); 1113 return monitor.errorCode; 1114 } 1115 } finally { 1116 if (monitor != null) { 1117 monitor.close(); 1118 } 1119 } 1120 1121 Thread.sleep(interval); 1122 } while (interval > 0); 1123 } // try-with-resources close 1124 1125 if (choreService != null) { 1126 choreService.shutdown(); 1127 } 1128 return monitor.errorCode; 1129 } 1130 1131 @Override 1132 public Map<String, String> getReadFailures() { 1133 return sink.getReadFailures(); 1134 } 1135 1136 @Override 1137 public Map<String, String> getWriteFailures() { 1138 return sink.getWriteFailures(); 1139 } 1140 1141 /** 1142 * Return a CanaryTool.Sink object containing the detailed results of the canary run. The Sink may 1143 * not have been created if a Monitor thread is not yet running. 1144 * @return the active Sink if one exists, null otherwise. 
 */
  public Sink getActiveSink() {
    return sink;
  }

  // Prints CLI usage to stderr and terminates the process with USAGE_EXIT_CODE (never returns).
  private void printUsageAndExit() {
    System.err.println(
      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]");
    System.err.println("Where [OPTIONS] are:");
    System.err.println(" -h,-help show this help and exit.");
    System.err.println(
      " -regionserver set 'regionserver mode'; gets row from random region on " + "server");
    System.err.println(
      " -allRegions get from ALL regions when 'regionserver mode', not just " + "random one.");
    System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on "
      + "each ensemble member");
    System.err.println(" -daemon continuous check at defined intervals.");
    System.err.println(" -interval <N> interval between checks in seconds");
    System.err
      .println(" -e consider table/regionserver argument as regular " + "expression");
    System.err.println(" -f <B> exit on first error; default=true");
    System.err.println(" -failureAsError treat read/write failure as error");
    System.err.println(" -t <N> timeout for canary-test run; default=600000ms");
    System.err.println(" -writeSniffing enable write sniffing");
    System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary");
    System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms");
    System.err.println(
      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
    System.err
      .println(" comma-separated list of table read timeouts " + "(no spaces);");
    System.err.println(" logs 'ERROR' if takes longer. default=600000ms");
    System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures attempting to ");
    System.err.println(" connect to individual zookeeper nodes in ensemble");
    System.err.println("");
    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
      + "raw scan; default=false");
    System.err.println(
      " -Dhbase.canary.info.port=PORT_NUMBER Set for a Canary UI; " + "default=-1 (None)");
    System.err.println("");
    System.err.println(
      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
    System.err.println("To sniff/probe all regions, pass no arguments.");
    System.err.println("To sniff/probe all regions of a table, pass tablename.");
    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
    System.err.println(
      "See https://hbase.apache.org/docs/operational-management/tools#canary for Canary documentation.");
    System.exit(USAGE_EXIT_CODE);
  }

  // Lazily instantiates the Sink from config key "hbase.canary.sink.class" (defaulting to the
  // mode-specific clazz) unless a sink was injected via a constructor.
  Sink getSink(Configuration configuration, Class clazz) {
    // In test context, this.sink might be set. Use it if non-null. For testing.
    if (this.sink == null) {
      this.sink = (Sink) ReflectionUtils
        .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
    }
    return this.sink;
  }

  /**
   * Canary region mode-specific data structure which stores information about each region to be
   * scanned
   */
  public static class RegionTaskResult {
    private RegionInfo region;
    private TableName tableName;
    private ServerName serverName;
    private ColumnFamilyDescriptor column;
    private AtomicLong readLatency = null;
    private AtomicLong writeLatency = null;
    private boolean readSuccess = false;
    private boolean writeSuccess = false;

    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
      ColumnFamilyDescriptor column) {
      this.region = region;
      this.tableName = tableName;
      this.serverName = serverName;
      this.column = column;
    }

    public RegionInfo getRegionInfo() {
      return this.region;
    }

    public String getRegionNameAsString() {
      return this.region.getRegionNameAsString();
    }

    public TableName getTableName() {
      return this.tableName;
    }

    public String getTableNameAsString() {
      return this.tableName.getNameAsString();
    }

    public ServerName getServerName() {
      return this.serverName;
    }

    public String getServerNameAsString() {
      return this.serverName.getServerName();
    }

    public ColumnFamilyDescriptor getColumnFamily() {
      return this.column;
    }

    public String getColumnFamilyNameAsString() {
      return this.column.getNameAsString();
    }

    // Returns -1 until a read latency has been recorded via setReadLatency.
    public long getReadLatency() {
      if (this.readLatency == null) {
        return -1;
      }
      return this.readLatency.get();
    }

    public void setReadLatency(long readLatency) {
      if (this.readLatency != null) {
        this.readLatency.set(readLatency);
      } else {
        this.readLatency = new AtomicLong(readLatency);
      }
    }

    // Returns -1 until a write latency has been recorded via setWriteLatency.
    public long getWriteLatency() {
      if (this.writeLatency == null) {
        return -1;
      }
      return this.writeLatency.get();
    }

    public void setWriteLatency(long writeLatency) {
      if (this.writeLatency != null) {
        this.writeLatency.set(writeLatency);
      } else {
        this.writeLatency = new AtomicLong(writeLatency);
      }
    }

    public boolean isReadSuccess() {
      return this.readSuccess;
    }

    public void setReadSuccess() {
      this.readSuccess = true;
    }

    public boolean isWriteSuccess() {
      return this.writeSuccess;
    }

    public void setWriteSuccess() {
      this.writeSuccess = true;
    }
  }

  /**
   * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a
   * RegionMonitor.
   * @return a Monitor instance
   */
  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
    Monitor monitor;
    // All knobs come from the Configuration, populated earlier by parseArgs (or -D overrides).
    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
    boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
    String writeTableName =
      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
    long configuredWriteTableTimeout =
      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);

    if (this.regionServerMode) {
      monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor,
        regionServerAllRegions, failOnError, permittedFailures);
    } else if (this.zookeeperMode) {
      monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor,
        failOnError, permittedFailures);
    } else {
      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
        configuredWriteTableTimeout, permittedFailures);
    }
    return monitor;
  }

  // Parses the -readTableTimeouts value "<table>=<timeoutMs>,..." into configuredReadTableTimeouts.
  // Throws IllegalArgumentException on a malformed pair or a non-numeric timeout.
  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
    for (String tT : tableTimeouts) {
      String[] nameTimeout = tT.split("=");
      if (nameTimeout.length < 2) {
        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form "
          + "<tableName>=<read timeout> (without spaces).");
      }
      long timeoutVal;
      try {
        timeoutVal = Long.parseLong(nameTimeout[1]);
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
          "-readTableTimeouts read timeout for each table" + " must be a numeric value argument.");
      }
      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
    }
  }

  /**
   * A Monitor super-class can be extended by users
   */
  public static abstract class Monitor implements Runnable, Closeable {
    protected Connection connection;
    protected Admin admin;
    /**
     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the
     * command-line as arguments.
     */
    protected String[] targets;
    protected boolean useRegExp;
    protected boolean treatFailureAsError;
    protected boolean initialized = false;

    protected boolean done = false;
    protected int errorCode = 0;
    protected long allowedFailures = 0;
    protected Sink sink;
    protected ExecutorService executor;

    public boolean isDone() {
      return done;
    }

    public boolean hasError() {
      return errorCode != 0;
    }

    // Final verdict for a run: true if an error code was already set, or if (when
    // treatFailureAsError is on) the sink recorded more read/write failures than allowed.
    public boolean finalCheckForErrors() {
      if (errorCode != 0) {
        return true;
      }
      if (
        treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
          || sink.getWriteFailureCount() > allowedFailures)
      ) {
        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
        errorCode = FAILURE_EXIT_CODE;
        return true;
      }
      return false;
    }

    @Override
    public void close() throws IOException {
      this.sink.stop();
      if (this.admin != null) {
        this.admin.close();
      }
    }

    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
      ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
      if (null == connection) {
        throw new IllegalArgumentException("connection shall not be null");
      }

      this.connection = connection;
      this.targets = monitorTargets;
      this.useRegExp = useRegExp;
      this.treatFailureAsError = treatFailureAsError;
      this.sink = sink;
      this.executor = executor;
      this.allowedFailures = allowedFailures;
    }

    @Override
    public abstract void run();

    // Lazily obtains the Admin from the connection; sets INIT_ERROR_EXIT_CODE when creation fails
    // or an existing admin has aborted. Returns true only if no error has been recorded.
    protected boolean initAdmin() {
      if (null == this.admin) {
        try {
          this.admin = this.connection.getAdmin();
        } catch (Exception e) {
          LOG.error("Initial HBaseAdmin failed...", e);
          this.errorCode = INIT_ERROR_EXIT_CODE;
        }
      } else if (admin.isAborted()) {
        LOG.error("HBaseAdmin aborted");
        this.errorCode = INIT_ERROR_EXIT_CODE;
      }
      return !this.hasError();
    }
  }

  /**
   * A monitor for region mode.
   */
  private static class RegionMonitor extends Monitor {
    // 10 minutes
    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // 1 days
    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;

    // timestamp of the last write-table distribution check; -1 forces a check on first run
    private long lastCheckTime = -1;
    private boolean writeSniffing;
    private TableName writeTableName;
    private int writeDataTTL;
    private float regionsLowerLimit;
    private float regionsUpperLimit;
    private int checkPeriod;
    private boolean rawScanEnabled;
    private boolean readAllCF;

    /**
     * This is a timeout per table. If read of each region in the table aggregated takes longer than
     * what is configured here, we log an ERROR rather than just an INFO.
     */
    private HashMap<String, Long> configuredReadTableTimeouts;

    private long configuredWriteTableTimeout;

    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
      long configuredWriteTableTimeout, long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration conf = connection.getConfiguration();
      this.writeSniffing = writeSniffing;
      this.writeTableName = writeTableName;
      this.writeDataTTL =
        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
      this.regionsLowerLimit =
        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
      this.regionsUpperLimit =
        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
      this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
        DEFAULT_WRITE_TABLE_CHECK_PERIOD);
      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
      // defensive copy so later caller-side mutation cannot affect this monitor
      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
    }

    private RegionStdOutSink getSink() {
      if (!(sink instanceof RegionStdOutSink)) {
        throw new RuntimeException("Can only write to Region sink");
      }
      return ((RegionStdOutSink) sink);
    }

    @Override
    public void run() {
      if (this.initAdmin()) {
        try {
          List<Future<Void>> taskFutures = new LinkedList<>();
          RegionStdOutSink regionSink = this.getSink();
          regionSink.resetFailuresCountDetails();
          if (this.targets != null && this.targets.length > 0) {
            String[] tables = generateMonitorTables(this.targets);
            // Check to see that each table name passed in the -readTableTimeouts argument is also
            // passed as a monitor target.
            if (
              !new HashSet<>(Arrays.asList(tables))
                .containsAll(this.configuredReadTableTimeouts.keySet())
            ) {
              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
                + "passed via command line.");
              this.errorCode = USAGE_EXIT_CODE;
              return;
            }
            this.initialized = true;
            for (String table : tables) {
              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
                this.rawScanEnabled, readLatency, readAllCF));
            }
          } else {
            // no explicit targets: sniff every enabled table except the write table
            taskFutures.addAll(sniff(TaskType.READ, regionSink));
          }

          if (writeSniffing) {
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            regionSink.initializeWriteLatency();
            LongAdder writeTableLatency = regionSink.getWriteLatency();
            taskFutures
              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
          }

          // Wait for every region task; individual failures are logged but do not abort the run.
          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
            String tableName = entry.getKey();
            if (actualReadTableLatency.containsKey(tableName)) {
              Long actual = actualReadTableLatency.get(tableName).longValue();
              Long configured = entry.getValue();
              if (actual > configured) {
                LOG.error("Read operation for {} took {}ms exceeded the configured read timeout."
                  + "(Configured read timeout {}ms.", tableName, actual, configured);
              } else {
                LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.",
                  tableName, actual, configured);
              }
            } else {
              LOG.error("Read operation for {} failed!", tableName);
            }
          }
          if (this.writeSniffing) {
            String writeTableStringName = this.writeTableName.getNameAsString();
            long actualWriteLatency = regionSink.getWriteLatency().longValue();
            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
            // Check that the writeTable write operation latency does not exceed the configured
            // timeout.
            if (actualWriteLatency > this.configuredWriteTableTimeout) {
              LOG.error("Write operation for {} exceeded the configured write timeout.",
                writeTableStringName);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        } finally {
          this.done = true;
        }
      }
      this.done = true;
    }

    /** Returns List of tables to use in test.
 */
    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
      String[] returnTables = null;

      if (this.useRegExp) {
        // Treat each target as a regex and collect every existing table name it matches.
        Pattern pattern = null;
        List<TableDescriptor> tds = null;
        Set<String> tmpTables = new TreeSet<>();
        try {
          LOG.debug(String.format("reading list of tables"));
          tds = this.admin.listTableDescriptors(pattern);
          if (tds == null) {
            tds = Collections.emptyList();
          }
          for (String monitorTarget : monitorTargets) {
            pattern = Pattern.compile(monitorTarget);
            for (TableDescriptor td : tds) {
              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
                tmpTables.add(td.getTableName().getNameAsString());
              }
            }
          }
        } catch (IOException e) {
          LOG.error("Communicate with admin failed", e);
          throw e;
        }

        if (tmpTables.size() > 0) {
          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
        } else {
          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
          LOG.error(msg);
          this.errorCode = INIT_ERROR_EXIT_CODE;
          throw new TableNotFoundException(msg);
        }
      } else {
        // No regex requested: the targets are literal table names.
        returnTables = monitorTargets;
      }

      return returnTables;
    }

    /*
     * Canary entry point to monitor all the tables.
     */
    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
      throws Exception {
      LOG.debug("Reading list of tables");
      List<Future<Void>> taskFutures = new LinkedList<>();
      for (TableDescriptor td : admin.listTableDescriptors()) {
        // Skip disabled tables and the canary's own write table.
        if (
          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
            && (!td.getTableName().equals(writeTableName))
        ) {
          LongAdder readLatency =
            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
            this.rawScanEnabled, readLatency, readAllCF));
        }
      }
      return taskFutures;
    }

    // Ensures the write table exists, is enabled, and its region count per live regionserver stays
    // within [regionsLowerLimit, regionsUpperLimit]; recreates or rebalances it otherwise.
    private void checkWriteTableDistribution() throws IOException {
      if (!admin.tableExists(writeTableName)) {
        int numberOfServers = admin.getRegionServers().size();
        if (numberOfServers == 0) {
          throw new IllegalStateException("No live regionservers");
        }
        createWriteTable(numberOfServers);
      }

      if (!admin.isTableEnabled(writeTableName)) {
        admin.enableTable(writeTableName);
      }

      ClusterMetrics status =
        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
      int numberOfServers = status.getServersName().size();
      // Do not count the master among the region-hosting servers.
      if (status.getServersName().contains(status.getMasterName())) {
        numberOfServers -= 1;
      }

      List<Pair<RegionInfo, ServerName>> pairs =
        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
      int numberOfRegions = pairs.size();
      if (
        numberOfRegions < numberOfServers * regionsLowerLimit
          || numberOfRegions > numberOfServers * regionsUpperLimit
      ) {
        // Region count drifted outside the configured band: rebuild the write table.
        admin.disableTable(writeTableName);
        admin.deleteTable(writeTableName);
        createWriteTable(numberOfServers);
      }
      HashSet<ServerName> serverSet = new HashSet<>();
      for (Pair<RegionInfo, ServerName> pair : pairs) {
        serverSet.add(pair.getSecond());
      }
      int numberOfCoveredServers = serverSet.size();
      if (numberOfCoveredServers < numberOfServers) {
        // Some servers host no write-table region; ask the balancer to spread them out.
        admin.balance();
      }
    }

    // Creates the canary write table, pre-split so each live regionserver gets ~regionsLowerLimit
    // regions, with a single-version column family whose TTL bounds the data kept around.
    private void createWriteTable(int numberOfServers) throws IOException {
      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
        + "(current lower limit of regions per server is {} and you can change it with config {}).",
        numberOfServers, numberOfRegions, regionsLowerLimit,
        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
      ColumnFamilyDescriptor family =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
      TableDescriptor desc =
        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
      admin.createTable(desc, splits);
    }
  }

  /**
   * Canary entry point for specified table.
   * @throws Exception exception
   */
  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
    boolean readAllCF) throws Exception {
    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
        executor, taskType, rawScanEnabled, readLatency, readAllCF);
    } else {
      LOG.warn("Table {} is not enabled", tableName);
    }
    // Disabled table: nothing to sniff, no tasks scheduled.
    return new LinkedList<>();
  }

  /*
   * Loops over regions of this table, and outputs information about the state.
 */
  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
    LongAdder rwLatency, boolean readAllCF) throws Exception {
    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
    List<RegionTask> tasks = new ArrayList<>();
    try (RegionLocator regionLocator =
      admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
      for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
        if (location == null) {
          LOG.warn("Null location for table {}", tableDesc.getTableName());
          continue;
        }
        ServerName rs = location.getServerName();
        RegionInfo region = location.getRegion();
        tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
          taskType, rawScanEnabled, rwLatency, readAllCF));
        // Pre-register each region in the sink's result map before the tasks run.
        Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
        regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
      }
      return executor.invokeAll(tasks);
    }
  }

  // monitor for zookeeper mode
  private static class ZookeeperMonitor extends Monitor {
    private List<String> hosts;
    private final String znode;
    private final int timeout;

    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration configuration = connection.getConfiguration();
      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
      timeout =
        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
      // Resolve the ensemble members from the configured quorum string.
      ConnectStringParser parser =
        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
      hosts = Lists.newArrayList();
      for (InetSocketAddress server : parser.getServerAddresses()) {
        hosts.add(inetSocketAddress2String(server));
      }
      // Warn when more failures are permitted than the ensemble could lose while keeping quorum.
      if (allowedFailures > (hosts.size() - 1) / 2) {
        LOG.warn(
          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
            + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
          allowedFailures, hosts.size());
      }
    }

    @Override
    public void run() {
      List<ZookeeperTask> tasks = Lists.newArrayList();
      ZookeeperStdOutSink zkSink = null;
      try {
        zkSink = this.getSink();
      } catch (RuntimeException e) {
        LOG.error("Run ZooKeeperMonitor failed!", e);
        this.errorCode = ERROR_EXIT_CODE;
      }
      this.initialized = true;
      // One probe task per ensemble member.
      for (final String host : hosts) {
        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff zookeeper failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        Thread.currentThread().interrupt();
        LOG.error("Sniff zookeeper interrupted!", e);
      }
      this.done = true;
    }

    private ZookeeperStdOutSink getSink() {
      if (!(sink instanceof ZookeeperStdOutSink)) {
        throw new RuntimeException("Can only write to zookeeper sink");
      }
      return ((ZookeeperStdOutSink) sink);
    }
  }

  /**
   * A monitor for regionserver mode
   */
  private static class RegionServerMonitor extends Monitor {
    private boolean rawScanEnabled;
    private boolean allRegions;

    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService
executor, boolean allRegions, boolean treatFailureAsError, 1834 long allowedFailures) { 1835 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1836 allowedFailures); 1837 Configuration conf = connection.getConfiguration(); 1838 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1839 this.allRegions = allRegions; 1840 } 1841 1842 private RegionServerStdOutSink getSink() { 1843 if (!(sink instanceof RegionServerStdOutSink)) { 1844 throw new RuntimeException("Can only write to regionserver sink"); 1845 } 1846 return ((RegionServerStdOutSink) sink); 1847 } 1848 1849 @Override 1850 public void run() { 1851 if (this.initAdmin() && this.checkNoTableNames()) { 1852 RegionServerStdOutSink regionServerSink = null; 1853 try { 1854 regionServerSink = this.getSink(); 1855 } catch (RuntimeException e) { 1856 LOG.error("Run RegionServerMonitor failed!", e); 1857 this.errorCode = ERROR_EXIT_CODE; 1858 } 1859 Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName(); 1860 this.initialized = true; 1861 this.monitorRegionServers(rsAndRMap, regionServerSink); 1862 } 1863 this.done = true; 1864 } 1865 1866 private boolean checkNoTableNames() { 1867 List<String> foundTableNames = new ArrayList<>(); 1868 TableName[] tableNames = null; 1869 LOG.debug("Reading list of tables"); 1870 try { 1871 tableNames = this.admin.listTableNames(); 1872 } catch (IOException e) { 1873 LOG.error("Get listTableNames failed", e); 1874 this.errorCode = INIT_ERROR_EXIT_CODE; 1875 return false; 1876 } 1877 1878 if (this.targets == null || this.targets.length == 0) { 1879 return true; 1880 } 1881 1882 for (String target : this.targets) { 1883 for (TableName tableName : tableNames) { 1884 if (target.equals(tableName.getNameAsString())) { 1885 foundTableNames.add(target); 1886 } 1887 } 1888 } 1889 1890 if (foundTableNames.size() > 0) { 1891 System.err.println("Cannot pass a tablename when using the -regionserver " 1892 + 
"option, tablenames:" + foundTableNames.toString()); 1893 this.errorCode = USAGE_EXIT_CODE; 1894 } 1895 return foundTableNames.isEmpty(); 1896 } 1897 1898 private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, 1899 RegionServerStdOutSink regionServerSink) { 1900 List<RegionServerTask> tasks = new ArrayList<>(); 1901 Map<String, AtomicLong> successMap = new HashMap<>(); 1902 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1903 String serverName = entry.getKey(); 1904 AtomicLong successes = new AtomicLong(0); 1905 successMap.put(serverName, successes); 1906 if (entry.getValue().isEmpty()) { 1907 LOG.error("Regionserver not serving any regions - {}", serverName); 1908 } else if (this.allRegions) { 1909 for (RegionInfo region : entry.getValue()) { 1910 tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink, 1911 this.rawScanEnabled, successes)); 1912 } 1913 } else { 1914 // random select a region if flag not set 1915 RegionInfo region = 1916 entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size())); 1917 tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink, 1918 this.rawScanEnabled, successes)); 1919 } 1920 } 1921 try { 1922 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1923 try { 1924 future.get(); 1925 } catch (ExecutionException e) { 1926 LOG.error("Sniff regionserver failed!", e); 1927 this.errorCode = ERROR_EXIT_CODE; 1928 } 1929 } 1930 if (this.allRegions) { 1931 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1932 String serverName = entry.getKey(); 1933 LOG.info("Successfully read {} regions out of {} on regionserver {}", 1934 successMap.get(serverName), entry.getValue().size(), serverName); 1935 } 1936 } 1937 } catch (InterruptedException e) { 1938 this.errorCode = ERROR_EXIT_CODE; 1939 LOG.error("Sniff regionserver interrupted!", e); 1940 } 1941 } 1942 1943 private Map<String, 
List<RegionInfo>> filterRegionServerByName() { 1944 Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName(); 1945 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); 1946 return regionServerAndRegionsMap; 1947 } 1948 1949 private Map<String, List<RegionInfo>> getAllRegionServerByName() { 1950 Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>(); 1951 try { 1952 LOG.debug("Reading list of tables and locations"); 1953 List<TableDescriptor> tableDescs = this.admin.listTableDescriptors(); 1954 List<RegionInfo> regions = null; 1955 for (TableDescriptor tableDesc : tableDescs) { 1956 try (RegionLocator regionLocator = 1957 this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1958 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1959 if (location == null) { 1960 LOG.warn("Null location for table {}", tableDesc.getTableName()); 1961 continue; 1962 } 1963 ServerName rs = location.getServerName(); 1964 String rsName = rs.getHostname(); 1965 RegionInfo r = location.getRegion(); 1966 if (rsAndRMap.containsKey(rsName)) { 1967 regions = rsAndRMap.get(rsName); 1968 } else { 1969 regions = new ArrayList<>(); 1970 rsAndRMap.put(rsName, regions); 1971 } 1972 regions.add(r); 1973 } 1974 } 1975 } 1976 1977 // get any live regionservers not serving any regions 1978 for (ServerName rs : this.admin.getRegionServers()) { 1979 String rsName = rs.getHostname(); 1980 if (!rsAndRMap.containsKey(rsName)) { 1981 rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList()); 1982 } 1983 } 1984 } catch (IOException e) { 1985 LOG.error("Get HTables info failed", e); 1986 this.errorCode = INIT_ERROR_EXIT_CODE; 1987 } 1988 return rsAndRMap; 1989 } 1990 1991 private Map<String, List<RegionInfo>> 1992 doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) { 1993 1994 Map<String, List<RegionInfo>> filteredRsAndRMap = null; 1995 1996 if (this.targets != null && 
this.targets.length > 0) { 1997 filteredRsAndRMap = new HashMap<>(); 1998 Pattern pattern = null; 1999 Matcher matcher = null; 2000 boolean regExpFound = false; 2001 for (String rsName : this.targets) { 2002 if (this.useRegExp) { 2003 regExpFound = false; 2004 pattern = Pattern.compile(rsName); 2005 for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) { 2006 matcher = pattern.matcher(entry.getKey()); 2007 if (matcher.matches()) { 2008 filteredRsAndRMap.put(entry.getKey(), entry.getValue()); 2009 regExpFound = true; 2010 } 2011 } 2012 if (!regExpFound) { 2013 LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); 2014 } 2015 } else { 2016 if (fullRsAndRMap.containsKey(rsName)) { 2017 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); 2018 } else { 2019 LOG.info("No RegionServerInfo found, regionServerName {}", rsName); 2020 } 2021 } 2022 } 2023 } else { 2024 filteredRsAndRMap = fullRsAndRMap; 2025 } 2026 return filteredRsAndRMap; 2027 } 2028 } 2029 2030 public static void main(String[] args) throws Exception { 2031 final Configuration conf = HBaseConfiguration.create(); 2032 2033 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); 2034 LOG.info("Execution thread count={}", numThreads); 2035 2036 int exitCode; 2037 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); 2038 try { 2039 exitCode = ToolRunner.run(conf, new CanaryTool(executor), args); 2040 } finally { 2041 executor.shutdown(); 2042 } 2043 System.exit(exitCode); 2044 } 2045}