001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.tool; 021 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; 023import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; 024 025import java.io.Closeable; 026import java.io.IOException; 027import java.net.InetSocketAddress; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Random; 038import java.util.Set; 039import java.util.TreeSet; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ScheduledThreadPoolExecutor; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.concurrent.atomic.LongAdder; 048import java.util.regex.Matcher; 049import java.util.regex.Pattern; 050import org.apache.commons.lang3.time.StopWatch; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.hbase.AuthUtil; 053import org.apache.hadoop.hbase.ChoreService; 054import org.apache.hadoop.hbase.ClusterMetrics; 055import org.apache.hadoop.hbase.ClusterMetrics.Option; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.HBaseConfiguration; 058import org.apache.hadoop.hbase.HColumnDescriptor; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.HRegionLocation; 061import org.apache.hadoop.hbase.HTableDescriptor; 062import org.apache.hadoop.hbase.MetaTableAccessor; 063import org.apache.hadoop.hbase.NamespaceDescriptor; 064import org.apache.hadoop.hbase.ScheduledChore; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotEnabledException; 068import org.apache.hadoop.hbase.TableNotFoundException; 069import org.apache.hadoop.hbase.client.Admin; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 071import org.apache.hadoop.hbase.client.Connection; 072import org.apache.hadoop.hbase.client.ConnectionFactory; 073import org.apache.hadoop.hbase.client.Get; 074import org.apache.hadoop.hbase.client.Put; 075import org.apache.hadoop.hbase.client.RegionInfo; 076import org.apache.hadoop.hbase.client.RegionLocator; 077import org.apache.hadoop.hbase.client.ResultScanner; 078import org.apache.hadoop.hbase.client.Scan; 079import org.apache.hadoop.hbase.client.Table; 080import org.apache.hadoop.hbase.client.TableDescriptor; 081import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 082import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType; 083import org.apache.hadoop.hbase.util.Bytes; 084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 085import org.apache.hadoop.hbase.util.Pair; 086import org.apache.hadoop.hbase.util.ReflectionUtils; 087import org.apache.hadoop.hbase.util.RegionSplitter; 088import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 089import org.apache.hadoop.hbase.zookeeper.ZKConfig; 090import org.apache.hadoop.util.GenericOptionsParser; 091import org.apache.hadoop.util.Tool; 092import org.apache.hadoop.util.ToolRunner; 093import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 094import org.apache.yetus.audience.InterfaceAudience; 095import org.apache.zookeeper.KeeperException; 096import org.apache.zookeeper.ZooKeeper; 097import org.apache.zookeeper.client.ConnectStringParser; 098import org.apache.zookeeper.data.Stat; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101 102import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 103 104/** 105 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. 106 * 107 * There are three modes: 108 * <ol> 109 * <li>region mode (Default): For each region, try to get one row per column family outputting 110 * information on failure (ERROR) or else the latency. 111 * </li> 112 * 113 * <li>regionserver mode: For each regionserver try to get one row from one table selected 114 * randomly outputting information on failure (ERROR) or else the latency. 115 * </li> 116 * 117 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on 118 * failure (ERROR) or else the latency. 119 * </li> 120 * </ol> 121 */ 122@InterfaceAudience.Private 123public final class Canary implements Tool { 124 /** 125 * Sink interface used by the canary to output information 126 */ 127 public interface Sink { 128 long getReadFailureCount(); 129 long incReadFailureCount(); 130 Map<String,String> getReadFailures(); 131 void updateReadFailures(String regionName, String serverName); 132 long getWriteFailureCount(); 133 long incWriteFailureCount(); 134 Map<String,String> getWriteFailures(); 135 void updateWriteFailures(String regionName, String serverName); 136 } 137 138 /** 139 * Simple implementation of canary sink that allows plotting to a file or standard output. 140 */ 141 public static class StdOutSink implements Sink { 142 private AtomicLong readFailureCount = new AtomicLong(0), 143 writeFailureCount = new AtomicLong(0); 144 private Map<String, String> readFailures = new ConcurrentHashMap<>(); 145 private Map<String, String> writeFailures = new ConcurrentHashMap<>(); 146 147 @Override 148 public long getReadFailureCount() { 149 return readFailureCount.get(); 150 } 151 152 @Override 153 public long incReadFailureCount() { 154 return readFailureCount.incrementAndGet(); 155 } 156 157 @Override 158 public Map<String, String> getReadFailures() { 159 return readFailures; 160 } 161 162 @Override 163 public void updateReadFailures(String regionName, String serverName) { 164 readFailures.put(regionName, serverName); 165 } 166 167 @Override 168 public long getWriteFailureCount() { 169 return writeFailureCount.get(); 170 } 171 172 @Override 173 public long incWriteFailureCount() { 174 return writeFailureCount.incrementAndGet(); 175 } 176 177 @Override 178 public Map<String, String> getWriteFailures() { 179 return writeFailures; 180 } 181 182 @Override 183 public void updateWriteFailures(String regionName, String serverName) { 184 writeFailures.put(regionName, serverName); 185 } 186 } 187 188 /** 189 * By RegionServer, for 'regionserver' mode. 190 */ 191 public static class RegionServerStdOutSink extends StdOutSink { 192 public void publishReadFailure(String table, String server) { 193 incReadFailureCount(); 194 LOG.error("Read from {} on {}", table, server); 195 } 196 197 public void publishReadTiming(String table, String server, long msTime) { 198 LOG.info("Read from {} on {} in {}ms", table, server, msTime); 199 } 200 } 201 202 /** 203 * Output for 'zookeeper' mode. 204 */ 205 public static class ZookeeperStdOutSink extends StdOutSink { 206 public void publishReadFailure(String znode, String server) { 207 incReadFailureCount(); 208 LOG.error("Read from {} on {}", znode, server); 209 } 210 211 public void publishReadTiming(String znode, String server, long msTime) { 212 LOG.info("Read from {} on {} in {}ms", znode, server, msTime); 213 } 214 } 215 216 /** 217 * By Region, for 'region' mode. 218 */ 219 public static class RegionStdOutSink extends StdOutSink { 220 private Map<String, LongAdder> perTableReadLatency = new HashMap<>(); 221 private LongAdder writeLatency = new LongAdder(); 222 223 public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { 224 incReadFailureCount(); 225 LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e); 226 } 227 228 public void publishReadFailure(ServerName serverName, RegionInfo region, 229 ColumnFamilyDescriptor column, Exception e) { 230 incReadFailureCount(); 231 LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName, 232 column.getNameAsString(), e); 233 } 234 235 public void publishReadTiming(ServerName serverName, RegionInfo region, 236 ColumnFamilyDescriptor column, long msTime) { 237 LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 238 column.getNameAsString(), msTime); 239 } 240 241 public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { 242 incWriteFailureCount(); 243 LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); 244 } 245 246 public void publishWriteFailure(ServerName serverName, RegionInfo region, 247 ColumnFamilyDescriptor column, Exception e) { 248 incWriteFailureCount(); 249 LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, 250 column.getNameAsString(), e); 251 } 252 253 public void publishWriteTiming(ServerName serverName, RegionInfo region, 254 ColumnFamilyDescriptor column, long msTime) { 255 LOG.info("Write to {} on {} {} in {}ms", 256 region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime); 257 } 258 259 public Map<String, LongAdder> getReadLatencyMap() { 260 return this.perTableReadLatency; 261 } 262 263 public LongAdder initializeAndGetReadLatencyForTable(String tableName) { 264 LongAdder initLatency = new LongAdder(); 265 this.perTableReadLatency.put(tableName, initLatency); 266 return initLatency; 267 } 268 269 public void initializeWriteLatency() { 270 this.writeLatency.reset(); 271 } 272 273 public LongAdder getWriteLatency() { 274 return this.writeLatency; 275 } 276 } 277 278 /** 279 * Run a single zookeeper Task and then exit. 280 */ 281 static class ZookeeperTask implements Callable<Void> { 282 private final Connection connection; 283 private final String host; 284 private String znode; 285 private final int timeout; 286 private ZookeeperStdOutSink sink; 287 288 public ZookeeperTask(Connection connection, String host, String znode, int timeout, 289 ZookeeperStdOutSink sink) { 290 this.connection = connection; 291 this.host = host; 292 this.znode = znode; 293 this.timeout = timeout; 294 this.sink = sink; 295 } 296 297 @Override public Void call() throws Exception { 298 ZooKeeper zooKeeper = null; 299 try { 300 zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); 301 Stat exists = zooKeeper.exists(znode, false); 302 StopWatch stopwatch = new StopWatch(); 303 stopwatch.start(); 304 zooKeeper.getData(znode, false, exists); 305 stopwatch.stop(); 306 sink.publishReadTiming(znode, host, stopwatch.getTime()); 307 } catch (KeeperException | InterruptedException e) { 308 sink.publishReadFailure(znode, host); 309 } finally { 310 if (zooKeeper != null) { 311 zooKeeper.close(); 312 } 313 } 314 return null; 315 } 316 } 317 318 /** 319 * Run a single Region Task and then exit. For each column family of the Region, get one row and 320 * output latency or failure. 321 */ 322 static class RegionTask implements Callable<Void> { 323 public enum TaskType{ 324 READ, WRITE 325 } 326 private Connection connection; 327 private RegionInfo region; 328 private RegionStdOutSink sink; 329 private TaskType taskType; 330 private boolean rawScanEnabled; 331 private ServerName serverName; 332 private LongAdder readWriteLatency; 333 334 RegionTask(Connection connection, RegionInfo region, ServerName serverName, 335 RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) { 336 this.connection = connection; 337 this.region = region; 338 this.serverName = serverName; 339 this.sink = sink; 340 this.taskType = taskType; 341 this.rawScanEnabled = rawScanEnabled; 342 this.readWriteLatency = rwLatency; 343 } 344 345 @Override 346 public Void call() { 347 switch (taskType) { 348 case READ: 349 return read(); 350 case WRITE: 351 return write(); 352 default: 353 return read(); 354 } 355 } 356 357 public Void read() { 358 Table table = null; 359 TableDescriptor tableDesc = null; 360 try { 361 LOG.debug("Reading table descriptor for table {}", region.getTable()); 362 table = connection.getTable(region.getTable()); 363 tableDesc = table.getDescriptor(); 364 } catch (IOException e) { 365 LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); 366 sink.publishReadFailure(serverName, region, e); 367 if (table != null) { 368 try { 369 table.close(); 370 } catch (IOException ioe) { 371 LOG.error("Close table failed", e); 372 } 373 } 374 return null; 375 } 376 377 byte[] startKey = null; 378 Get get = null; 379 Scan scan = null; 380 ResultScanner rs = null; 381 StopWatch stopWatch = new StopWatch(); 382 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 383 stopWatch.reset(); 384 startKey = region.getStartKey(); 385 // Can't do a get on empty start row so do a Scan of first element if any instead. 386 if (startKey.length > 0) { 387 get = new Get(startKey); 388 get.setCacheBlocks(false); 389 get.setFilter(new FirstKeyOnlyFilter()); 390 get.addFamily(column.getName()); 391 } else { 392 scan = new Scan(); 393 LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName()); 394 scan.setRaw(rawScanEnabled); 395 scan.setCaching(1); 396 scan.setCacheBlocks(false); 397 scan.setFilter(new FirstKeyOnlyFilter()); 398 scan.addFamily(column.getName()); 399 scan.setMaxResultSize(1L); 400 scan.setOneRowLimit(); 401 } 402 LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(), 403 region.getRegionNameAsString(), column.getNameAsString(), 404 Bytes.toStringBinary(startKey)); 405 try { 406 stopWatch.start(); 407 if (startKey.length > 0) { 408 table.get(get); 409 } else { 410 rs = table.getScanner(scan); 411 rs.next(); 412 } 413 stopWatch.stop(); 414 this.readWriteLatency.add(stopWatch.getTime()); 415 sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); 416 } catch (Exception e) { 417 sink.publishReadFailure(serverName, region, column, e); 418 sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname()); 419 } finally { 420 if (rs != null) { 421 rs.close(); 422 } 423 scan = null; 424 get = null; 425 } 426 } 427 try { 428 table.close(); 429 } catch (IOException e) { 430 LOG.error("Close table failed", e); 431 } 432 return null; 433 } 434 435 /** 436 * Check writes for the canary table 437 */ 438 private Void write() { 439 Table table = null; 440 TableDescriptor tableDesc = null; 441 try { 442 table = connection.getTable(region.getTable()); 443 tableDesc = table.getDescriptor(); 444 byte[] rowToCheck = region.getStartKey(); 445 if (rowToCheck.length == 0) { 446 rowToCheck = new byte[]{0x0}; 447 } 448 int writeValueSize = 449 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); 450 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 451 Put put = new Put(rowToCheck); 452 byte[] value = new byte[writeValueSize]; 453 Bytes.random(value); 454 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); 455 456 LOG.debug("Writing to {} {} {} {}", 457 tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), 458 Bytes.toStringBinary(rowToCheck)); 459 try { 460 long startTime = System.currentTimeMillis(); 461 table.put(put); 462 long time = System.currentTimeMillis() - startTime; 463 this.readWriteLatency.add(time); 464 sink.publishWriteTiming(serverName, region, column, time); 465 } catch (Exception e) { 466 sink.publishWriteFailure(serverName, region, column, e); 467 } 468 } 469 table.close(); 470 } catch (IOException e) { 471 sink.publishWriteFailure(serverName, region, e); 472 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() ); 473 } 474 return null; 475 } 476 } 477 478 /** 479 * Run a single RegionServer Task and then exit. 480 * Get one row from a region on the regionserver and output latency or the failure. 481 */ 482 static class RegionServerTask implements Callable<Void> { 483 private Connection connection; 484 private String serverName; 485 private RegionInfo region; 486 private RegionServerStdOutSink sink; 487 private AtomicLong successes; 488 489 RegionServerTask(Connection connection, String serverName, RegionInfo region, 490 RegionServerStdOutSink sink, AtomicLong successes) { 491 this.connection = connection; 492 this.serverName = serverName; 493 this.region = region; 494 this.sink = sink; 495 this.successes = successes; 496 } 497 498 @Override 499 public Void call() { 500 TableName tableName = null; 501 Table table = null; 502 Get get = null; 503 byte[] startKey = null; 504 Scan scan = null; 505 StopWatch stopWatch = new StopWatch(); 506 // monitor one region on every region server 507 stopWatch.reset(); 508 try { 509 tableName = region.getTable(); 510 table = connection.getTable(tableName); 511 startKey = region.getStartKey(); 512 // Can't do a get on empty start row so do a Scan of first element if any instead. 513 LOG.debug("Reading from {} {} {} {}", 514 serverName, region.getTable(), region.getRegionNameAsString(), 515 Bytes.toStringBinary(startKey)); 516 if (startKey.length > 0) { 517 get = new Get(startKey); 518 get.setCacheBlocks(false); 519 get.setFilter(new FirstKeyOnlyFilter()); 520 stopWatch.start(); 521 table.get(get); 522 stopWatch.stop(); 523 } else { 524 scan = new Scan(); 525 scan.setCacheBlocks(false); 526 scan.setFilter(new FirstKeyOnlyFilter()); 527 scan.setCaching(1); 528 scan.setMaxResultSize(1L); 529 scan.setOneRowLimit(); 530 stopWatch.start(); 531 ResultScanner s = table.getScanner(scan); 532 s.next(); 533 s.close(); 534 stopWatch.stop(); 535 } 536 successes.incrementAndGet(); 537 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); 538 } catch (TableNotFoundException tnfe) { 539 LOG.error("Table may be deleted", tnfe); 540 // This is ignored because it doesn't imply that the regionserver is dead 541 } catch (TableNotEnabledException tnee) { 542 // This is considered a success since we got a response. 543 successes.incrementAndGet(); 544 LOG.debug("The targeted table was disabled. Assuming success."); 545 } catch (DoNotRetryIOException dnrioe) { 546 sink.publishReadFailure(tableName.getNameAsString(), serverName); 547 LOG.error(dnrioe.toString(), dnrioe); 548 } catch (IOException e) { 549 sink.publishReadFailure(tableName.getNameAsString(), serverName); 550 LOG.error(e.toString(), e); 551 } finally { 552 if (table != null) { 553 try { 554 table.close(); 555 } catch (IOException e) {/* DO NOTHING */ 556 LOG.error("Close table failed", e); 557 } 558 } 559 scan = null; 560 get = null; 561 startKey = null; 562 } 563 return null; 564 } 565 } 566 567 private static final int USAGE_EXIT_CODE = 1; 568 private static final int INIT_ERROR_EXIT_CODE = 2; 569 private static final int TIMEOUT_ERROR_EXIT_CODE = 3; 570 private static final int ERROR_EXIT_CODE = 4; 571 private static final int FAILURE_EXIT_CODE = 5; 572 573 private static final long DEFAULT_INTERVAL = 60000; 574 575 private static final long DEFAULT_TIMEOUT = 600000; // 10 mins 576 private static final int MAX_THREADS_NUM = 16; // #threads to contact regions 577 578 private static final Logger LOG = LoggerFactory.getLogger(Canary.class); 579 580 public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf( 581 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); 582 583 private static final String CANARY_TABLE_FAMILY_NAME = "Test"; 584 585 private Configuration conf = null; 586 private long interval = 0; 587 private Sink sink = null; 588 589 private boolean useRegExp; 590 private long timeout = DEFAULT_TIMEOUT; 591 private boolean failOnError = true; 592 593 /** 594 * True if we are to run in 'regionServer' mode. 595 */ 596 private boolean regionServerMode = false; 597 598 /** 599 * True if we are to run in zookeeper 'mode'. 600 */ 601 private boolean zookeeperMode = false; 602 private long permittedFailures = 0; 603 private boolean regionServerAllRegions = false; 604 private boolean writeSniffing = false; 605 private long configuredWriteTableTimeout = DEFAULT_TIMEOUT; 606 private boolean treatFailureAsError = false; 607 private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; 608 609 /** 610 * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. 611 * we aggregate time to fetch each region and it needs to be less than this value else we 612 * log an ERROR. 613 */ 614 private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>(); 615 616 private ExecutorService executor; // threads to retrieve data from regionservers 617 618 public Canary() { 619 this(new ScheduledThreadPoolExecutor(1)); 620 } 621 622 public Canary(ExecutorService executor) { 623 this(executor, null); 624 } 625 626 @VisibleForTesting 627 Canary(ExecutorService executor, Sink sink) { 628 this.executor = executor; 629 this.sink = sink; 630 } 631 632 @Override 633 public Configuration getConf() { 634 return conf; 635 } 636 637 @Override 638 public void setConf(Configuration conf) { 639 this.conf = conf; 640 } 641 642 private int parseArgs(String[] args) { 643 int index = -1; 644 // Process command line args 645 for (int i = 0; i < args.length; i++) { 646 String cmd = args[i]; 647 648 if (cmd.startsWith("-")) { 649 if (index >= 0) { 650 // command line args must be in the form: [opts] [table 1 [table 2 ...]] 651 System.err.println("Invalid command line options"); 652 printUsageAndExit(); 653 } 654 655 if (cmd.equals("-help") || cmd.equals("-h")) { 656 // user asked for help, print the help and quit. 657 printUsageAndExit(); 658 } else if (cmd.equals("-daemon") && interval == 0) { 659 // user asked for daemon mode, set a default interval between checks 660 interval = DEFAULT_INTERVAL; 661 } else if (cmd.equals("-interval")) { 662 // user has specified an interval for canary breaths (-interval N) 663 i++; 664 665 if (i == args.length) { 666 System.err.println("-interval takes a numeric seconds value argument."); 667 printUsageAndExit(); 668 } 669 670 try { 671 interval = Long.parseLong(args[i]) * 1000; 672 } catch (NumberFormatException e) { 673 System.err.println("-interval needs a numeric value argument."); 674 printUsageAndExit(); 675 } 676 } else if (cmd.equals("-zookeeper")) { 677 this.zookeeperMode = true; 678 } else if(cmd.equals("-regionserver")) { 679 this.regionServerMode = true; 680 } else if(cmd.equals("-allRegions")) { 681 this.regionServerAllRegions = true; 682 } else if(cmd.equals("-writeSniffing")) { 683 this.writeSniffing = true; 684 } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { 685 this.treatFailureAsError = true; 686 } else if (cmd.equals("-e")) { 687 this.useRegExp = true; 688 } else if (cmd.equals("-t")) { 689 i++; 690 691 if (i == args.length) { 692 System.err.println("-t takes a numeric milliseconds value argument."); 693 printUsageAndExit(); 694 } 695 696 try { 697 this.timeout = Long.parseLong(args[i]); 698 } catch (NumberFormatException e) { 699 System.err.println("-t takes a numeric milliseconds value argument."); 700 printUsageAndExit(); 701 } 702 } else if(cmd.equals("-writeTableTimeout")) { 703 i++; 704 705 if (i == args.length) { 706 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 707 printUsageAndExit(); 708 } 709 710 try { 711 this.configuredWriteTableTimeout = Long.parseLong(args[i]); 712 } catch (NumberFormatException e) { 713 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 714 printUsageAndExit(); 715 } 716 } else if (cmd.equals("-writeTable")) { 717 i++; 718 719 if (i == args.length) { 720 System.err.println("-writeTable takes a string tablename value argument."); 721 printUsageAndExit(); 722 } 723 this.writeTableName = TableName.valueOf(args[i]); 724 } else if (cmd.equals("-f")) { 725 i++; 726 727 if (i == args.length) { 728 System.err 729 .println("-f needs a boolean value argument (true|false)."); 730 printUsageAndExit(); 731 } 732 733 this.failOnError = Boolean.parseBoolean(args[i]); 734 } else if (cmd.equals("-readTableTimeouts")) { 735 i++; 736 737 if (i == args.length) { 738 System.err.println("-readTableTimeouts needs a comma-separated list of read " + 739 "millisecond timeouts per table (without spaces)."); 740 printUsageAndExit(); 741 } 742 String [] tableTimeouts = args[i].split(","); 743 for (String tT: tableTimeouts) { 744 String [] nameTimeout = tT.split("="); 745 if (nameTimeout.length < 2) { 746 System.err.println("Each -readTableTimeouts argument must be of the form " + 747 "<tableName>=<read timeout> (without spaces)."); 748 printUsageAndExit(); 749 } 750 long timeoutVal = 0L; 751 try { 752 timeoutVal = Long.parseLong(nameTimeout[1]); 753 } catch (NumberFormatException e) { 754 System.err.println("-readTableTimeouts read timeout for each table must be a numeric value argument."); 755 printUsageAndExit(); 756 } 757 this.configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); 758 } 759 } else if (cmd.equals("-permittedZookeeperFailures")) { 760 i++; 761 762 if (i == args.length) { 763 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 764 printUsageAndExit(); 765 } 766 try { 767 this.permittedFailures = Long.parseLong(args[i]); 768 } catch (NumberFormatException e) { 769 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 770 printUsageAndExit(); 771 } 772 } else { 773 // no options match 774 System.err.println(cmd + " options is invalid."); 775 printUsageAndExit(); 776 } 777 } else if (index < 0) { 778 // keep track of first table name specified by the user 779 index = i; 780 } 781 } 782 if (this.regionServerAllRegions && !this.regionServerMode) { 783 System.err.println("-allRegions can only be specified in regionserver mode."); 784 printUsageAndExit(); 785 } 786 if (this.zookeeperMode) { 787 if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) { 788 System.err.println("-zookeeper is exclusive and cannot be combined with " 789 + "other modes."); 790 printUsageAndExit(); 791 } 792 } 793 if (this.permittedFailures != 0 && !this.zookeeperMode) { 794 System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); 795 printUsageAndExit(); 796 } 797 if (!this.configuredReadTableTimeouts.isEmpty() && (this.regionServerMode || this.zookeeperMode)) { 798 System.err.println("-readTableTimeouts can only be configured in region mode."); 799 printUsageAndExit(); 800 } 801 return index; 802 } 803 804 @Override 805 public int run(String[] args) throws Exception { 806 int index = parseArgs(args); 807 ChoreService choreService = null; 808 809 // Launches chore for refreshing kerberos credentials if security is enabled. 810 // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster 811 // for more details. 812 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); 813 if (authChore != null) { 814 choreService = new ChoreService("CANARY_TOOL"); 815 choreService.scheduleChore(authChore); 816 } 817 818 // Start to prepare the stuffs 819 Monitor monitor = null; 820 Thread monitorThread = null; 821 long startTime = 0; 822 long currentTimeLength = 0; 823 // Get a connection to use in below. 824 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { 825 do { 826 // Do monitor !! 827 try { 828 monitor = this.newMonitor(connection, index, args); 829 monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis()); 830 startTime = System.currentTimeMillis(); 831 monitorThread.start(); 832 while (!monitor.isDone()) { 833 // wait for 1 sec 834 Thread.sleep(1000); 835 // exit if any error occurs 836 if (this.failOnError && monitor.hasError()) { 837 monitorThread.interrupt(); 838 if (monitor.initialized) { 839 return monitor.errorCode; 840 } else { 841 return INIT_ERROR_EXIT_CODE; 842 } 843 } 844 currentTimeLength = System.currentTimeMillis() - startTime; 845 if (currentTimeLength > this.timeout) { 846 LOG.error("The monitor is running too long (" + currentTimeLength 847 + ") after timeout limit:" + this.timeout 848 + " will be killed itself !!"); 849 if (monitor.initialized) { 850 return TIMEOUT_ERROR_EXIT_CODE; 851 } else { 852 return INIT_ERROR_EXIT_CODE; 853 } 854 } 855 } 856 857 if (this.failOnError && monitor.finalCheckForErrors()) { 858 monitorThread.interrupt(); 859 return monitor.errorCode; 860 } 861 } finally { 862 if (monitor != null) monitor.close(); 863 } 864 865 Thread.sleep(interval); 866 } while (interval > 0); 867 } // try-with-resources close 868 869 if (choreService != null) { 870 choreService.shutdown(); 871 } 872 return monitor.errorCode; 873 } 874 875 public Map<String, String> getReadFailures() { 876 return sink.getReadFailures(); 877 } 878 879 public Map<String, String> getWriteFailures() { 880 return sink.getWriteFailures(); 881 } 882 883 private void printUsageAndExit() { 884 System.err.println( 885 "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]"); 886 System.err.println("Where [OPTIONS] are:"); 887 System.err.println(" -h,-help show this help and exit."); 888 System.err.println(" -regionserver set 'regionserver mode'; gets row from random region on " + 889 "server"); 890 System.err.println(" -allRegions get from ALL regions when 'regionserver mode', not just " + 891 "random one."); 892 System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on " + 893 "each ensemble member"); 894 System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures when attempting to " + 895 "connect to individual zookeeper nodes in the ensemble"); 896 System.err.println(" -daemon continuous check at defined intervals."); 897 System.err.println(" -interval <N> interval between checks in seconds"); 898 System.err.println(" -e consider table/regionserver argument as regular " + 899 "expression"); 900 System.err.println(" -f <B> exit on first error; default=true"); 901 System.err.println(" -failureAsError treat read/write failure as error"); 902 System.err.println(" -t <N> timeout for canary-test run; default=600000ms"); 903 System.err.println(" -writeSniffing enable write sniffing"); 904 System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary"); 905 System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms"); 906 System.err.println(" -readTableTimeouts <tableName>=<read timeout>," + 907 "<tableName>=<read timeout>,..."); 908 System.err.println(" comma-separated list of table read timeouts " + 909 "(no spaces);"); 910 System.err.println(" logs 'ERROR' if takes longer. default=600000ms"); 911 System.err.println(""); 912 System.err.println(" -D<configProperty>=<value> to assign or override configuration params"); 913 System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " + 914 "raw scan; default=false"); 915 System.err.println(""); 916 System.err.println("Canary runs in one of three modes: region (default), regionserver, or " + 917 "zookeeper."); 918 System.err.println("To sniff/probe all regions, pass no arguments."); 919 System.err.println("To sniff/probe all regions of a table, pass tablename."); 920 System.err.println("To sniff/probe regionservers, pass -regionserver, etc."); 921 System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation."); 922 System.exit(USAGE_EXIT_CODE); 923 } 924 925 Sink getSink(Configuration configuration, Class clazz) { 926 // In test context, this.sink might be set. Use it if non-null. For testing. 927 return this.sink != null? this.sink: 928 (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class", 929 clazz, Sink.class)); 930 } 931 932 /** 933 * A Factory method for {@link Monitor}. 934 * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor. 935 * @param index a start index for monitor target 936 * @param args args passed from user 937 * @return a Monitor instance 938 */ 939 public Monitor newMonitor(final Connection connection, int index, String[] args) { 940 Monitor monitor = null; 941 String[] monitorTargets = null; 942 943 if (index >= 0) { 944 int length = args.length - index; 945 monitorTargets = new String[length]; 946 System.arraycopy(args, index, monitorTargets, 0, length); 947 } 948 949 if (this.regionServerMode) { 950 monitor = 951 new RegionServerMonitor(connection, monitorTargets, this.useRegExp, 952 getSink(connection.getConfiguration(), RegionServerStdOutSink.class), 953 this.executor, this.regionServerAllRegions, 954 this.treatFailureAsError, this.permittedFailures); 955 } else if (this.zookeeperMode) { 956 monitor = 957 new ZookeeperMonitor(connection, monitorTargets, this.useRegExp, 958 getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), 959 this.executor, this.treatFailureAsError, this.permittedFailures); 960 } else { 961 monitor = 962 new RegionMonitor(connection, monitorTargets, this.useRegExp, 963 getSink(connection.getConfiguration(), RegionStdOutSink.class), 964 this.executor, this.writeSniffing, 965 this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts, 966 this.configuredWriteTableTimeout, this.permittedFailures); 967 } 968 return monitor; 969 } 970 971 /** 972 * A Monitor super-class can be extended by users 973 */ 974 public static abstract class Monitor implements Runnable, Closeable { 975 protected Connection connection; 976 protected Admin admin; 977 /** 978 * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. 979 * Passed on the command-line as arguments. 980 */ 981 protected String[] targets; 982 protected boolean useRegExp; 983 protected boolean treatFailureAsError; 984 protected boolean initialized = false; 985 986 protected boolean done = false; 987 protected int errorCode = 0; 988 protected long allowedFailures = 0; 989 protected Sink sink; 990 protected ExecutorService executor; 991 992 public boolean isDone() { 993 return done; 994 } 995 996 public boolean hasError() { 997 return errorCode != 0; 998 } 999 1000 public boolean finalCheckForErrors() { 1001 if (errorCode != 0) { 1002 return true; 1003 } 1004 if (treatFailureAsError && 1005 (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) { 1006 LOG.error("Too many failures detected, treating failure as error, failing the Canary."); 1007 errorCode = FAILURE_EXIT_CODE; 1008 return true; 1009 } 1010 return false; 1011 } 1012 1013 @Override 1014 public void close() throws IOException { 1015 if (this.admin != null) this.admin.close(); 1016 } 1017 1018 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, 1019 ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1020 if (null == connection) throw new IllegalArgumentException("connection shall not be null"); 1021 1022 this.connection = connection; 1023 this.targets = monitorTargets; 1024 this.useRegExp = useRegExp; 1025 this.treatFailureAsError = treatFailureAsError; 1026 this.sink = sink; 1027 this.executor = executor; 1028 this.allowedFailures = allowedFailures; 1029 } 1030 1031 @Override 1032 public abstract void run(); 1033 1034 protected boolean initAdmin() { 1035 if (null == this.admin) { 1036 try { 1037 this.admin = this.connection.getAdmin(); 1038 } catch (Exception e) { 1039 LOG.error("Initial HBaseAdmin failed...", e); 1040 this.errorCode = INIT_ERROR_EXIT_CODE; 1041 } 1042 } else if (admin.isAborted()) { 1043 LOG.error("HBaseAdmin aborted"); 1044 this.errorCode = INIT_ERROR_EXIT_CODE; 1045 } 1046 return !this.hasError(); 1047 } 1048 } 1049 1050 /** 1051 * A monitor for region mode. 1052 */ 1053 private static class RegionMonitor extends Monitor { 1054 // 10 minutes 1055 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; 1056 // 1 days 1057 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; 1058 1059 private long lastCheckTime = -1; 1060 private boolean writeSniffing; 1061 private TableName writeTableName; 1062 private int writeDataTTL; 1063 private float regionsLowerLimit; 1064 private float regionsUpperLimit; 1065 private int checkPeriod; 1066 private boolean rawScanEnabled; 1067 1068 /** 1069 * This is a timeout per table. If read of each region in the table aggregated takes longer 1070 * than what is configured here, we log an ERROR rather than just an INFO. 1071 */ 1072 private HashMap<String, Long> configuredReadTableTimeouts; 1073 1074 private long configuredWriteTableTimeout; 1075 1076 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1077 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, 1078 boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, 1079 long configuredWriteTableTimeout, long allowedFailures) { 1080 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, allowedFailures); 1081 Configuration conf = connection.getConfiguration(); 1082 this.writeSniffing = writeSniffing; 1083 this.writeTableName = writeTableName; 1084 this.writeDataTTL = 1085 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); 1086 this.regionsLowerLimit = 1087 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); 1088 this.regionsUpperLimit = 1089 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); 1090 this.checkPeriod = 1091 conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, 1092 DEFAULT_WRITE_TABLE_CHECK_PERIOD); 1093 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1094 this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); 1095 this.configuredWriteTableTimeout = configuredWriteTableTimeout; 1096 } 1097 1098 private RegionStdOutSink getSink() { 1099 if (!(sink instanceof RegionStdOutSink)) { 1100 throw new RuntimeException("Can only write to Region sink"); 1101 } 1102 return ((RegionStdOutSink) sink); 1103 } 1104 1105 @Override 1106 public void run() { 1107 if (this.initAdmin()) { 1108 try { 1109 List<Future<Void>> taskFutures = new LinkedList<>(); 1110 RegionStdOutSink regionSink = this.getSink(); 1111 if (this.targets != null && this.targets.length > 0) { 1112 String[] tables = generateMonitorTables(this.targets); 1113 // Check to see that each table name passed in the -readTableTimeouts argument is also 1114 // passed as a monitor target. 1115 if (!new HashSet<>(Arrays.asList(tables)). 1116 containsAll(this.configuredReadTableTimeouts.keySet())) { 1117 LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " + 1118 "passed via command line."); 1119 this.errorCode = USAGE_EXIT_CODE; 1120 return; 1121 } 1122 this.initialized = true; 1123 for (String table : tables) { 1124 LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); 1125 taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ, 1126 this.rawScanEnabled, readLatency)); 1127 } 1128 } else { 1129 taskFutures.addAll(sniff(TaskType.READ, regionSink)); 1130 } 1131 1132 if (writeSniffing) { 1133 if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) { 1134 try { 1135 checkWriteTableDistribution(); 1136 } catch (IOException e) { 1137 LOG.error("Check canary table distribution failed!", e); 1138 } 1139 lastCheckTime = EnvironmentEdgeManager.currentTime(); 1140 } 1141 // sniff canary table with write operation 1142 regionSink.initializeWriteLatency(); 1143 LongAdder writeTableLatency = regionSink.getWriteLatency(); 1144 taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getDescriptor(writeTableName), 1145 executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); 1146 } 1147 1148 for (Future<Void> future : taskFutures) { 1149 try { 1150 future.get(); 1151 } catch (ExecutionException e) { 1152 LOG.error("Sniff region failed!", e); 1153 } 1154 } 1155 Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap(); 1156 for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) { 1157 String tableName = entry.getKey(); 1158 if (actualReadTableLatency.containsKey(tableName)) { 1159 Long actual = actualReadTableLatency.get(tableName).longValue(); 1160 Long configured = entry.getValue(); 1161 if (actual > configured) { 1162 LOG.error("Read operation for {} took {}ms (Configured read timeout {}ms.", 1163 tableName, actual, configured); 1164 } else { 1165 LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.", 1166 tableName, actual, configured); 1167 } 1168 } else { 1169 LOG.error("Read operation for {} failed!", tableName); 1170 } 1171 } 1172 if (this.writeSniffing) { 1173 String writeTableStringName = this.writeTableName.getNameAsString(); 1174 long actualWriteLatency = regionSink.getWriteLatency().longValue(); 1175 LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.", 1176 writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout); 1177 // Check that the writeTable write operation latency does not exceed the configured timeout. 1178 if (actualWriteLatency > this.configuredWriteTableTimeout) { 1179 LOG.error("Write operation for {} exceeded the configured write timeout.", 1180 writeTableStringName); 1181 } 1182 } 1183 } catch (Exception e) { 1184 LOG.error("Run regionMonitor failed", e); 1185 this.errorCode = ERROR_EXIT_CODE; 1186 } finally { 1187 this.done = true; 1188 } 1189 } 1190 this.done = true; 1191 } 1192 1193 /** 1194 * @return List of tables to use in test. 1195 */ 1196 private String[] generateMonitorTables(String[] monitorTargets) throws IOException { 1197 String[] returnTables = null; 1198 1199 if (this.useRegExp) { 1200 Pattern pattern = null; 1201 TableDescriptor[] tds = null; 1202 Set<String> tmpTables = new TreeSet<>(); 1203 try { 1204 LOG.debug(String.format("reading list of tables")); 1205 tds = this.admin.listTables(pattern); 1206 if (tds == null) { 1207 tds = new TableDescriptor[0]; 1208 } 1209 for (String monitorTarget : monitorTargets) { 1210 pattern = Pattern.compile(monitorTarget); 1211 for (TableDescriptor td : tds) { 1212 if (pattern.matcher(td.getTableName().getNameAsString()).matches()) { 1213 tmpTables.add(td.getTableName().getNameAsString()); 1214 } 1215 } 1216 } 1217 } catch (IOException e) { 1218 LOG.error("Communicate with admin failed", e); 1219 throw e; 1220 } 1221 1222 if (tmpTables.size() > 0) { 1223 returnTables = tmpTables.toArray(new String[tmpTables.size()]); 1224 } else { 1225 String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); 1226 LOG.error(msg); 1227 this.errorCode = INIT_ERROR_EXIT_CODE; 1228 throw new TableNotFoundException(msg); 1229 } 1230 } else { 1231 returnTables = monitorTargets; 1232 } 1233 1234 return returnTables; 1235 } 1236 1237 /* 1238 * Canary entry point to monitor all the tables. 1239 */ 1240 private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink) 1241 throws Exception { 1242 LOG.debug("Reading list of tables"); 1243 List<Future<Void>> taskFutures = new LinkedList<>(); 1244 for (TableDescriptor td: admin.listTableDescriptors()) { 1245 if (admin.isTableEnabled(td.getTableName()) && 1246 (!td.getTableName().equals(writeTableName))) { 1247 LongAdder readLatency = 1248 regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString()); 1249 taskFutures.addAll(Canary.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled, 1250 readLatency)); 1251 } 1252 } 1253 return taskFutures; 1254 } 1255 1256 private void checkWriteTableDistribution() throws IOException { 1257 if (!admin.tableExists(writeTableName)) { 1258 int numberOfServers = 1259 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size(); 1260 if (numberOfServers == 0) { 1261 throw new IllegalStateException("No live regionservers"); 1262 } 1263 createWriteTable(numberOfServers); 1264 } 1265 1266 if (!admin.isTableEnabled(writeTableName)) { 1267 admin.enableTable(writeTableName); 1268 } 1269 1270 ClusterMetrics status = 1271 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER)); 1272 int numberOfServers = status.getLiveServerMetrics().size(); 1273 if (status.getLiveServerMetrics().containsKey(status.getMasterName())) { 1274 numberOfServers -= 1; 1275 } 1276 1277 List<Pair<RegionInfo, ServerName>> pairs = 1278 MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName); 1279 int numberOfRegions = pairs.size(); 1280 if (numberOfRegions < numberOfServers * regionsLowerLimit 1281 || numberOfRegions > numberOfServers * regionsUpperLimit) { 1282 admin.disableTable(writeTableName); 1283 admin.deleteTable(writeTableName); 1284 createWriteTable(numberOfServers); 1285 } 1286 HashSet<ServerName> serverSet = new HashSet<>(); 1287 for (Pair<RegionInfo, ServerName> pair : pairs) { 1288 serverSet.add(pair.getSecond()); 1289 } 1290 int numberOfCoveredServers = serverSet.size(); 1291 if (numberOfCoveredServers < numberOfServers) { 1292 admin.balancer(); 1293 } 1294 } 1295 1296 private void createWriteTable(int numberOfServers) throws IOException { 1297 int numberOfRegions = (int)(numberOfServers * regionsLowerLimit); 1298 LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " + 1299 "(current lower limit of regions per server is {} and you can change it with config {}).", 1300 numberOfServers, numberOfRegions, regionsLowerLimit, 1301 HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY); 1302 HTableDescriptor desc = new HTableDescriptor(writeTableName); 1303 HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME); 1304 family.setMaxVersions(1); 1305 family.setTimeToLive(writeDataTTL); 1306 1307 desc.addFamily(family); 1308 byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions); 1309 admin.createTable(desc, splits); 1310 } 1311 } 1312 1313 /** 1314 * Canary entry point for specified table. 1315 * @throws Exception 1316 */ 1317 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName, 1318 ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) 1319 throws Exception { 1320 LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName); 1321 if (admin.isTableEnabled(TableName.valueOf(tableName))) { 1322 return Canary.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)), 1323 executor, taskType, rawScanEnabled, readLatency); 1324 } else { 1325 LOG.warn("Table {} is not enabled", tableName); 1326 } 1327 return new LinkedList<>(); 1328 } 1329 1330 /* 1331 * Loops over regions of this table, and outputs information about the state. 1332 */ 1333 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, 1334 TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, 1335 boolean rawScanEnabled, LongAdder rwLatency) throws Exception { 1336 LOG.debug("Reading list of regions for table {}", tableDesc.getTableName()); 1337 try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) { 1338 List<RegionTask> tasks = new ArrayList<>(); 1339 try (RegionLocator regionLocator = 1340 admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1341 for (HRegionLocation location: regionLocator.getAllRegionLocations()) { 1342 ServerName rs = location.getServerName(); 1343 RegionInfo region = location.getRegion(); 1344 tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink, 1345 taskType, rawScanEnabled, rwLatency)); 1346 } 1347 return executor.invokeAll(tasks); 1348 } 1349 } catch (TableNotFoundException e) { 1350 return Collections.EMPTY_LIST; 1351 } 1352 } 1353 1354 // monitor for zookeeper mode 1355 private static class ZookeeperMonitor extends Monitor { 1356 private List<String> hosts; 1357 private final String znode; 1358 private final int timeout; 1359 1360 protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1361 Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1362 super(connection, monitorTargets, useRegExp, 1363 sink, executor, treatFailureAsError, allowedFailures); 1364 Configuration configuration = connection.getConfiguration(); 1365 znode = 1366 configuration.get(ZOOKEEPER_ZNODE_PARENT, 1367 DEFAULT_ZOOKEEPER_ZNODE_PARENT); 1368 timeout = configuration 1369 .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 1370 ConnectStringParser parser = 1371 new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); 1372 hosts = Lists.newArrayList(); 1373 for (InetSocketAddress server : parser.getServerAddresses()) { 1374 hosts.add(server.toString()); 1375 } 1376 if (allowedFailures > (hosts.size() - 1) / 2) { 1377 LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " + 1378 "already be lost. Setting of {} failures is unexpected for {} ensemble size.", 1379 allowedFailures, hosts.size()); 1380 } 1381 } 1382 1383 @Override public void run() { 1384 List<ZookeeperTask> tasks = Lists.newArrayList(); 1385 ZookeeperStdOutSink zkSink = null; 1386 try { 1387 zkSink = this.getSink(); 1388 } catch (RuntimeException e) { 1389 LOG.error("Run ZooKeeperMonitor failed!", e); 1390 this.errorCode = ERROR_EXIT_CODE; 1391 } 1392 this.initialized = true; 1393 for (final String host : hosts) { 1394 tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink)); 1395 } 1396 try { 1397 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1398 try { 1399 future.get(); 1400 } catch (ExecutionException e) { 1401 LOG.error("Sniff zookeeper failed!", e); 1402 this.errorCode = ERROR_EXIT_CODE; 1403 } 1404 } 1405 } catch (InterruptedException e) { 1406 this.errorCode = ERROR_EXIT_CODE; 1407 Thread.currentThread().interrupt(); 1408 LOG.error("Sniff zookeeper interrupted!", e); 1409 } 1410 this.done = true; 1411 } 1412 1413 private ZookeeperStdOutSink getSink() { 1414 if (!(sink instanceof ZookeeperStdOutSink)) { 1415 throw new RuntimeException("Can only write to zookeeper sink"); 1416 } 1417 return ((ZookeeperStdOutSink) sink); 1418 } 1419 } 1420 1421 1422 /** 1423 * A monitor for regionserver mode 1424 */ 1425 private static class RegionServerMonitor extends Monitor { 1426 private boolean allRegions; 1427 1428 public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1429 Sink sink, ExecutorService executor, boolean allRegions, 1430 boolean treatFailureAsError, long allowedFailures) { 1431 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, allowedFailures); 1432 this.allRegions = allRegions; 1433 } 1434 1435 private RegionServerStdOutSink getSink() { 1436 if (!(sink instanceof RegionServerStdOutSink)) { 1437 throw new RuntimeException("Can only write to regionserver sink"); 1438 } 1439 return ((RegionServerStdOutSink) sink); 1440 } 1441 1442 @Override 1443 public void run() { 1444 if (this.initAdmin() && this.checkNoTableNames()) { 1445 RegionServerStdOutSink regionServerSink = null; 1446 try { 1447 regionServerSink = this.getSink(); 1448 } catch (RuntimeException e) { 1449 LOG.error("Run RegionServerMonitor failed!", e); 1450 this.errorCode = ERROR_EXIT_CODE; 1451 } 1452 Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName(); 1453 this.initialized = true; 1454 this.monitorRegionServers(rsAndRMap, regionServerSink); 1455 } 1456 this.done = true; 1457 } 1458 1459 private boolean checkNoTableNames() { 1460 List<String> foundTableNames = new ArrayList<>(); 1461 TableName[] tableNames = null; 1462 LOG.debug("Reading list of tables"); 1463 try { 1464 tableNames = this.admin.listTableNames(); 1465 } catch (IOException e) { 1466 LOG.error("Get listTableNames failed", e); 1467 this.errorCode = INIT_ERROR_EXIT_CODE; 1468 return false; 1469 } 1470 1471 if (this.targets == null || this.targets.length == 0) return true; 1472 1473 for (String target : this.targets) { 1474 for (TableName tableName : tableNames) { 1475 if (target.equals(tableName.getNameAsString())) { 1476 foundTableNames.add(target); 1477 } 1478 } 1479 } 1480 1481 if (foundTableNames.size() > 0) { 1482 System.err.println("Cannot pass a tablename when using the -regionserver " + 1483 "option, tablenames:" + foundTableNames.toString()); 1484 this.errorCode = USAGE_EXIT_CODE; 1485 } 1486 return foundTableNames.isEmpty(); 1487 } 1488 1489 private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, RegionServerStdOutSink regionServerSink) { 1490 List<RegionServerTask> tasks = new ArrayList<>(); 1491 Map<String, AtomicLong> successMap = new HashMap<>(); 1492 Random rand = new Random(); 1493 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1494 String serverName = entry.getKey(); 1495 AtomicLong successes = new AtomicLong(0); 1496 successMap.put(serverName, successes); 1497 if (entry.getValue().isEmpty()) { 1498 LOG.error("Regionserver not serving any regions - {}", serverName); 1499 } else if (this.allRegions) { 1500 for (RegionInfo region : entry.getValue()) { 1501 tasks.add(new RegionServerTask(this.connection, 1502 serverName, 1503 region, 1504 regionServerSink, 1505 successes)); 1506 } 1507 } else { 1508 // random select a region if flag not set 1509 RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size())); 1510 tasks.add(new RegionServerTask(this.connection, 1511 serverName, 1512 region, 1513 regionServerSink, 1514 successes)); 1515 } 1516 } 1517 try { 1518 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1519 try { 1520 future.get(); 1521 } catch (ExecutionException e) { 1522 LOG.error("Sniff regionserver failed!", e); 1523 this.errorCode = ERROR_EXIT_CODE; 1524 } 1525 } 1526 if (this.allRegions) { 1527 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1528 String serverName = entry.getKey(); 1529 LOG.info("Successfully read {} regions out of {} on regionserver {}", 1530 successMap.get(serverName), entry.getValue().size(), serverName); 1531 } 1532 } 1533 } catch (InterruptedException e) { 1534 this.errorCode = ERROR_EXIT_CODE; 1535 LOG.error("Sniff regionserver interrupted!", e); 1536 } 1537 } 1538 1539 private Map<String, List<RegionInfo>> filterRegionServerByName() { 1540 Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName(); 1541 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); 1542 return regionServerAndRegionsMap; 1543 } 1544 1545 private Map<String, List<RegionInfo>> getAllRegionServerByName() { 1546 Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>(); 1547 try { 1548 LOG.debug("Reading list of tables and locations"); 1549 List<TableDescriptor> tableDescs = this.admin.listTableDescriptors(); 1550 List<RegionInfo> regions = null; 1551 for (TableDescriptor tableDesc: tableDescs) { 1552 try (RegionLocator regionLocator = 1553 this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1554 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1555 ServerName rs = location.getServerName(); 1556 String rsName = rs.getHostname(); 1557 RegionInfo r = location.getRegion(); 1558 if (rsAndRMap.containsKey(rsName)) { 1559 regions = rsAndRMap.get(rsName); 1560 } else { 1561 regions = new ArrayList<>(); 1562 rsAndRMap.put(rsName, regions); 1563 } 1564 regions.add(r); 1565 } 1566 } 1567 } 1568 1569 // get any live regionservers not serving any regions 1570 for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 1571 .getLiveServerMetrics().keySet()) { 1572 String rsName = rs.getHostname(); 1573 if (!rsAndRMap.containsKey(rsName)) { 1574 rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList()); 1575 } 1576 } 1577 } catch (IOException e) { 1578 LOG.error("Get HTables info failed", e); 1579 this.errorCode = INIT_ERROR_EXIT_CODE; 1580 } 1581 return rsAndRMap; 1582 } 1583 1584 private Map<String, List<RegionInfo>> doFilterRegionServerByName( 1585 Map<String, List<RegionInfo>> fullRsAndRMap) { 1586 1587 Map<String, List<RegionInfo>> filteredRsAndRMap = null; 1588 1589 if (this.targets != null && this.targets.length > 0) { 1590 filteredRsAndRMap = new HashMap<>(); 1591 Pattern pattern = null; 1592 Matcher matcher = null; 1593 boolean regExpFound = false; 1594 for (String rsName : this.targets) { 1595 if (this.useRegExp) { 1596 regExpFound = false; 1597 pattern = Pattern.compile(rsName); 1598 for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) { 1599 matcher = pattern.matcher(entry.getKey()); 1600 if (matcher.matches()) { 1601 filteredRsAndRMap.put(entry.getKey(), entry.getValue()); 1602 regExpFound = true; 1603 } 1604 } 1605 if (!regExpFound) { 1606 LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); 1607 } 1608 } else { 1609 if (fullRsAndRMap.containsKey(rsName)) { 1610 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); 1611 } else { 1612 LOG.info("No RegionServerInfo found, regionServerName {}", rsName); 1613 } 1614 } 1615 } 1616 } else { 1617 filteredRsAndRMap = fullRsAndRMap; 1618 } 1619 return filteredRsAndRMap; 1620 } 1621 } 1622 1623 public static void main(String[] args) throws Exception { 1624 final Configuration conf = HBaseConfiguration.create(); 1625 1626 // Loading the generic options to conf 1627 new GenericOptionsParser(conf, args); 1628 1629 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); 1630 LOG.info("Execution thread count={}", numThreads); 1631 1632 int exitCode = 0; 1633 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); 1634 try { 1635 exitCode = ToolRunner.run(conf, new Canary(executor), args); 1636 } finally { 1637 executor.shutdown(); 1638 } 1639 System.exit(exitCode); 1640 } 1641}