001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.tool; 021 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; 023import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; 024 025import java.io.Closeable; 026import java.io.IOException; 027import java.net.InetSocketAddress; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Random; 038import java.util.Set; 039import java.util.TreeSet; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ScheduledThreadPoolExecutor; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.concurrent.atomic.LongAdder; 048import java.util.regex.Matcher; 049import java.util.regex.Pattern; 050 051import org.apache.commons.lang3.time.StopWatch; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.AuthUtil; 054import org.apache.hadoop.hbase.ChoreService; 055import org.apache.hadoop.hbase.ClusterMetrics; 056import org.apache.hadoop.hbase.ClusterMetrics.Option; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseInterfaceAudience; 060import org.apache.hadoop.hbase.HColumnDescriptor; 061import org.apache.hadoop.hbase.HConstants; 062import org.apache.hadoop.hbase.HRegionLocation; 063import org.apache.hadoop.hbase.HTableDescriptor; 064import org.apache.hadoop.hbase.MetaTableAccessor; 065import org.apache.hadoop.hbase.NamespaceDescriptor; 066import org.apache.hadoop.hbase.ScheduledChore; 067import org.apache.hadoop.hbase.ServerName; 068import org.apache.hadoop.hbase.TableName; 069import org.apache.hadoop.hbase.TableNotEnabledException; 070import org.apache.hadoop.hbase.TableNotFoundException; 071import org.apache.hadoop.hbase.client.Admin; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 073import org.apache.hadoop.hbase.client.Connection; 074import org.apache.hadoop.hbase.client.ConnectionFactory; 075import org.apache.hadoop.hbase.client.Get; 076import org.apache.hadoop.hbase.client.Put; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionLocator; 079import org.apache.hadoop.hbase.client.ResultScanner; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.Table; 082import org.apache.hadoop.hbase.client.TableDescriptor; 083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 084import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType; 085import org.apache.hadoop.hbase.util.Bytes; 086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 087import org.apache.hadoop.hbase.util.Pair; 088import org.apache.hadoop.hbase.util.ReflectionUtils; 089import org.apache.hadoop.hbase.util.RegionSplitter; 090import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 091import org.apache.hadoop.hbase.zookeeper.ZKConfig; 092import org.apache.hadoop.util.Tool; 093import org.apache.hadoop.util.ToolRunner; 094import org.apache.yetus.audience.InterfaceAudience; 095import org.apache.zookeeper.KeeperException; 096import org.apache.zookeeper.ZooKeeper; 097import org.apache.zookeeper.client.ConnectStringParser; 098import org.apache.zookeeper.data.Stat; 099import org.slf4j.Logger; 100import org.slf4j.LoggerFactory; 101 102import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 103import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 104 105/** 106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. 107 * 108 * There are three modes: 109 * <ol> 110 * <li>region mode (Default): For each region, try to get one row per column family outputting 111 * information on failure (ERROR) or else the latency. 112 * </li> 113 * 114 * <li>regionserver mode: For each regionserver try to get one row from one table selected 115 * randomly outputting information on failure (ERROR) or else the latency. 116 * </li> 117 * 118 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on 119 * failure (ERROR) or else the latency. 120 * </li> 121 * </ol> 122 */ 123@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 124public class CanaryTool implements Tool, Canary { 125 126 @Override 127 public int checkRegions(String[] targets) throws Exception { 128 String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT); 129 try { 130 if (configuredReadTableTimeoutsStr != null) { 131 populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr); 132 } 133 } catch (IllegalArgumentException e) { 134 LOG.error("Constructing read table timeouts map failed ", e); 135 return USAGE_EXIT_CODE; 136 } 137 return runMonitor(targets); 138 } 139 140 @Override 141 public int checkRegionServers(String[] targets) throws Exception { 142 regionServerMode = true; 143 return runMonitor(targets); 144 } 145 146 @Override 147 public int checkZooKeeper() throws Exception { 148 zookeeperMode = true; 149 return runMonitor(null); 150 } 151 152 /** 153 * Sink interface used by the canary to output information 154 */ 155 public interface Sink { 156 long getReadFailureCount(); 157 long incReadFailureCount(); 158 Map<String,String> getReadFailures(); 159 void updateReadFailures(String regionName, String serverName); 160 long getWriteFailureCount(); 161 long incWriteFailureCount(); 162 Map<String,String> getWriteFailures(); 163 void updateWriteFailures(String regionName, String serverName); 164 long getReadSuccessCount(); 165 long incReadSuccessCount(); 166 long getWriteSuccessCount(); 167 long incWriteSuccessCount(); 168 } 169 170 /** 171 * Simple implementation of canary sink that allows plotting to a file or standard output. 172 */ 173 public static class StdOutSink implements Sink { 174 private AtomicLong readFailureCount = new AtomicLong(0), 175 writeFailureCount = new AtomicLong(0), 176 readSuccessCount = new AtomicLong(0), 177 writeSuccessCount = new AtomicLong(0); 178 private Map<String, String> readFailures = new ConcurrentHashMap<>(); 179 private Map<String, String> writeFailures = new ConcurrentHashMap<>(); 180 181 @Override 182 public long getReadFailureCount() { 183 return readFailureCount.get(); 184 } 185 186 @Override 187 public long incReadFailureCount() { 188 return readFailureCount.incrementAndGet(); 189 } 190 191 @Override 192 public Map<String, String> getReadFailures() { 193 return readFailures; 194 } 195 196 @Override 197 public void updateReadFailures(String regionName, String serverName) { 198 readFailures.put(regionName, serverName); 199 } 200 201 @Override 202 public long getWriteFailureCount() { 203 return writeFailureCount.get(); 204 } 205 206 @Override 207 public long incWriteFailureCount() { 208 return writeFailureCount.incrementAndGet(); 209 } 210 211 @Override 212 public Map<String, String> getWriteFailures() { 213 return writeFailures; 214 } 215 216 @Override 217 public void updateWriteFailures(String regionName, String serverName) { 218 writeFailures.put(regionName, serverName); 219 } 220 221 @Override 222 public long getReadSuccessCount() { 223 return readSuccessCount.get(); 224 } 225 226 @Override 227 public long incReadSuccessCount() { 228 return readSuccessCount.incrementAndGet(); 229 } 230 231 @Override 232 public long getWriteSuccessCount() { 233 return writeSuccessCount.get(); 234 } 235 236 @Override 237 public long incWriteSuccessCount() { 238 return writeSuccessCount.incrementAndGet(); 239 } 240 } 241 242 /** 243 * By RegionServer, for 'regionserver' mode. 244 */ 245 public static class RegionServerStdOutSink extends StdOutSink { 246 public void publishReadFailure(String table, String server) { 247 incReadFailureCount(); 248 LOG.error("Read from {} on {}", table, server); 249 } 250 251 public void publishReadTiming(String table, String server, long msTime) { 252 LOG.info("Read from {} on {} in {}ms", table, server, msTime); 253 } 254 } 255 256 /** 257 * Output for 'zookeeper' mode. 258 */ 259 public static class ZookeeperStdOutSink extends StdOutSink { 260 public void publishReadFailure(String znode, String server) { 261 incReadFailureCount(); 262 LOG.error("Read from {} on {}", znode, server); 263 } 264 265 public void publishReadTiming(String znode, String server, long msTime) { 266 LOG.info("Read from {} on {} in {}ms", znode, server, msTime); 267 } 268 } 269 270 /** 271 * By Region, for 'region' mode. 272 */ 273 public static class RegionStdOutSink extends StdOutSink { 274 private Map<String, LongAdder> perTableReadLatency = new HashMap<>(); 275 private LongAdder writeLatency = new LongAdder(); 276 private final Map<String, List<RegionTaskResult>> regionMap = new ConcurrentHashMap<>(); 277 278 public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { 279 incReadFailureCount(); 280 LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e); 281 } 282 283 public void publishReadFailure(ServerName serverName, RegionInfo region, 284 ColumnFamilyDescriptor column, Exception e) { 285 incReadFailureCount(); 286 LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName, 287 column.getNameAsString(), e); 288 } 289 290 public void publishReadTiming(ServerName serverName, RegionInfo region, 291 ColumnFamilyDescriptor column, long msTime) { 292 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 293 rtr.setReadSuccess(); 294 rtr.setReadLatency(msTime); 295 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 296 rtrs.add(rtr); 297 // Note that read success count will be equal to total column family read successes. 298 incReadSuccessCount(); 299 LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 300 column.getNameAsString(), msTime); 301 } 302 303 public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { 304 incWriteFailureCount(); 305 LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); 306 } 307 308 public void publishWriteFailure(ServerName serverName, RegionInfo region, 309 ColumnFamilyDescriptor column, Exception e) { 310 incWriteFailureCount(); 311 LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, 312 column.getNameAsString(), e); 313 } 314 315 public void publishWriteTiming(ServerName serverName, RegionInfo region, 316 ColumnFamilyDescriptor column, long msTime) { 317 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 318 rtr.setWriteSuccess(); 319 rtr.setWriteLatency(msTime); 320 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 321 rtrs.add(rtr); 322 // Note that write success count will be equal to total column family write successes. 323 incWriteSuccessCount(); 324 LOG.info("Write to {} on {} {} in {}ms", 325 region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime); 326 } 327 328 public Map<String, LongAdder> getReadLatencyMap() { 329 return this.perTableReadLatency; 330 } 331 332 public LongAdder initializeAndGetReadLatencyForTable(String tableName) { 333 LongAdder initLatency = new LongAdder(); 334 this.perTableReadLatency.put(tableName, initLatency); 335 return initLatency; 336 } 337 338 public void initializeWriteLatency() { 339 this.writeLatency.reset(); 340 } 341 342 public LongAdder getWriteLatency() { 343 return this.writeLatency; 344 } 345 346 public Map<String, List<RegionTaskResult>> getRegionMap() { 347 return this.regionMap; 348 } 349 350 public int getTotalExpectedRegions() { 351 return this.regionMap.size(); 352 } 353 } 354 355 /** 356 * Run a single zookeeper Task and then exit. 357 */ 358 static class ZookeeperTask implements Callable<Void> { 359 private final Connection connection; 360 private final String host; 361 private String znode; 362 private final int timeout; 363 private ZookeeperStdOutSink sink; 364 365 public ZookeeperTask(Connection connection, String host, String znode, int timeout, 366 ZookeeperStdOutSink sink) { 367 this.connection = connection; 368 this.host = host; 369 this.znode = znode; 370 this.timeout = timeout; 371 this.sink = sink; 372 } 373 374 @Override public Void call() throws Exception { 375 ZooKeeper zooKeeper = null; 376 try { 377 zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); 378 Stat exists = zooKeeper.exists(znode, false); 379 StopWatch stopwatch = new StopWatch(); 380 stopwatch.start(); 381 zooKeeper.getData(znode, false, exists); 382 stopwatch.stop(); 383 sink.publishReadTiming(znode, host, stopwatch.getTime()); 384 } catch (KeeperException | InterruptedException e) { 385 sink.publishReadFailure(znode, host); 386 } finally { 387 if (zooKeeper != null) { 388 zooKeeper.close(); 389 } 390 } 391 return null; 392 } 393 } 394 395 /** 396 * Run a single Region Task and then exit. For each column family of the Region, get one row and 397 * output latency or failure. 398 */ 399 static class RegionTask implements Callable<Void> { 400 public enum TaskType{ 401 READ, WRITE 402 } 403 private Connection connection; 404 private RegionInfo region; 405 private RegionStdOutSink sink; 406 private TaskType taskType; 407 private boolean rawScanEnabled; 408 private ServerName serverName; 409 private LongAdder readWriteLatency; 410 411 RegionTask(Connection connection, RegionInfo region, ServerName serverName, 412 RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) { 413 this.connection = connection; 414 this.region = region; 415 this.serverName = serverName; 416 this.sink = sink; 417 this.taskType = taskType; 418 this.rawScanEnabled = rawScanEnabled; 419 this.readWriteLatency = rwLatency; 420 } 421 422 @Override 423 public Void call() { 424 switch (taskType) { 425 case READ: 426 return read(); 427 case WRITE: 428 return write(); 429 default: 430 return read(); 431 } 432 } 433 434 public Void read() { 435 Table table = null; 436 TableDescriptor tableDesc = null; 437 try { 438 LOG.debug("Reading table descriptor for table {}", region.getTable()); 439 table = connection.getTable(region.getTable()); 440 tableDesc = table.getDescriptor(); 441 } catch (IOException e) { 442 LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); 443 sink.publishReadFailure(serverName, region, e); 444 if (table != null) { 445 try { 446 table.close(); 447 } catch (IOException ioe) { 448 LOG.error("Close table failed", e); 449 } 450 } 451 return null; 452 } 453 454 byte[] startKey = null; 455 Get get = null; 456 Scan scan = null; 457 ResultScanner rs = null; 458 StopWatch stopWatch = new StopWatch(); 459 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 460 stopWatch.reset(); 461 startKey = region.getStartKey(); 462 // Can't do a get on empty start row so do a Scan of first element if any instead. 463 if (startKey.length > 0) { 464 get = new Get(startKey); 465 get.setCacheBlocks(false); 466 get.setFilter(new FirstKeyOnlyFilter()); 467 get.addFamily(column.getName()); 468 } else { 469 scan = new Scan(); 470 LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName()); 471 scan.setRaw(rawScanEnabled); 472 scan.setCaching(1); 473 scan.setCacheBlocks(false); 474 scan.setFilter(new FirstKeyOnlyFilter()); 475 scan.addFamily(column.getName()); 476 scan.setMaxResultSize(1L); 477 scan.setOneRowLimit(); 478 } 479 LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(), 480 region.getRegionNameAsString(), column.getNameAsString(), 481 Bytes.toStringBinary(startKey)); 482 try { 483 stopWatch.start(); 484 if (startKey.length > 0) { 485 table.get(get); 486 } else { 487 rs = table.getScanner(scan); 488 rs.next(); 489 } 490 stopWatch.stop(); 491 this.readWriteLatency.add(stopWatch.getTime()); 492 sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); 493 } catch (Exception e) { 494 sink.publishReadFailure(serverName, region, column, e); 495 sink.updateReadFailures(region.getRegionNameAsString(), 496 serverName == null? "NULL": serverName.getHostname()); 497 } finally { 498 if (rs != null) { 499 rs.close(); 500 } 501 scan = null; 502 get = null; 503 } 504 } 505 try { 506 table.close(); 507 } catch (IOException e) { 508 LOG.error("Close table failed", e); 509 } 510 return null; 511 } 512 513 /** 514 * Check writes for the canary table 515 */ 516 private Void write() { 517 Table table = null; 518 TableDescriptor tableDesc = null; 519 try { 520 table = connection.getTable(region.getTable()); 521 tableDesc = table.getDescriptor(); 522 byte[] rowToCheck = region.getStartKey(); 523 if (rowToCheck.length == 0) { 524 rowToCheck = new byte[]{0x0}; 525 } 526 int writeValueSize = 527 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); 528 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 529 Put put = new Put(rowToCheck); 530 byte[] value = new byte[writeValueSize]; 531 Bytes.random(value); 532 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); 533 534 LOG.debug("Writing to {} {} {} {}", 535 tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), 536 Bytes.toStringBinary(rowToCheck)); 537 try { 538 long startTime = System.currentTimeMillis(); 539 table.put(put); 540 long time = System.currentTimeMillis() - startTime; 541 this.readWriteLatency.add(time); 542 sink.publishWriteTiming(serverName, region, column, time); 543 } catch (Exception e) { 544 sink.publishWriteFailure(serverName, region, column, e); 545 } 546 } 547 table.close(); 548 } catch (IOException e) { 549 sink.publishWriteFailure(serverName, region, e); 550 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() ); 551 } 552 return null; 553 } 554 } 555 556 /** 557 * Run a single RegionServer Task and then exit. 558 * Get one row from a region on the regionserver and output latency or the failure. 559 */ 560 static class RegionServerTask implements Callable<Void> { 561 private Connection connection; 562 private String serverName; 563 private RegionInfo region; 564 private RegionServerStdOutSink sink; 565 private AtomicLong successes; 566 567 RegionServerTask(Connection connection, String serverName, RegionInfo region, 568 RegionServerStdOutSink sink, AtomicLong successes) { 569 this.connection = connection; 570 this.serverName = serverName; 571 this.region = region; 572 this.sink = sink; 573 this.successes = successes; 574 } 575 576 @Override 577 public Void call() { 578 TableName tableName = null; 579 Table table = null; 580 Get get = null; 581 byte[] startKey = null; 582 Scan scan = null; 583 StopWatch stopWatch = new StopWatch(); 584 // monitor one region on every region server 585 stopWatch.reset(); 586 try { 587 tableName = region.getTable(); 588 table = connection.getTable(tableName); 589 startKey = region.getStartKey(); 590 // Can't do a get on empty start row so do a Scan of first element if any instead. 591 LOG.debug("Reading from {} {} {} {}", 592 serverName, region.getTable(), region.getRegionNameAsString(), 593 Bytes.toStringBinary(startKey)); 594 if (startKey.length > 0) { 595 get = new Get(startKey); 596 get.setCacheBlocks(false); 597 get.setFilter(new FirstKeyOnlyFilter()); 598 stopWatch.start(); 599 table.get(get); 600 stopWatch.stop(); 601 } else { 602 scan = new Scan(); 603 scan.setCacheBlocks(false); 604 scan.setFilter(new FirstKeyOnlyFilter()); 605 scan.setCaching(1); 606 scan.setMaxResultSize(1L); 607 scan.setOneRowLimit(); 608 stopWatch.start(); 609 ResultScanner s = table.getScanner(scan); 610 s.next(); 611 s.close(); 612 stopWatch.stop(); 613 } 614 successes.incrementAndGet(); 615 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); 616 } catch (TableNotFoundException tnfe) { 617 LOG.error("Table may be deleted", tnfe); 618 // This is ignored because it doesn't imply that the regionserver is dead 619 } catch (TableNotEnabledException tnee) { 620 // This is considered a success since we got a response. 621 successes.incrementAndGet(); 622 LOG.debug("The targeted table was disabled. Assuming success."); 623 } catch (DoNotRetryIOException dnrioe) { 624 sink.publishReadFailure(tableName.getNameAsString(), serverName); 625 LOG.error(dnrioe.toString(), dnrioe); 626 } catch (IOException e) { 627 sink.publishReadFailure(tableName.getNameAsString(), serverName); 628 LOG.error(e.toString(), e); 629 } finally { 630 if (table != null) { 631 try { 632 table.close(); 633 } catch (IOException e) {/* DO NOTHING */ 634 LOG.error("Close table failed", e); 635 } 636 } 637 scan = null; 638 get = null; 639 startKey = null; 640 } 641 return null; 642 } 643 } 644 645 private static final int USAGE_EXIT_CODE = 1; 646 private static final int INIT_ERROR_EXIT_CODE = 2; 647 private static final int TIMEOUT_ERROR_EXIT_CODE = 3; 648 private static final int ERROR_EXIT_CODE = 4; 649 private static final int FAILURE_EXIT_CODE = 5; 650 651 private static final long DEFAULT_INTERVAL = 60000; 652 653 private static final long DEFAULT_TIMEOUT = 600000; // 10 mins 654 private static final int MAX_THREADS_NUM = 16; // #threads to contact regions 655 656 private static final Logger LOG = LoggerFactory.getLogger(Canary.class); 657 658 public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf( 659 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); 660 661 private static final String CANARY_TABLE_FAMILY_NAME = "Test"; 662 663 private Configuration conf = null; 664 private long interval = 0; 665 private Sink sink = null; 666 667 /** 668 * True if we are to run in 'regionServer' mode. 669 */ 670 private boolean regionServerMode = false; 671 672 /** 673 * True if we are to run in zookeeper 'mode'. 674 */ 675 private boolean zookeeperMode = false; 676 677 /** 678 * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. 679 * we aggregate time to fetch each region and it needs to be less than this value else we 680 * log an ERROR. 681 */ 682 private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>(); 683 684 public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS 685 = "hbase.canary.regionserver_all_regions"; 686 687 public static final String HBASE_CANARY_REGION_WRITE_SNIFFING 688 = "hbase.canary.region.write.sniffing"; 689 public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT 690 = "hbase.canary.region.write.table.timeout"; 691 public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME 692 = "hbase.canary.region.write.table.name"; 693 public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT 694 = "hbase.canary.region.read.table.timeout"; 695 696 public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES 697 = "hbase.canary.zookeeper.permitted.failures"; 698 699 public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex"; 700 public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout"; 701 public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error"; 702 703 704 private ExecutorService executor; // threads to retrieve data from regionservers 705 706 public CanaryTool() { 707 this(new ScheduledThreadPoolExecutor(1)); 708 } 709 710 public CanaryTool(ExecutorService executor) { 711 this(executor, null); 712 } 713 714 @VisibleForTesting 715 CanaryTool(ExecutorService executor, Sink sink) { 716 this.executor = executor; 717 this.sink = sink; 718 } 719 720 CanaryTool(Configuration conf, ExecutorService executor) { 721 this(conf, executor, null); 722 } 723 724 CanaryTool(Configuration conf, ExecutorService executor, Sink sink) { 725 this(executor, sink); 726 setConf(conf); 727 } 728 729 @Override 730 public Configuration getConf() { 731 return conf; 732 } 733 734 @Override 735 public void setConf(Configuration conf) { 736 if (conf == null) { 737 conf = HBaseConfiguration.create(); 738 } 739 this.conf = conf; 740 } 741 742 private int parseArgs(String[] args) { 743 int index = -1; 744 long permittedFailures = 0; 745 boolean regionServerAllRegions = false, writeSniffing = false; 746 String readTableTimeoutsStr = null; 747 748 // Process command line args 749 for (int i = 0; i < args.length; i++) { 750 String cmd = args[i]; 751 752 if (cmd.startsWith("-")) { 753 if (index >= 0) { 754 // command line args must be in the form: [opts] [table 1 [table 2 ...]] 755 System.err.println("Invalid command line options"); 756 printUsageAndExit(); 757 } 758 759 if (cmd.equals("-help") || cmd.equals("-h")) { 760 // user asked for help, print the help and quit. 761 printUsageAndExit(); 762 } else if (cmd.equals("-daemon") && interval == 0) { 763 // user asked for daemon mode, set a default interval between checks 764 interval = DEFAULT_INTERVAL; 765 } else if (cmd.equals("-interval")) { 766 // user has specified an interval for canary breaths (-interval N) 767 i++; 768 769 if (i == args.length) { 770 System.err.println("-interval takes a numeric seconds value argument."); 771 printUsageAndExit(); 772 } 773 774 try { 775 interval = Long.parseLong(args[i]) * 1000; 776 } catch (NumberFormatException e) { 777 System.err.println("-interval needs a numeric value argument."); 778 printUsageAndExit(); 779 } 780 } else if (cmd.equals("-zookeeper")) { 781 this.zookeeperMode = true; 782 } else if(cmd.equals("-regionserver")) { 783 this.regionServerMode = true; 784 } else if(cmd.equals("-allRegions")) { 785 conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true); 786 regionServerAllRegions = true; 787 } else if(cmd.equals("-writeSniffing")) { 788 writeSniffing = true; 789 conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true); 790 } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { 791 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 792 } else if (cmd.equals("-e")) { 793 conf.setBoolean(HBASE_CANARY_USE_REGEX, true); 794 } else if (cmd.equals("-t")) { 795 i++; 796 797 if (i == args.length) { 798 System.err.println("-t takes a numeric milliseconds value argument."); 799 printUsageAndExit(); 800 } 801 long timeout = 0; 802 try { 803 timeout = Long.parseLong(args[i]); 804 } catch (NumberFormatException e) { 805 System.err.println("-t takes a numeric milliseconds value argument."); 806 printUsageAndExit(); 807 } 808 conf.setLong(HBASE_CANARY_TIMEOUT, timeout); 809 } else if(cmd.equals("-writeTableTimeout")) { 810 i++; 811 812 if (i == args.length) { 813 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 814 printUsageAndExit(); 815 } 816 long configuredWriteTableTimeout = 0; 817 try { 818 configuredWriteTableTimeout = Long.parseLong(args[i]); 819 } catch (NumberFormatException e) { 820 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 821 printUsageAndExit(); 822 } 823 conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout); 824 } else if (cmd.equals("-writeTable")) { 825 i++; 826 827 if (i == args.length) { 828 System.err.println("-writeTable takes a string tablename value argument."); 829 printUsageAndExit(); 830 } 831 conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]); 832 } else if (cmd.equals("-f")) { 833 i++; 834 835 if (i == args.length) { 836 System.err 837 .println("-f needs a boolean value argument (true|false)."); 838 printUsageAndExit(); 839 } 840 841 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i])); 842 } else if (cmd.equals("-readTableTimeouts")) { 843 i++; 844 845 if (i == args.length) { 846 System.err.println("-readTableTimeouts needs a comma-separated list of read " + 847 "millisecond timeouts per table (without spaces)."); 848 printUsageAndExit(); 849 } 850 readTableTimeoutsStr = args[i]; 851 conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr); 852 } else if (cmd.equals("-permittedZookeeperFailures")) { 853 i++; 854 855 if (i == args.length) { 856 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 857 printUsageAndExit(); 858 } 859 try { 860 permittedFailures = Long.parseLong(args[i]); 861 } catch (NumberFormatException e) { 862 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 863 printUsageAndExit(); 864 } 865 conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures); 866 } else { 867 // no options match 868 System.err.println(cmd + " options is invalid."); 869 printUsageAndExit(); 870 } 871 } else if (index < 0) { 872 // keep track of first table name specified by the user 873 index = i; 874 } 875 } 876 if (regionServerAllRegions && !this.regionServerMode) { 877 System.err.println("-allRegions can only be specified in regionserver mode."); 878 printUsageAndExit(); 879 } 880 if (this.zookeeperMode) { 881 if (this.regionServerMode || regionServerAllRegions || writeSniffing) { 882 System.err.println("-zookeeper is exclusive and cannot be combined with " 883 + "other modes."); 884 printUsageAndExit(); 885 } 886 } 887 if (permittedFailures != 0 && !this.zookeeperMode) { 888 System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); 889 printUsageAndExit(); 890 } 891 if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) { 892 System.err.println("-readTableTimeouts can only be configured in region mode."); 893 printUsageAndExit(); 894 } 895 return index; 896 } 897 898 @Override 899 public int run(String[] args) throws Exception { 900 int index = parseArgs(args); 901 String[] monitorTargets = null; 902 903 if (index >= 0) { 904 int length = args.length - index; 905 monitorTargets = new String[length]; 906 System.arraycopy(args, index, monitorTargets, 0, length); 907 } 908 909 if (zookeeperMode) { 910 return checkZooKeeper(); 911 } else if (regionServerMode) { 912 return checkRegionServers(monitorTargets); 913 } else { 914 return checkRegions(monitorTargets); 915 } 916 } 917 918 private int runMonitor(String[] monitorTargets) throws Exception { 919 ChoreService choreService = null; 920 921 // Launches chore for refreshing kerberos credentials if security is enabled. 922 // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster 923 // for more details. 924 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); 925 if (authChore != null) { 926 choreService = new ChoreService("CANARY_TOOL"); 927 choreService.scheduleChore(authChore); 928 } 929 930 // Start to prepare the stuffs 931 Monitor monitor = null; 932 Thread monitorThread; 933 long startTime = 0; 934 long currentTimeLength = 0; 935 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 936 long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT); 937 // Get a connection to use in below. 938 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { 939 do { 940 // Do monitor !! 941 try { 942 monitor = this.newMonitor(connection, monitorTargets); 943 monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis()); 944 startTime = System.currentTimeMillis(); 945 monitorThread.start(); 946 while (!monitor.isDone()) { 947 // wait for 1 sec 948 Thread.sleep(1000); 949 // exit if any error occurs 950 if (failOnError && monitor.hasError()) { 951 monitorThread.interrupt(); 952 if (monitor.initialized) { 953 return monitor.errorCode; 954 } else { 955 return INIT_ERROR_EXIT_CODE; 956 } 957 } 958 currentTimeLength = System.currentTimeMillis() - startTime; 959 if (currentTimeLength > timeout) { 960 LOG.error("The monitor is running too long (" + currentTimeLength 961 + ") after timeout limit:" + timeout 962 + " will be killed itself !!"); 963 if (monitor.initialized) { 964 return TIMEOUT_ERROR_EXIT_CODE; 965 } else { 966 return INIT_ERROR_EXIT_CODE; 967 } 968 } 969 } 970 971 if (failOnError && monitor.finalCheckForErrors()) { 972 monitorThread.interrupt(); 973 return monitor.errorCode; 974 } 975 } finally { 976 if (monitor != null) monitor.close(); 977 } 978 979 Thread.sleep(interval); 980 } while (interval > 0); 981 } // try-with-resources close 982 983 if (choreService != null) { 984 choreService.shutdown(); 985 } 986 return monitor.errorCode; 987 } 988 989 @Override 990 public Map<String, String> getReadFailures() { 991 return sink.getReadFailures(); 992 } 993 994 @Override 995 public Map<String, String> getWriteFailures() { 996 return sink.getWriteFailures(); 997 } 998 999 private void printUsageAndExit() { 1000 System.err.println( 1001 "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]"); 1002 System.err.println("Where [OPTIONS] are:"); 1003 System.err.println(" -h,-help show this help and exit."); 1004 System.err.println(" -regionserver set 'regionserver mode'; gets row from random region on " + 1005 "server"); 1006 System.err.println(" -allRegions get from ALL regions when 'regionserver mode', not just " + 1007 "random one."); 1008 System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on " + 1009 "each ensemble member"); 1010 System.err.println(" -daemon continuous check at defined intervals."); 1011 System.err.println(" -interval <N> interval between checks in seconds"); 1012 System.err.println(" -e consider table/regionserver argument as regular " + 1013 "expression"); 1014 System.err.println(" -f <B> exit on first error; default=true"); 1015 System.err.println(" -failureAsError treat read/write failure as error"); 1016 System.err.println(" -t <N> timeout for canary-test run; default=600000ms"); 1017 System.err.println(" -writeSniffing enable write sniffing"); 1018 System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary"); 1019 System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms"); 1020 System.err.println(" -readTableTimeouts <tableName>=<read timeout>," + 1021 "<tableName>=<read timeout>,..."); 1022 System.err.println(" comma-separated list of table read timeouts " + 1023 "(no spaces);"); 1024 System.err.println(" logs 'ERROR' if takes longer. default=600000ms"); 1025 System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures attempting to "); 1026 System.err.println(" connect to individual zookeeper nodes in ensemble"); 1027 System.err.println(""); 1028 System.err.println(" -D<configProperty>=<value> to assign or override configuration params"); 1029 System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " + 1030 "raw scan; default=false"); 1031 System.err.println(""); 1032 System.err.println("Canary runs in one of three modes: region (default), regionserver, or " + 1033 "zookeeper."); 1034 System.err.println("To sniff/probe all regions, pass no arguments."); 1035 System.err.println("To sniff/probe all regions of a table, pass tablename."); 1036 System.err.println("To sniff/probe regionservers, pass -regionserver, etc."); 1037 System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation."); 1038 System.exit(USAGE_EXIT_CODE); 1039 } 1040 1041 Sink getSink(Configuration configuration, Class clazz) { 1042 // In test context, this.sink might be set. Use it if non-null. For testing. 1043 return this.sink != null? this.sink: 1044 (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class", 1045 clazz, Sink.class)); 1046 } 1047 1048 /** 1049 * Canary region mode-specific data structure which stores information about each region 1050 * to be scanned 1051 */ 1052 public static class RegionTaskResult { 1053 private RegionInfo region; 1054 private TableName tableName; 1055 private ServerName serverName; 1056 private ColumnFamilyDescriptor column; 1057 private AtomicLong readLatency = null; 1058 private AtomicLong writeLatency = null; 1059 private boolean readSuccess = false; 1060 private boolean writeSuccess = false; 1061 1062 public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName, 1063 ColumnFamilyDescriptor column) { 1064 this.region = region; 1065 this.tableName = tableName; 1066 this.serverName = serverName; 1067 this.column = column; 1068 } 1069 1070 public RegionInfo getRegionInfo() { 1071 return this.region; 1072 } 1073 1074 public String getRegionNameAsString() { 1075 return this.region.getRegionNameAsString(); 1076 } 1077 1078 public TableName getTableName() { 1079 return this.tableName; 1080 } 1081 1082 public String getTableNameAsString() { 1083 return this.tableName.getNameAsString(); 1084 } 1085 1086 public ServerName getServerName() { 1087 return this.serverName; 1088 } 1089 1090 public String getServerNameAsString() { 1091 return this.serverName.getServerName(); 1092 } 1093 1094 public ColumnFamilyDescriptor getColumnFamily() { 1095 return this.column; 1096 } 1097 1098 public String getColumnFamilyNameAsString() { 1099 return this.column.getNameAsString(); 1100 } 1101 1102 public long getReadLatency() { 1103 if (this.readLatency == null) { 1104 return -1; 1105 } 1106 return this.readLatency.get(); 1107 } 1108 1109 public void setReadLatency(long readLatency) { 1110 if (this.readLatency != null) { 1111 this.readLatency.set(readLatency); 1112 } else { 1113 this.readLatency = new AtomicLong(readLatency); 1114 } 1115 } 1116 1117 public long getWriteLatency() { 1118 if (this.writeLatency == null) { 1119 return -1; 1120 } 1121 return this.writeLatency.get(); 1122 } 1123 1124 public void setWriteLatency(long writeLatency) { 1125 if (this.writeLatency != null) { 1126 this.writeLatency.set(writeLatency); 1127 } else { 1128 this.writeLatency = new AtomicLong(writeLatency); 1129 } 1130 } 1131 1132 public boolean isReadSuccess() { 1133 return this.readSuccess; 1134 } 1135 1136 public void setReadSuccess() { 1137 this.readSuccess = true; 1138 } 1139 1140 public boolean isWriteSuccess() { 1141 return this.writeSuccess; 1142 } 1143 1144 public void setWriteSuccess() { 1145 this.writeSuccess = true; 1146 } 1147 } 1148 1149 /** 1150 * A Factory method for {@link Monitor}. 1151 * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor. 1152 * @return a Monitor instance 1153 */ 1154 private Monitor newMonitor(final Connection connection, String[] monitorTargets) { 1155 Monitor monitor; 1156 boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false); 1157 boolean regionServerAllRegions 1158 = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false); 1159 boolean failOnError 1160 = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1161 int permittedFailures 1162 = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0); 1163 boolean writeSniffing 1164 = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false); 1165 String writeTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, 1166 DEFAULT_WRITE_TABLE_NAME.getNameAsString()); 1167 long configuredWriteTableTimeout 1168 = conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT); 1169 1170 if (this.regionServerMode) { 1171 monitor = 1172 new RegionServerMonitor(connection, monitorTargets, useRegExp, 1173 getSink(connection.getConfiguration(), RegionServerStdOutSink.class), 1174 this.executor, regionServerAllRegions, 1175 failOnError, permittedFailures); 1176 1177 } else if (this.zookeeperMode) { 1178 monitor = 1179 new ZookeeperMonitor(connection, monitorTargets, useRegExp, 1180 getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), 1181 this.executor, failOnError, permittedFailures); 1182 } else { 1183 monitor = 1184 new RegionMonitor(connection, monitorTargets, useRegExp, 1185 getSink(connection.getConfiguration(), RegionStdOutSink.class), 1186 this.executor, writeSniffing, 1187 TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts, 1188 configuredWriteTableTimeout, permittedFailures); 1189 } 1190 return monitor; 1191 } 1192 1193 private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) { 1194 String[] tableTimeouts = configuredReadTableTimeoutsStr.split(","); 1195 for (String tT : tableTimeouts) { 1196 String[] nameTimeout = tT.split("="); 1197 if (nameTimeout.length < 2) { 1198 throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " + 1199 "<tableName>=<read timeout> (without spaces)."); 1200 } 1201 long timeoutVal; 1202 try { 1203 timeoutVal = Long.parseLong(nameTimeout[1]); 1204 } catch (NumberFormatException e) { 1205 throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" + 1206 " must be a numeric value argument."); 1207 } 1208 configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); 1209 } 1210 } 1211 /** 1212 * A Monitor super-class can be extended by users 1213 */ 1214 public static abstract class Monitor implements Runnable, Closeable { 1215 protected Connection connection; 1216 protected Admin admin; 1217 /** 1218 * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. 1219 * Passed on the command-line as arguments. 1220 */ 1221 protected String[] targets; 1222 protected boolean useRegExp; 1223 protected boolean treatFailureAsError; 1224 protected boolean initialized = false; 1225 1226 protected boolean done = false; 1227 protected int errorCode = 0; 1228 protected long allowedFailures = 0; 1229 protected Sink sink; 1230 protected ExecutorService executor; 1231 1232 public boolean isDone() { 1233 return done; 1234 } 1235 1236 public boolean hasError() { 1237 return errorCode != 0; 1238 } 1239 1240 public boolean finalCheckForErrors() { 1241 if (errorCode != 0) { 1242 return true; 1243 } 1244 if (treatFailureAsError && 1245 (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) { 1246 LOG.error("Too many failures detected, treating failure as error, failing the Canary."); 1247 errorCode = FAILURE_EXIT_CODE; 1248 return true; 1249 } 1250 return false; 1251 } 1252 1253 @Override 1254 public void close() throws IOException { 1255 if (this.admin != null) this.admin.close(); 1256 } 1257 1258 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, 1259 ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1260 if (null == connection) throw new IllegalArgumentException("connection shall not be null"); 1261 1262 this.connection = connection; 1263 this.targets = monitorTargets; 1264 this.useRegExp = useRegExp; 1265 this.treatFailureAsError = treatFailureAsError; 1266 this.sink = sink; 1267 this.executor = executor; 1268 this.allowedFailures = allowedFailures; 1269 } 1270 1271 @Override 1272 public abstract void run(); 1273 1274 protected boolean initAdmin() { 1275 if (null == this.admin) { 1276 try { 1277 this.admin = this.connection.getAdmin(); 1278 } catch (Exception e) { 1279 LOG.error("Initial HBaseAdmin failed...", e); 1280 this.errorCode = INIT_ERROR_EXIT_CODE; 1281 } 1282 } else if (admin.isAborted()) { 1283 LOG.error("HBaseAdmin aborted"); 1284 this.errorCode = INIT_ERROR_EXIT_CODE; 1285 } 1286 return !this.hasError(); 1287 } 1288 } 1289 1290 /** 1291 * A monitor for region mode. 1292 */ 1293 private static class RegionMonitor extends Monitor { 1294 // 10 minutes 1295 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; 1296 // 1 days 1297 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; 1298 1299 private long lastCheckTime = -1; 1300 private boolean writeSniffing; 1301 private TableName writeTableName; 1302 private int writeDataTTL; 1303 private float regionsLowerLimit; 1304 private float regionsUpperLimit; 1305 private int checkPeriod; 1306 private boolean rawScanEnabled; 1307 1308 /** 1309 * This is a timeout per table. If read of each region in the table aggregated takes longer 1310 * than what is configured here, we log an ERROR rather than just an INFO. 1311 */ 1312 private HashMap<String, Long> configuredReadTableTimeouts; 1313 1314 private long configuredWriteTableTimeout; 1315 1316 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1317 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, 1318 boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, 1319 long configuredWriteTableTimeout, 1320 long allowedFailures) { 1321 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1322 allowedFailures); 1323 Configuration conf = connection.getConfiguration(); 1324 this.writeSniffing = writeSniffing; 1325 this.writeTableName = writeTableName; 1326 this.writeDataTTL = 1327 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); 1328 this.regionsLowerLimit = 1329 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); 1330 this.regionsUpperLimit = 1331 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); 1332 this.checkPeriod = 1333 conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, 1334 DEFAULT_WRITE_TABLE_CHECK_PERIOD); 1335 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1336 this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); 1337 this.configuredWriteTableTimeout = configuredWriteTableTimeout; 1338 } 1339 1340 private RegionStdOutSink getSink() { 1341 if (!(sink instanceof RegionStdOutSink)) { 1342 throw new RuntimeException("Can only write to Region sink"); 1343 } 1344 return ((RegionStdOutSink) sink); 1345 } 1346 1347 @Override 1348 public void run() { 1349 if (this.initAdmin()) { 1350 try { 1351 List<Future<Void>> taskFutures = new LinkedList<>(); 1352 RegionStdOutSink regionSink = this.getSink(); 1353 if (this.targets != null && this.targets.length > 0) { 1354 String[] tables = generateMonitorTables(this.targets); 1355 // Check to see that each table name passed in the -readTableTimeouts argument is also 1356 // passed as a monitor target. 1357 if (!new HashSet<>(Arrays.asList(tables)). 1358 containsAll(this.configuredReadTableTimeouts.keySet())) { 1359 LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " + 1360 "passed via command line."); 1361 this.errorCode = USAGE_EXIT_CODE; 1362 return; 1363 } 1364 this.initialized = true; 1365 for (String table : tables) { 1366 LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); 1367 taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ, 1368 this.rawScanEnabled, readLatency)); 1369 } 1370 } else { 1371 taskFutures.addAll(sniff(TaskType.READ, regionSink)); 1372 } 1373 1374 if (writeSniffing) { 1375 if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) { 1376 try { 1377 checkWriteTableDistribution(); 1378 } catch (IOException e) { 1379 LOG.error("Check canary table distribution failed!", e); 1380 } 1381 lastCheckTime = EnvironmentEdgeManager.currentTime(); 1382 } 1383 // sniff canary table with write operation 1384 regionSink.initializeWriteLatency(); 1385 LongAdder writeTableLatency = regionSink.getWriteLatency(); 1386 taskFutures.addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName), 1387 executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); 1388 } 1389 1390 for (Future<Void> future : taskFutures) { 1391 try { 1392 future.get(); 1393 } catch (ExecutionException e) { 1394 LOG.error("Sniff region failed!", e); 1395 } 1396 } 1397 Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap(); 1398 for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) { 1399 String tableName = entry.getKey(); 1400 if (actualReadTableLatency.containsKey(tableName)) { 1401 Long actual = actualReadTableLatency.get(tableName).longValue(); 1402 Long configured = entry.getValue(); 1403 if (actual > configured) { 1404 LOG.error("Read operation for {} took {}ms exceeded the configured read timeout." + 1405 "(Configured read timeout {}ms.", tableName, actual, configured); 1406 } else { 1407 LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.", 1408 tableName, actual, configured); 1409 } 1410 } else { 1411 LOG.error("Read operation for {} failed!", tableName); 1412 } 1413 } 1414 if (this.writeSniffing) { 1415 String writeTableStringName = this.writeTableName.getNameAsString(); 1416 long actualWriteLatency = regionSink.getWriteLatency().longValue(); 1417 LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.", 1418 writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout); 1419 // Check that the writeTable write operation latency does not exceed the configured timeout. 1420 if (actualWriteLatency > this.configuredWriteTableTimeout) { 1421 LOG.error("Write operation for {} exceeded the configured write timeout.", 1422 writeTableStringName); 1423 } 1424 } 1425 } catch (Exception e) { 1426 LOG.error("Run regionMonitor failed", e); 1427 this.errorCode = ERROR_EXIT_CODE; 1428 } finally { 1429 this.done = true; 1430 } 1431 } 1432 this.done = true; 1433 } 1434 1435 /** 1436 * @return List of tables to use in test. 1437 */ 1438 private String[] generateMonitorTables(String[] monitorTargets) throws IOException { 1439 String[] returnTables = null; 1440 1441 if (this.useRegExp) { 1442 Pattern pattern = null; 1443 List<TableDescriptor> tds = null; 1444 Set<String> tmpTables = new TreeSet<>(); 1445 try { 1446 LOG.debug(String.format("reading list of tables")); 1447 tds = this.admin.listTableDescriptors(pattern); 1448 if (tds == null) { 1449 tds = Collections.emptyList(); 1450 } 1451 for (String monitorTarget : monitorTargets) { 1452 pattern = Pattern.compile(monitorTarget); 1453 for (TableDescriptor td : tds) { 1454 if (pattern.matcher(td.getTableName().getNameAsString()).matches()) { 1455 tmpTables.add(td.getTableName().getNameAsString()); 1456 } 1457 } 1458 } 1459 } catch (IOException e) { 1460 LOG.error("Communicate with admin failed", e); 1461 throw e; 1462 } 1463 1464 if (tmpTables.size() > 0) { 1465 returnTables = tmpTables.toArray(new String[tmpTables.size()]); 1466 } else { 1467 String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); 1468 LOG.error(msg); 1469 this.errorCode = INIT_ERROR_EXIT_CODE; 1470 throw new TableNotFoundException(msg); 1471 } 1472 } else { 1473 returnTables = monitorTargets; 1474 } 1475 1476 return returnTables; 1477 } 1478 1479 /* 1480 * Canary entry point to monitor all the tables. 1481 */ 1482 private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink) 1483 throws Exception { 1484 LOG.debug("Reading list of tables"); 1485 List<Future<Void>> taskFutures = new LinkedList<>(); 1486 for (TableDescriptor td: admin.listTableDescriptors()) { 1487 if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) && 1488 (!td.getTableName().equals(writeTableName))) { 1489 LongAdder readLatency = 1490 regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString()); 1491 taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled, 1492 readLatency)); 1493 } 1494 } 1495 return taskFutures; 1496 } 1497 1498 private void checkWriteTableDistribution() throws IOException { 1499 if (!admin.tableExists(writeTableName)) { 1500 int numberOfServers = 1501 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size(); 1502 if (numberOfServers == 0) { 1503 throw new IllegalStateException("No live regionservers"); 1504 } 1505 createWriteTable(numberOfServers); 1506 } 1507 1508 if (!admin.isTableEnabled(writeTableName)) { 1509 admin.enableTable(writeTableName); 1510 } 1511 1512 ClusterMetrics status = 1513 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER)); 1514 int numberOfServers = status.getLiveServerMetrics().size(); 1515 if (status.getLiveServerMetrics().containsKey(status.getMasterName())) { 1516 numberOfServers -= 1; 1517 } 1518 1519 List<Pair<RegionInfo, ServerName>> pairs = 1520 MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName); 1521 int numberOfRegions = pairs.size(); 1522 if (numberOfRegions < numberOfServers * regionsLowerLimit 1523 || numberOfRegions > numberOfServers * regionsUpperLimit) { 1524 admin.disableTable(writeTableName); 1525 admin.deleteTable(writeTableName); 1526 createWriteTable(numberOfServers); 1527 } 1528 HashSet<ServerName> serverSet = new HashSet<>(); 1529 for (Pair<RegionInfo, ServerName> pair : pairs) { 1530 serverSet.add(pair.getSecond()); 1531 } 1532 int numberOfCoveredServers = serverSet.size(); 1533 if (numberOfCoveredServers < numberOfServers) { 1534 admin.balance(); 1535 } 1536 } 1537 1538 private void createWriteTable(int numberOfServers) throws IOException { 1539 int numberOfRegions = (int)(numberOfServers * regionsLowerLimit); 1540 LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " + 1541 "(current lower limit of regions per server is {} and you can change it with config {}).", 1542 numberOfServers, numberOfRegions, regionsLowerLimit, 1543 HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY); 1544 HTableDescriptor desc = new HTableDescriptor(writeTableName); 1545 HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME); 1546 family.setMaxVersions(1); 1547 family.setTimeToLive(writeDataTTL); 1548 1549 desc.addFamily(family); 1550 byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions); 1551 admin.createTable(desc, splits); 1552 } 1553 } 1554 1555 /** 1556 * Canary entry point for specified table. 1557 * @throws Exception 1558 */ 1559 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName, 1560 ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) 1561 throws Exception { 1562 LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName); 1563 if (admin.isTableEnabled(TableName.valueOf(tableName))) { 1564 return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)), 1565 executor, taskType, rawScanEnabled, readLatency); 1566 } else { 1567 LOG.warn("Table {} is not enabled", tableName); 1568 } 1569 return new LinkedList<>(); 1570 } 1571 1572 /* 1573 * Loops over regions of this table, and outputs information about the state. 1574 */ 1575 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, 1576 TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, 1577 boolean rawScanEnabled, LongAdder rwLatency) throws Exception { 1578 LOG.debug("Reading list of regions for table {}", tableDesc.getTableName()); 1579 try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) { 1580 List<RegionTask> tasks = new ArrayList<>(); 1581 try (RegionLocator regionLocator = 1582 admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1583 for (HRegionLocation location: regionLocator.getAllRegionLocations()) { 1584 if (location == null) { 1585 LOG.warn("Null location"); 1586 continue; 1587 } 1588 ServerName rs = location.getServerName(); 1589 RegionInfo region = location.getRegion(); 1590 tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink, 1591 taskType, rawScanEnabled, rwLatency)); 1592 Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap(); 1593 regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>()); 1594 } 1595 return executor.invokeAll(tasks); 1596 } 1597 } catch (TableNotFoundException e) { 1598 return Collections.EMPTY_LIST; 1599 } 1600 } 1601 1602 // monitor for zookeeper mode 1603 private static class ZookeeperMonitor extends Monitor { 1604 private List<String> hosts; 1605 private final String znode; 1606 private final int timeout; 1607 1608 protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1609 Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1610 super(connection, monitorTargets, useRegExp, 1611 sink, executor, treatFailureAsError, allowedFailures); 1612 Configuration configuration = connection.getConfiguration(); 1613 znode = 1614 configuration.get(ZOOKEEPER_ZNODE_PARENT, 1615 DEFAULT_ZOOKEEPER_ZNODE_PARENT); 1616 timeout = configuration 1617 .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 1618 ConnectStringParser parser = 1619 new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); 1620 hosts = Lists.newArrayList(); 1621 for (InetSocketAddress server : parser.getServerAddresses()) { 1622 hosts.add(server.toString()); 1623 } 1624 if (allowedFailures > (hosts.size() - 1) / 2) { 1625 LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " + 1626 "already be lost. Setting of {} failures is unexpected for {} ensemble size.", 1627 allowedFailures, hosts.size()); 1628 } 1629 } 1630 1631 @Override public void run() { 1632 List<ZookeeperTask> tasks = Lists.newArrayList(); 1633 ZookeeperStdOutSink zkSink = null; 1634 try { 1635 zkSink = this.getSink(); 1636 } catch (RuntimeException e) { 1637 LOG.error("Run ZooKeeperMonitor failed!", e); 1638 this.errorCode = ERROR_EXIT_CODE; 1639 } 1640 this.initialized = true; 1641 for (final String host : hosts) { 1642 tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink)); 1643 } 1644 try { 1645 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1646 try { 1647 future.get(); 1648 } catch (ExecutionException e) { 1649 LOG.error("Sniff zookeeper failed!", e); 1650 this.errorCode = ERROR_EXIT_CODE; 1651 } 1652 } 1653 } catch (InterruptedException e) { 1654 this.errorCode = ERROR_EXIT_CODE; 1655 Thread.currentThread().interrupt(); 1656 LOG.error("Sniff zookeeper interrupted!", e); 1657 } 1658 this.done = true; 1659 } 1660 1661 private ZookeeperStdOutSink getSink() { 1662 if (!(sink instanceof ZookeeperStdOutSink)) { 1663 throw new RuntimeException("Can only write to zookeeper sink"); 1664 } 1665 return ((ZookeeperStdOutSink) sink); 1666 } 1667 } 1668 1669 1670 /** 1671 * A monitor for regionserver mode 1672 */ 1673 private static class RegionServerMonitor extends Monitor { 1674 private boolean allRegions; 1675 1676 public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1677 Sink sink, ExecutorService executor, boolean allRegions, 1678 boolean treatFailureAsError, long allowedFailures) { 1679 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1680 allowedFailures); 1681 this.allRegions = allRegions; 1682 } 1683 1684 private RegionServerStdOutSink getSink() { 1685 if (!(sink instanceof RegionServerStdOutSink)) { 1686 throw new RuntimeException("Can only write to regionserver sink"); 1687 } 1688 return ((RegionServerStdOutSink) sink); 1689 } 1690 1691 @Override 1692 public void run() { 1693 if (this.initAdmin() && this.checkNoTableNames()) { 1694 RegionServerStdOutSink regionServerSink = null; 1695 try { 1696 regionServerSink = this.getSink(); 1697 } catch (RuntimeException e) { 1698 LOG.error("Run RegionServerMonitor failed!", e); 1699 this.errorCode = ERROR_EXIT_CODE; 1700 } 1701 Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName(); 1702 this.initialized = true; 1703 this.monitorRegionServers(rsAndRMap, regionServerSink); 1704 } 1705 this.done = true; 1706 } 1707 1708 private boolean checkNoTableNames() { 1709 List<String> foundTableNames = new ArrayList<>(); 1710 TableName[] tableNames = null; 1711 LOG.debug("Reading list of tables"); 1712 try { 1713 tableNames = this.admin.listTableNames(); 1714 } catch (IOException e) { 1715 LOG.error("Get listTableNames failed", e); 1716 this.errorCode = INIT_ERROR_EXIT_CODE; 1717 return false; 1718 } 1719 1720 if (this.targets == null || this.targets.length == 0) return true; 1721 1722 for (String target : this.targets) { 1723 for (TableName tableName : tableNames) { 1724 if (target.equals(tableName.getNameAsString())) { 1725 foundTableNames.add(target); 1726 } 1727 } 1728 } 1729 1730 if (foundTableNames.size() > 0) { 1731 System.err.println("Cannot pass a tablename when using the -regionserver " + 1732 "option, tablenames:" + foundTableNames.toString()); 1733 this.errorCode = USAGE_EXIT_CODE; 1734 } 1735 return foundTableNames.isEmpty(); 1736 } 1737 1738 private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, RegionServerStdOutSink regionServerSink) { 1739 List<RegionServerTask> tasks = new ArrayList<>(); 1740 Map<String, AtomicLong> successMap = new HashMap<>(); 1741 Random rand = new Random(); 1742 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1743 String serverName = entry.getKey(); 1744 AtomicLong successes = new AtomicLong(0); 1745 successMap.put(serverName, successes); 1746 if (entry.getValue().isEmpty()) { 1747 LOG.error("Regionserver not serving any regions - {}", serverName); 1748 } else if (this.allRegions) { 1749 for (RegionInfo region : entry.getValue()) { 1750 tasks.add(new RegionServerTask(this.connection, 1751 serverName, 1752 region, 1753 regionServerSink, 1754 successes)); 1755 } 1756 } else { 1757 // random select a region if flag not set 1758 RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size())); 1759 tasks.add(new RegionServerTask(this.connection, 1760 serverName, 1761 region, 1762 regionServerSink, 1763 successes)); 1764 } 1765 } 1766 try { 1767 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1768 try { 1769 future.get(); 1770 } catch (ExecutionException e) { 1771 LOG.error("Sniff regionserver failed!", e); 1772 this.errorCode = ERROR_EXIT_CODE; 1773 } 1774 } 1775 if (this.allRegions) { 1776 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1777 String serverName = entry.getKey(); 1778 LOG.info("Successfully read {} regions out of {} on regionserver {}", 1779 successMap.get(serverName), entry.getValue().size(), serverName); 1780 } 1781 } 1782 } catch (InterruptedException e) { 1783 this.errorCode = ERROR_EXIT_CODE; 1784 LOG.error("Sniff regionserver interrupted!", e); 1785 } 1786 } 1787 1788 private Map<String, List<RegionInfo>> filterRegionServerByName() { 1789 Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName(); 1790 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); 1791 return regionServerAndRegionsMap; 1792 } 1793 1794 private Map<String, List<RegionInfo>> getAllRegionServerByName() { 1795 Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>(); 1796 try { 1797 LOG.debug("Reading list of tables and locations"); 1798 List<TableDescriptor> tableDescs = this.admin.listTableDescriptors(); 1799 List<RegionInfo> regions = null; 1800 for (TableDescriptor tableDesc: tableDescs) { 1801 try (RegionLocator regionLocator = 1802 this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1803 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1804 if (location == null) { 1805 LOG.warn("Null location"); 1806 continue; 1807 } 1808 ServerName rs = location.getServerName(); 1809 String rsName = rs.getHostname(); 1810 RegionInfo r = location.getRegion(); 1811 if (rsAndRMap.containsKey(rsName)) { 1812 regions = rsAndRMap.get(rsName); 1813 } else { 1814 regions = new ArrayList<>(); 1815 rsAndRMap.put(rsName, regions); 1816 } 1817 regions.add(r); 1818 } 1819 } 1820 } 1821 1822 // get any live regionservers not serving any regions 1823 for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 1824 .getLiveServerMetrics().keySet()) { 1825 String rsName = rs.getHostname(); 1826 if (!rsAndRMap.containsKey(rsName)) { 1827 rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList()); 1828 } 1829 } 1830 } catch (IOException e) { 1831 LOG.error("Get HTables info failed", e); 1832 this.errorCode = INIT_ERROR_EXIT_CODE; 1833 } 1834 return rsAndRMap; 1835 } 1836 1837 private Map<String, List<RegionInfo>> doFilterRegionServerByName( 1838 Map<String, List<RegionInfo>> fullRsAndRMap) { 1839 1840 Map<String, List<RegionInfo>> filteredRsAndRMap = null; 1841 1842 if (this.targets != null && this.targets.length > 0) { 1843 filteredRsAndRMap = new HashMap<>(); 1844 Pattern pattern = null; 1845 Matcher matcher = null; 1846 boolean regExpFound = false; 1847 for (String rsName : this.targets) { 1848 if (this.useRegExp) { 1849 regExpFound = false; 1850 pattern = Pattern.compile(rsName); 1851 for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) { 1852 matcher = pattern.matcher(entry.getKey()); 1853 if (matcher.matches()) { 1854 filteredRsAndRMap.put(entry.getKey(), entry.getValue()); 1855 regExpFound = true; 1856 } 1857 } 1858 if (!regExpFound) { 1859 LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); 1860 } 1861 } else { 1862 if (fullRsAndRMap.containsKey(rsName)) { 1863 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); 1864 } else { 1865 LOG.info("No RegionServerInfo found, regionServerName {}", rsName); 1866 } 1867 } 1868 } 1869 } else { 1870 filteredRsAndRMap = fullRsAndRMap; 1871 } 1872 return filteredRsAndRMap; 1873 } 1874 } 1875 1876 public static void main(String[] args) throws Exception { 1877 final Configuration conf = HBaseConfiguration.create(); 1878 1879 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); 1880 LOG.info("Execution thread count={}", numThreads); 1881 1882 int exitCode; 1883 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); 1884 try { 1885 exitCode = ToolRunner.run(conf, new CanaryTool(executor), args); 1886 } finally { 1887 executor.shutdown(); 1888 } 1889 System.exit(exitCode); 1890 } 1891}