001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; 021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; 022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String; 023 024import java.io.Closeable; 025import java.io.IOException; 026import java.net.BindException; 027import java.net.InetSocketAddress; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.Callable; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ScheduledThreadPoolExecutor; 046import java.util.concurrent.ThreadLocalRandom; 047import 
java.util.concurrent.atomic.AtomicLong; 048import java.util.concurrent.atomic.LongAdder; 049import java.util.regex.Matcher; 050import java.util.regex.Pattern; 051import org.apache.commons.lang3.time.StopWatch; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.AuthUtil; 054import org.apache.hadoop.hbase.ChoreService; 055import org.apache.hadoop.hbase.ClusterMetrics; 056import org.apache.hadoop.hbase.ClusterMetrics.Option; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseInterfaceAudience; 060import org.apache.hadoop.hbase.HColumnDescriptor; 061import org.apache.hadoop.hbase.HConstants; 062import org.apache.hadoop.hbase.HRegionLocation; 063import org.apache.hadoop.hbase.HTableDescriptor; 064import org.apache.hadoop.hbase.MetaTableAccessor; 065import org.apache.hadoop.hbase.NamespaceDescriptor; 066import org.apache.hadoop.hbase.ScheduledChore; 067import org.apache.hadoop.hbase.ServerName; 068import org.apache.hadoop.hbase.TableName; 069import org.apache.hadoop.hbase.TableNotEnabledException; 070import org.apache.hadoop.hbase.TableNotFoundException; 071import org.apache.hadoop.hbase.client.Admin; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 073import org.apache.hadoop.hbase.client.Connection; 074import org.apache.hadoop.hbase.client.ConnectionFactory; 075import org.apache.hadoop.hbase.client.Get; 076import org.apache.hadoop.hbase.client.Put; 077import org.apache.hadoop.hbase.client.RegionInfo; 078import org.apache.hadoop.hbase.client.RegionLocator; 079import org.apache.hadoop.hbase.client.ResultScanner; 080import org.apache.hadoop.hbase.client.Scan; 081import org.apache.hadoop.hbase.client.Table; 082import org.apache.hadoop.hbase.client.TableDescriptor; 083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 084import org.apache.hadoop.hbase.http.InfoServer; 085import 
org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType; 086import org.apache.hadoop.hbase.util.Bytes; 087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 088import org.apache.hadoop.hbase.util.Pair; 089import org.apache.hadoop.hbase.util.ReflectionUtils; 090import org.apache.hadoop.hbase.util.RegionSplitter; 091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 092import org.apache.hadoop.hbase.zookeeper.ZKConfig; 093import org.apache.hadoop.util.Tool; 094import org.apache.hadoop.util.ToolRunner; 095import org.apache.yetus.audience.InterfaceAudience; 096import org.apache.zookeeper.KeeperException; 097import org.apache.zookeeper.ZooKeeper; 098import org.apache.zookeeper.client.ConnectStringParser; 099import org.apache.zookeeper.data.Stat; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 104 105/** 106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes: 107 * <ol> 108 * <li>region mode (Default): For each region, try to get one row per column family outputting 109 * information on failure (ERROR) or else the latency.</li> 110 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly 111 * outputting information on failure (ERROR) or else the latency.</li> 112 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on 113 * failure (ERROR) or else the latency.</li> 114 * </ol> 115 */ 116@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 117public class CanaryTool implements Tool, Canary { 118 public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port"; 119 public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress"; 120 121 private void putUpWebUI() throws IOException { 122 int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1); 123 // -1 is for disabling info server 124 if (port < 0) { 
125 return; 126 } 127 if (zookeeperMode) { 128 LOG.info("WebUI is not supported in Zookeeper mode"); 129 } else if (regionServerMode) { 130 LOG.info("WebUI is not supported in RegionServer mode"); 131 } else { 132 String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0"); 133 try { 134 InfoServer infoServer = new InfoServer("canary", addr, port, false, conf); 135 infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class); 136 infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class)); 137 infoServer.start(); 138 LOG.info("Bind Canary http info server to {}:{} ", addr, port); 139 } catch (BindException e) { 140 LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e); 141 } 142 } 143 } 144 145 @Override 146 public int checkRegions(String[] targets) throws Exception { 147 String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT); 148 try { 149 LOG.info("Canary tool is running in Region mode"); 150 if (configuredReadTableTimeoutsStr != null) { 151 populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr); 152 } 153 } catch (IllegalArgumentException e) { 154 LOG.error("Constructing read table timeouts map failed ", e); 155 return USAGE_EXIT_CODE; 156 } 157 return runMonitor(targets); 158 } 159 160 @Override 161 public int checkRegionServers(String[] targets) throws Exception { 162 regionServerMode = true; 163 LOG.info("Canary tool is running in RegionServer mode"); 164 return runMonitor(targets); 165 } 166 167 @Override 168 public int checkZooKeeper() throws Exception { 169 zookeeperMode = true; 170 LOG.info("Canary tool is running in ZooKeeper mode"); 171 return runMonitor(null); 172 } 173 174 /** 175 * Sink interface used by the canary to output information 176 */ 177 public interface Sink { 178 long getReadFailureCount(); 179 180 long incReadFailureCount(); 181 182 Map<String, String> getReadFailures(); 183 184 void updateReadFailures(String regionName, String 
serverName); 185 186 long getWriteFailureCount(); 187 188 long incWriteFailureCount(); 189 190 Map<String, String> getWriteFailures(); 191 192 void updateWriteFailures(String regionName, String serverName); 193 194 long getReadSuccessCount(); 195 196 long incReadSuccessCount(); 197 198 long getWriteSuccessCount(); 199 200 long incWriteSuccessCount(); 201 } 202 203 /** 204 * Simple implementation of canary sink that allows plotting to a file or standard output. 205 */ 206 public static class StdOutSink implements Sink { 207 private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0), 208 readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0); 209 private Map<String, String> readFailures = new ConcurrentHashMap<>(); 210 private Map<String, String> writeFailures = new ConcurrentHashMap<>(); 211 212 @Override 213 public long getReadFailureCount() { 214 return readFailureCount.get(); 215 } 216 217 @Override 218 public long incReadFailureCount() { 219 return readFailureCount.incrementAndGet(); 220 } 221 222 @Override 223 public Map<String, String> getReadFailures() { 224 return readFailures; 225 } 226 227 @Override 228 public void updateReadFailures(String regionName, String serverName) { 229 readFailures.put(regionName, serverName); 230 } 231 232 @Override 233 public long getWriteFailureCount() { 234 return writeFailureCount.get(); 235 } 236 237 @Override 238 public long incWriteFailureCount() { 239 return writeFailureCount.incrementAndGet(); 240 } 241 242 @Override 243 public Map<String, String> getWriteFailures() { 244 return writeFailures; 245 } 246 247 @Override 248 public void updateWriteFailures(String regionName, String serverName) { 249 writeFailures.put(regionName, serverName); 250 } 251 252 @Override 253 public long getReadSuccessCount() { 254 return readSuccessCount.get(); 255 } 256 257 @Override 258 public long incReadSuccessCount() { 259 return readSuccessCount.incrementAndGet(); 260 } 261 262 
@Override 263 public long getWriteSuccessCount() { 264 return writeSuccessCount.get(); 265 } 266 267 @Override 268 public long incWriteSuccessCount() { 269 return writeSuccessCount.incrementAndGet(); 270 } 271 } 272 273 /** 274 * By RegionServer, for 'regionserver' mode. 275 */ 276 public static class RegionServerStdOutSink extends StdOutSink { 277 public void publishReadFailure(String table, String server) { 278 incReadFailureCount(); 279 LOG.error("Read from {} on {}", table, server); 280 } 281 282 public void publishReadTiming(String table, String server, long msTime) { 283 LOG.info("Read from {} on {} in {}ms", table, server, msTime); 284 } 285 } 286 287 /** 288 * Output for 'zookeeper' mode. 289 */ 290 public static class ZookeeperStdOutSink extends StdOutSink { 291 public void publishReadFailure(String znode, String server) { 292 incReadFailureCount(); 293 LOG.error("Read from {} on {}", znode, server); 294 } 295 296 public void publishReadTiming(String znode, String server, long msTime) { 297 LOG.info("Read from {} on {} in {}ms", znode, server, msTime); 298 } 299 } 300 301 /** 302 * By Region, for 'region' mode. 
303 */ 304 public static class RegionStdOutSink extends StdOutSink { 305 private Map<String, LongAdder> perTableReadLatency = new HashMap<>(); 306 private LongAdder writeLatency = new LongAdder(); 307 private final ConcurrentMap<String, List<RegionTaskResult>> regionMap = 308 new ConcurrentHashMap<>(); 309 private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>(); 310 private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>(); 311 312 public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() { 313 return perServerFailuresCount; 314 } 315 316 public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() { 317 return perTableFailuresCount; 318 } 319 320 public void resetFailuresCountDetails() { 321 perServerFailuresCount.clear(); 322 perTableFailuresCount.clear(); 323 } 324 325 private void incFailuresCountDetails(ServerName serverName, RegionInfo region) { 326 perServerFailuresCount.compute(serverName, (server, count) -> { 327 if (count == null) { 328 count = new LongAdder(); 329 } 330 count.increment(); 331 return count; 332 }); 333 perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> { 334 if (count == null) { 335 count = new LongAdder(); 336 } 337 count.increment(); 338 return count; 339 }); 340 } 341 342 public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { 343 incReadFailureCount(); 344 incFailuresCountDetails(serverName, region); 345 LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName, 346 e); 347 } 348 349 public void publishReadFailure(ServerName serverName, RegionInfo region, 350 ColumnFamilyDescriptor column, Exception e) { 351 incReadFailureCount(); 352 incFailuresCountDetails(serverName, region); 353 LOG.error("Read from {} on serverName={}, columnFamily={} failed", 354 region.getRegionNameAsString(), serverName, column.getNameAsString(), e); 355 } 356 357 
public void publishReadTiming(ServerName serverName, RegionInfo region, 358 ColumnFamilyDescriptor column, long msTime) { 359 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 360 rtr.setReadSuccess(); 361 rtr.setReadLatency(msTime); 362 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 363 rtrs.add(rtr); 364 // Note that read success count will be equal to total column family read successes. 365 incReadSuccessCount(); 366 LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 367 column.getNameAsString(), msTime); 368 } 369 370 public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { 371 incWriteFailureCount(); 372 incFailuresCountDetails(serverName, region); 373 LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); 374 } 375 376 public void publishWriteFailure(ServerName serverName, RegionInfo region, 377 ColumnFamilyDescriptor column, Exception e) { 378 incWriteFailureCount(); 379 incFailuresCountDetails(serverName, region); 380 LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, 381 column.getNameAsString(), e); 382 } 383 384 public void publishWriteTiming(ServerName serverName, RegionInfo region, 385 ColumnFamilyDescriptor column, long msTime) { 386 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 387 rtr.setWriteSuccess(); 388 rtr.setWriteLatency(msTime); 389 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 390 rtrs.add(rtr); 391 // Note that write success count will be equal to total column family write successes. 
392 incWriteSuccessCount(); 393 LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 394 column.getNameAsString(), msTime); 395 } 396 397 public Map<String, LongAdder> getReadLatencyMap() { 398 return this.perTableReadLatency; 399 } 400 401 public LongAdder initializeAndGetReadLatencyForTable(String tableName) { 402 LongAdder initLatency = new LongAdder(); 403 this.perTableReadLatency.put(tableName, initLatency); 404 return initLatency; 405 } 406 407 public void initializeWriteLatency() { 408 this.writeLatency.reset(); 409 } 410 411 public LongAdder getWriteLatency() { 412 return this.writeLatency; 413 } 414 415 public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() { 416 return this.regionMap; 417 } 418 419 public int getTotalExpectedRegions() { 420 return this.regionMap.size(); 421 } 422 } 423 424 /** 425 * Run a single zookeeper Task and then exit. 426 */ 427 static class ZookeeperTask implements Callable<Void> { 428 private final Connection connection; 429 private final String host; 430 private String znode; 431 private final int timeout; 432 private ZookeeperStdOutSink sink; 433 434 public ZookeeperTask(Connection connection, String host, String znode, int timeout, 435 ZookeeperStdOutSink sink) { 436 this.connection = connection; 437 this.host = host; 438 this.znode = znode; 439 this.timeout = timeout; 440 this.sink = sink; 441 } 442 443 @Override 444 public Void call() throws Exception { 445 ZooKeeper zooKeeper = null; 446 try { 447 zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); 448 Stat exists = zooKeeper.exists(znode, false); 449 StopWatch stopwatch = new StopWatch(); 450 stopwatch.start(); 451 zooKeeper.getData(znode, false, exists); 452 stopwatch.stop(); 453 sink.publishReadTiming(znode, host, stopwatch.getTime()); 454 } catch (KeeperException | InterruptedException e) { 455 sink.publishReadFailure(znode, host); 456 } finally { 457 if (zooKeeper != null) { 458 zooKeeper.close(); 459 } 460 } 
461 return null; 462 } 463 } 464 465 /** 466 * Run a single Region Task and then exit. For each column family of the Region, get one row and 467 * output latency or failure. 468 */ 469 static class RegionTask implements Callable<Void> { 470 public enum TaskType { 471 READ, 472 WRITE 473 } 474 475 private Connection connection; 476 private RegionInfo region; 477 private RegionStdOutSink sink; 478 private TaskType taskType; 479 private boolean rawScanEnabled; 480 private ServerName serverName; 481 private LongAdder readWriteLatency; 482 private boolean readAllCF; 483 484 RegionTask(Connection connection, RegionInfo region, ServerName serverName, 485 RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency, 486 boolean readAllCF) { 487 this.connection = connection; 488 this.region = region; 489 this.serverName = serverName; 490 this.sink = sink; 491 this.taskType = taskType; 492 this.rawScanEnabled = rawScanEnabled; 493 this.readWriteLatency = rwLatency; 494 this.readAllCF = readAllCF; 495 } 496 497 @Override 498 public Void call() { 499 switch (taskType) { 500 case READ: 501 return read(); 502 case WRITE: 503 return write(); 504 default: 505 return read(); 506 } 507 } 508 509 private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) { 510 byte[] startKey = null; 511 Get get = null; 512 Scan scan = null; 513 ResultScanner rs = null; 514 StopWatch stopWatch = new StopWatch(); 515 startKey = region.getStartKey(); 516 // Can't do a get on empty start row so do a Scan of first element if any instead. 
517 if (startKey.length > 0) { 518 get = new Get(startKey); 519 get.setCacheBlocks(false); 520 get.setFilter(new FirstKeyOnlyFilter()); 521 get.addFamily(column.getName()); 522 } else { 523 scan = new Scan(); 524 LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable()); 525 scan.setRaw(rawScanEnabled); 526 scan.setCaching(1); 527 scan.setCacheBlocks(false); 528 scan.setFilter(new FirstKeyOnlyFilter()); 529 scan.addFamily(column.getName()); 530 scan.setMaxResultSize(1L); 531 scan.setOneRowLimit(); 532 } 533 LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(), 534 column.getNameAsString(), Bytes.toStringBinary(startKey)); 535 try { 536 stopWatch.start(); 537 if (startKey.length > 0) { 538 table.get(get); 539 } else { 540 rs = table.getScanner(scan); 541 rs.next(); 542 } 543 stopWatch.stop(); 544 this.readWriteLatency.add(stopWatch.getTime()); 545 sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); 546 } catch (Exception e) { 547 sink.publishReadFailure(serverName, region, column, e); 548 sink.updateReadFailures(region.getRegionNameAsString(), 549 serverName == null ? 
"NULL" : serverName.getHostname()); 550 } finally { 551 if (rs != null) { 552 rs.close(); 553 } 554 } 555 return null; 556 } 557 558 private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) { 559 int size = cfs.length; 560 return cfs[ThreadLocalRandom.current().nextInt(size)]; 561 562 } 563 564 public Void read() { 565 Table table = null; 566 TableDescriptor tableDesc = null; 567 try { 568 LOG.debug("Reading table descriptor for table {}", region.getTable()); 569 table = connection.getTable(region.getTable()); 570 tableDesc = table.getDescriptor(); 571 } catch (IOException e) { 572 LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); 573 sink.publishReadFailure(serverName, region, e); 574 if (table != null) { 575 try { 576 table.close(); 577 } catch (IOException ioe) { 578 LOG.error("Close table failed", e); 579 } 580 } 581 return null; 582 } 583 584 if (readAllCF) { 585 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 586 readColumnFamily(table, column); 587 } 588 } else { 589 readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies())); 590 } 591 try { 592 table.close(); 593 } catch (IOException e) { 594 LOG.error("Close table failed", e); 595 } 596 return null; 597 } 598 599 /** 600 * Check writes for the canary table 601 */ 602 private Void write() { 603 Table table = null; 604 TableDescriptor tableDesc = null; 605 try { 606 table = connection.getTable(region.getTable()); 607 tableDesc = table.getDescriptor(); 608 byte[] rowToCheck = region.getStartKey(); 609 if (rowToCheck.length == 0) { 610 rowToCheck = new byte[] { 0x0 }; 611 } 612 int writeValueSize = 613 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); 614 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 615 Put put = new Put(rowToCheck); 616 byte[] value = new byte[writeValueSize]; 617 Bytes.random(value); 618 put.addColumn(column.getName(), 
HConstants.EMPTY_BYTE_ARRAY, value); 619 LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(), 620 region.getRegionNameAsString(), column.getNameAsString(), 621 Bytes.toStringBinary(rowToCheck)); 622 try { 623 long startTime = EnvironmentEdgeManager.currentTime(); 624 table.put(put); 625 long time = EnvironmentEdgeManager.currentTime() - startTime; 626 this.readWriteLatency.add(time); 627 sink.publishWriteTiming(serverName, region, column, time); 628 } catch (Exception e) { 629 sink.publishWriteFailure(serverName, region, column, e); 630 } 631 } 632 table.close(); 633 } catch (IOException e) { 634 sink.publishWriteFailure(serverName, region, e); 635 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname()); 636 } 637 return null; 638 } 639 } 640 641 /** 642 * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and 643 * output latency or the failure. 644 */ 645 static class RegionServerTask implements Callable<Void> { 646 private Connection connection; 647 private String serverName; 648 private RegionInfo region; 649 private RegionServerStdOutSink sink; 650 private AtomicLong successes; 651 652 RegionServerTask(Connection connection, String serverName, RegionInfo region, 653 RegionServerStdOutSink sink, AtomicLong successes) { 654 this.connection = connection; 655 this.serverName = serverName; 656 this.region = region; 657 this.sink = sink; 658 this.successes = successes; 659 } 660 661 @Override 662 public Void call() { 663 TableName tableName = null; 664 Table table = null; 665 Get get = null; 666 byte[] startKey = null; 667 Scan scan = null; 668 StopWatch stopWatch = new StopWatch(); 669 // monitor one region on every region server 670 stopWatch.reset(); 671 try { 672 tableName = region.getTable(); 673 table = connection.getTable(tableName); 674 startKey = region.getStartKey(); 675 // Can't do a get on empty start row so do a Scan of first element if any instead. 
676 LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(), 677 region.getRegionNameAsString(), Bytes.toStringBinary(startKey)); 678 if (startKey.length > 0) { 679 get = new Get(startKey); 680 get.setCacheBlocks(false); 681 get.setFilter(new FirstKeyOnlyFilter()); 682 stopWatch.start(); 683 table.get(get); 684 stopWatch.stop(); 685 } else { 686 scan = new Scan(); 687 scan.setCacheBlocks(false); 688 scan.setFilter(new FirstKeyOnlyFilter()); 689 scan.setCaching(1); 690 scan.setMaxResultSize(1L); 691 scan.setOneRowLimit(); 692 stopWatch.start(); 693 ResultScanner s = table.getScanner(scan); 694 s.next(); 695 s.close(); 696 stopWatch.stop(); 697 } 698 successes.incrementAndGet(); 699 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); 700 } catch (TableNotFoundException tnfe) { 701 LOG.error("Table may be deleted", tnfe); 702 // This is ignored because it doesn't imply that the regionserver is dead 703 } catch (TableNotEnabledException tnee) { 704 // This is considered a success since we got a response. 705 successes.incrementAndGet(); 706 LOG.debug("The targeted table was disabled. 
Assuming success."); 707 } catch (DoNotRetryIOException dnrioe) { 708 sink.publishReadFailure(tableName.getNameAsString(), serverName); 709 LOG.error(dnrioe.toString(), dnrioe); 710 } catch (IOException e) { 711 sink.publishReadFailure(tableName.getNameAsString(), serverName); 712 LOG.error(e.toString(), e); 713 } finally { 714 if (table != null) { 715 try { 716 table.close(); 717 } catch (IOException e) {/* DO NOTHING */ 718 LOG.error("Close table failed", e); 719 } 720 } 721 scan = null; 722 get = null; 723 startKey = null; 724 } 725 return null; 726 } 727 } 728 729 private static final int USAGE_EXIT_CODE = 1; 730 private static final int INIT_ERROR_EXIT_CODE = 2; 731 private static final int TIMEOUT_ERROR_EXIT_CODE = 3; 732 private static final int ERROR_EXIT_CODE = 4; 733 private static final int FAILURE_EXIT_CODE = 5; 734 735 private static final long DEFAULT_INTERVAL = 60000; 736 737 private static final long DEFAULT_TIMEOUT = 600000; // 10 mins 738 private static final int MAX_THREADS_NUM = 16; // #threads to contact regions 739 740 private static final Logger LOG = LoggerFactory.getLogger(Canary.class); 741 742 public static final TableName DEFAULT_WRITE_TABLE_NAME = 743 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); 744 745 private static final String CANARY_TABLE_FAMILY_NAME = "Test"; 746 747 private Configuration conf = null; 748 private long interval = 0; 749 private Sink sink = null; 750 751 /** 752 * True if we are to run in 'regionServer' mode. 753 */ 754 private boolean regionServerMode = false; 755 756 /** 757 * True if we are to run in zookeeper 'mode'. 758 */ 759 private boolean zookeeperMode = false; 760 761 /** 762 * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. we 763 * aggregate time to fetch each region and it needs to be less than this value else we log an 764 * ERROR. 
/**
 * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. we
 * aggregate time to fetch each region and it needs to be less than this value else we log an
 * ERROR.
 */
private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();

/** Configuration key set to true by the {@code -allRegions} command line option. */
public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
  "hbase.canary.regionserver_all_regions";

/** Configuration key set to true by the {@code -writeSniffing} command line option. */
public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
  "hbase.canary.region.write.sniffing";
/** Configuration key holding the value given to {@code -writeTableTimeout} (milliseconds). */
public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
  "hbase.canary.region.write.table.timeout";
/** Configuration key holding the table name given to {@code -writeTable}. */
public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
  "hbase.canary.region.write.table.name";
/** Configuration key holding the raw string given to {@code -readTableTimeouts}. */
public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
  "hbase.canary.region.read.table.timeout";

/** Configuration key holding the value given to {@code -permittedZookeeperFailures}. */
public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
  "hbase.canary.zookeeper.permitted.failures";

/** Configuration key set to true by the {@code -e} command line option. */
public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
/** Configuration key holding the value given to {@code -t} (milliseconds). */
public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
/** Configuration key set by {@code -f} and by {@code -treatFailureAsError}/{@code -failureAsError}. */
public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";

private ExecutorService executor; // threads to retrieve data from regionservers

/** Creates a tool backed by a single-threaded {@link ScheduledThreadPoolExecutor}. */
public CanaryTool() {
  this(new ScheduledThreadPoolExecutor(1));
}

/** Creates a tool that uses the given executor and has no sink set. */
public CanaryTool(ExecutorService executor) {
  this(executor, null);
}

/**
 * Creates a tool with the given executor and sink. The configuration is not set by this
 * constructor.
 */
@InterfaceAudience.Private
CanaryTool(ExecutorService executor, Sink sink) {
  this.executor = executor;
  this.sink = sink;
}

/** Creates a tool with the given configuration and executor and no sink set. */
CanaryTool(Configuration conf, ExecutorService executor) {
  this(conf, executor, null);
}

/** Creates a tool with the given executor and sink, then applies {@code conf} via setConf. */
CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
  this(executor, sink);
  setConf(conf);
}

/** Returns the configuration currently held by this tool. */
@Override
public Configuration getConf() {
  return conf;
}

/**
 * Stores the configuration to use. A null argument is replaced by a fresh configuration from
 * {@link HBaseConfiguration#create()}.
 */
@Override
public void setConf(Configuration conf) {
  if (conf == null) {
    conf = HBaseConfiguration.create();
  }
  this.conf = conf;
}
-1; 827 long permittedFailures = 0; 828 boolean regionServerAllRegions = false, writeSniffing = false; 829 String readTableTimeoutsStr = null; 830 // Process command line args 831 for (int i = 0; i < args.length; i++) { 832 String cmd = args[i]; 833 if (cmd.startsWith("-")) { 834 if (index >= 0) { 835 // command line args must be in the form: [opts] [table 1 [table 2 ...]] 836 System.err.println("Invalid command line options"); 837 printUsageAndExit(); 838 } 839 if (cmd.equals("-help") || cmd.equals("-h")) { 840 // user asked for help, print the help and quit. 841 printUsageAndExit(); 842 } else if (cmd.equals("-daemon") && interval == 0) { 843 // user asked for daemon mode, set a default interval between checks 844 interval = DEFAULT_INTERVAL; 845 } else if (cmd.equals("-interval")) { 846 // user has specified an interval for canary breaths (-interval N) 847 i++; 848 849 if (i == args.length) { 850 System.err.println("-interval takes a numeric seconds value argument."); 851 printUsageAndExit(); 852 } 853 try { 854 interval = Long.parseLong(args[i]) * 1000; 855 } catch (NumberFormatException e) { 856 System.err.println("-interval needs a numeric value argument."); 857 printUsageAndExit(); 858 } 859 } else if (cmd.equals("-zookeeper")) { 860 this.zookeeperMode = true; 861 } else if (cmd.equals("-regionserver")) { 862 this.regionServerMode = true; 863 } else if (cmd.equals("-allRegions")) { 864 conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true); 865 regionServerAllRegions = true; 866 } else if (cmd.equals("-writeSniffing")) { 867 writeSniffing = true; 868 conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true); 869 } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { 870 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 871 } else if (cmd.equals("-e")) { 872 conf.setBoolean(HBASE_CANARY_USE_REGEX, true); 873 } else if (cmd.equals("-t")) { 874 i++; 875 876 if (i == args.length) { 877 System.err.println("-t takes a numeric 
milliseconds value argument."); 878 printUsageAndExit(); 879 } 880 long timeout = 0; 881 try { 882 timeout = Long.parseLong(args[i]); 883 } catch (NumberFormatException e) { 884 System.err.println("-t takes a numeric milliseconds value argument."); 885 printUsageAndExit(); 886 } 887 conf.setLong(HBASE_CANARY_TIMEOUT, timeout); 888 } else if (cmd.equals("-writeTableTimeout")) { 889 i++; 890 891 if (i == args.length) { 892 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 893 printUsageAndExit(); 894 } 895 long configuredWriteTableTimeout = 0; 896 try { 897 configuredWriteTableTimeout = Long.parseLong(args[i]); 898 } catch (NumberFormatException e) { 899 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 900 printUsageAndExit(); 901 } 902 conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout); 903 } else if (cmd.equals("-writeTable")) { 904 i++; 905 906 if (i == args.length) { 907 System.err.println("-writeTable takes a string tablename value argument."); 908 printUsageAndExit(); 909 } 910 conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]); 911 } else if (cmd.equals("-f")) { 912 i++; 913 if (i == args.length) { 914 System.err.println("-f needs a boolean value argument (true|false)."); 915 printUsageAndExit(); 916 } 917 918 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i])); 919 } else if (cmd.equals("-readTableTimeouts")) { 920 i++; 921 if (i == args.length) { 922 System.err.println("-readTableTimeouts needs a comma-separated list of read " 923 + "millisecond timeouts per table (without spaces)."); 924 printUsageAndExit(); 925 } 926 readTableTimeoutsStr = args[i]; 927 conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr); 928 } else if (cmd.equals("-permittedZookeeperFailures")) { 929 i++; 930 931 if (i == args.length) { 932 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 933 
printUsageAndExit(); 934 } 935 try { 936 permittedFailures = Long.parseLong(args[i]); 937 } catch (NumberFormatException e) { 938 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 939 printUsageAndExit(); 940 } 941 conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures); 942 } else { 943 // no options match 944 System.err.println(cmd + " options is invalid."); 945 printUsageAndExit(); 946 } 947 } else if (index < 0) { 948 // keep track of first table name specified by the user 949 index = i; 950 } 951 } 952 if (regionServerAllRegions && !this.regionServerMode) { 953 System.err.println("-allRegions can only be specified in regionserver mode."); 954 printUsageAndExit(); 955 } 956 if (this.zookeeperMode) { 957 if (this.regionServerMode || regionServerAllRegions || writeSniffing) { 958 System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes."); 959 printUsageAndExit(); 960 } 961 } 962 if (permittedFailures != 0 && !this.zookeeperMode) { 963 System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); 964 printUsageAndExit(); 965 } 966 if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) { 967 System.err.println("-readTableTimeouts can only be configured in region mode."); 968 printUsageAndExit(); 969 } 970 return index; 971 } 972 973 @Override 974 public int run(String[] args) throws Exception { 975 int index = parseArgs(args); 976 String[] monitorTargets = null; 977 978 if (index >= 0) { 979 int length = args.length - index; 980 monitorTargets = new String[length]; 981 System.arraycopy(args, index, monitorTargets, 0, length); 982 } 983 if (interval > 0) { 984 // Only show the web page in daemon mode 985 putUpWebUI(); 986 } 987 if (zookeeperMode) { 988 return checkZooKeeper(); 989 } else if (regionServerMode) { 990 return checkRegionServers(monitorTargets); 991 } else { 992 return checkRegions(monitorTargets); 993 } 994 } 995 996 private 
int runMonitor(String[] monitorTargets) throws Exception { 997 ChoreService choreService = null; 998 999 // Launches chore for refreshing kerberos credentials if security is enabled. 1000 // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster 1001 // for more details. 1002 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); 1003 if (authChore != null) { 1004 choreService = new ChoreService("CANARY_TOOL"); 1005 choreService.scheduleChore(authChore); 1006 } 1007 1008 // Start to prepare the stuffs 1009 Monitor monitor = null; 1010 Thread monitorThread; 1011 long startTime = 0; 1012 long currentTimeLength = 0; 1013 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1014 long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT); 1015 // Get a connection to use in below. 1016 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { 1017 do { 1018 // Do monitor !! 1019 try { 1020 monitor = this.newMonitor(connection, monitorTargets); 1021 startTime = EnvironmentEdgeManager.currentTime(); 1022 monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime); 1023 monitorThread.start(); 1024 while (!monitor.isDone()) { 1025 // wait for 1 sec 1026 Thread.sleep(1000); 1027 // exit if any error occurs 1028 if (failOnError && monitor.hasError()) { 1029 monitorThread.interrupt(); 1030 if (monitor.initialized) { 1031 return monitor.errorCode; 1032 } else { 1033 return INIT_ERROR_EXIT_CODE; 1034 } 1035 } 1036 currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime; 1037 if (currentTimeLength > timeout) { 1038 LOG.error("The monitor is running too long (" + currentTimeLength 1039 + ") after timeout limit:" + timeout + " will be killed itself !!"); 1040 if (monitor.initialized) { 1041 return TIMEOUT_ERROR_EXIT_CODE; 1042 } else { 1043 return INIT_ERROR_EXIT_CODE; 1044 } 1045 } 1046 } 1047 1048 if (failOnError && monitor.finalCheckForErrors()) { 1049 
monitorThread.interrupt(); 1050 return monitor.errorCode; 1051 } 1052 } finally { 1053 if (monitor != null) { 1054 monitor.close(); 1055 } 1056 } 1057 1058 Thread.sleep(interval); 1059 } while (interval > 0); 1060 } // try-with-resources close 1061 1062 if (choreService != null) { 1063 choreService.shutdown(); 1064 } 1065 return monitor.errorCode; 1066 } 1067 1068 @Override 1069 public Map<String, String> getReadFailures() { 1070 return sink.getReadFailures(); 1071 } 1072 1073 @Override 1074 public Map<String, String> getWriteFailures() { 1075 return sink.getWriteFailures(); 1076 } 1077 1078 private void printUsageAndExit() { 1079 System.err.println( 1080 "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2]...] | [<REGIONSERVER1> [<REGIONSERVER2]..]"); 1081 System.err.println("Where [OPTIONS] are:"); 1082 System.err.println(" -h,-help show this help and exit."); 1083 System.err.println( 1084 " -regionserver set 'regionserver mode'; gets row from random region on " + "server"); 1085 System.err.println( 1086 " -allRegions get from ALL regions when 'regionserver mode', not just " + "random one."); 1087 System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on " 1088 + "each ensemble member"); 1089 System.err.println(" -daemon continuous check at defined intervals."); 1090 System.err.println(" -interval <N> interval between checks in seconds"); 1091 System.err 1092 .println(" -e consider table/regionserver argument as regular " + "expression"); 1093 System.err.println(" -f <B> exit on first error; default=true"); 1094 System.err.println(" -failureAsError treat read/write failure as error"); 1095 System.err.println(" -t <N> timeout for canary-test run; default=600000ms"); 1096 System.err.println(" -writeSniffing enable write sniffing"); 1097 System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary"); 1098 System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms"); 1099 System.err.println( 
1100 " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,..."); 1101 System.err 1102 .println(" comma-separated list of table read timeouts " + "(no spaces);"); 1103 System.err.println(" logs 'ERROR' if takes longer. default=600000ms"); 1104 System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures attempting to "); 1105 System.err.println(" connect to individual zookeeper nodes in ensemble"); 1106 System.err.println(""); 1107 System.err.println(" -D<configProperty>=<value> to assign or override configuration params"); 1108 System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable " 1109 + "raw scan; default=false"); 1110 System.err.println( 1111 " -Dhbase.canary.info.port=PORT_NUMBER Set for a Canary UI; " + "default=-1 (None)"); 1112 System.err.println(""); 1113 System.err.println( 1114 "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper."); 1115 System.err.println("To sniff/probe all regions, pass no arguments."); 1116 System.err.println("To sniff/probe all regions of a table, pass tablename."); 1117 System.err.println("To sniff/probe regionservers, pass -regionserver, etc."); 1118 System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation."); 1119 System.exit(USAGE_EXIT_CODE); 1120 } 1121 1122 Sink getSink(Configuration configuration, Class clazz) { 1123 // In test context, this.sink might be set. Use it if non-null. For testing. 1124 return this.sink != null 1125 ? 
this.sink 1126 : (Sink) ReflectionUtils 1127 .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class)); 1128 } 1129 1130 /** 1131 * Canary region mode-specific data structure which stores information about each region to be 1132 * scanned 1133 */ 1134 public static class RegionTaskResult { 1135 private RegionInfo region; 1136 private TableName tableName; 1137 private ServerName serverName; 1138 private ColumnFamilyDescriptor column; 1139 private AtomicLong readLatency = null; 1140 private AtomicLong writeLatency = null; 1141 private boolean readSuccess = false; 1142 private boolean writeSuccess = false; 1143 1144 public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName, 1145 ColumnFamilyDescriptor column) { 1146 this.region = region; 1147 this.tableName = tableName; 1148 this.serverName = serverName; 1149 this.column = column; 1150 } 1151 1152 public RegionInfo getRegionInfo() { 1153 return this.region; 1154 } 1155 1156 public String getRegionNameAsString() { 1157 return this.region.getRegionNameAsString(); 1158 } 1159 1160 public TableName getTableName() { 1161 return this.tableName; 1162 } 1163 1164 public String getTableNameAsString() { 1165 return this.tableName.getNameAsString(); 1166 } 1167 1168 public ServerName getServerName() { 1169 return this.serverName; 1170 } 1171 1172 public String getServerNameAsString() { 1173 return this.serverName.getServerName(); 1174 } 1175 1176 public ColumnFamilyDescriptor getColumnFamily() { 1177 return this.column; 1178 } 1179 1180 public String getColumnFamilyNameAsString() { 1181 return this.column.getNameAsString(); 1182 } 1183 1184 public long getReadLatency() { 1185 if (this.readLatency == null) { 1186 return -1; 1187 } 1188 return this.readLatency.get(); 1189 } 1190 1191 public void setReadLatency(long readLatency) { 1192 if (this.readLatency != null) { 1193 this.readLatency.set(readLatency); 1194 } else { 1195 this.readLatency = new AtomicLong(readLatency); 1196 } 
1197 } 1198 1199 public long getWriteLatency() { 1200 if (this.writeLatency == null) { 1201 return -1; 1202 } 1203 return this.writeLatency.get(); 1204 } 1205 1206 public void setWriteLatency(long writeLatency) { 1207 if (this.writeLatency != null) { 1208 this.writeLatency.set(writeLatency); 1209 } else { 1210 this.writeLatency = new AtomicLong(writeLatency); 1211 } 1212 } 1213 1214 public boolean isReadSuccess() { 1215 return this.readSuccess; 1216 } 1217 1218 public void setReadSuccess() { 1219 this.readSuccess = true; 1220 } 1221 1222 public boolean isWriteSuccess() { 1223 return this.writeSuccess; 1224 } 1225 1226 public void setWriteSuccess() { 1227 this.writeSuccess = true; 1228 } 1229 } 1230 1231 /** 1232 * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a 1233 * RegionMonitor. 1234 * @return a Monitor instance 1235 */ 1236 private Monitor newMonitor(final Connection connection, String[] monitorTargets) { 1237 Monitor monitor; 1238 boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false); 1239 boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false); 1240 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1241 int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0); 1242 boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false); 1243 String writeTableName = 1244 conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString()); 1245 long configuredWriteTableTimeout = 1246 conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT); 1247 1248 if (this.regionServerMode) { 1249 monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp, 1250 getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor, 1251 regionServerAllRegions, failOnError, permittedFailures); 1252 1253 } else if (this.zookeeperMode) { 1254 monitor = new 
ZookeeperMonitor(connection, monitorTargets, useRegExp, 1255 getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor, 1256 failOnError, permittedFailures); 1257 } else { 1258 monitor = new RegionMonitor(connection, monitorTargets, useRegExp, 1259 getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor, 1260 writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts, 1261 configuredWriteTableTimeout, permittedFailures); 1262 } 1263 return monitor; 1264 } 1265 1266 private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) { 1267 String[] tableTimeouts = configuredReadTableTimeoutsStr.split(","); 1268 for (String tT : tableTimeouts) { 1269 String[] nameTimeout = tT.split("="); 1270 if (nameTimeout.length < 2) { 1271 throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " 1272 + "<tableName>=<read timeout> (without spaces)."); 1273 } 1274 long timeoutVal; 1275 try { 1276 timeoutVal = Long.parseLong(nameTimeout[1]); 1277 } catch (NumberFormatException e) { 1278 throw new IllegalArgumentException( 1279 "-readTableTimeouts read timeout for each table" + " must be a numeric value argument."); 1280 } 1281 configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); 1282 } 1283 } 1284 1285 /** 1286 * A Monitor super-class can be extended by users 1287 */ 1288 public static abstract class Monitor implements Runnable, Closeable { 1289 protected Connection connection; 1290 protected Admin admin; 1291 /** 1292 * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the 1293 * command-line as arguments. 
1294 */ 1295 protected String[] targets; 1296 protected boolean useRegExp; 1297 protected boolean treatFailureAsError; 1298 protected boolean initialized = false; 1299 1300 protected boolean done = false; 1301 protected int errorCode = 0; 1302 protected long allowedFailures = 0; 1303 protected Sink sink; 1304 protected ExecutorService executor; 1305 1306 public boolean isDone() { 1307 return done; 1308 } 1309 1310 public boolean hasError() { 1311 return errorCode != 0; 1312 } 1313 1314 public boolean finalCheckForErrors() { 1315 if (errorCode != 0) { 1316 return true; 1317 } 1318 if ( 1319 treatFailureAsError && (sink.getReadFailureCount() > allowedFailures 1320 || sink.getWriteFailureCount() > allowedFailures) 1321 ) { 1322 LOG.error("Too many failures detected, treating failure as error, failing the Canary."); 1323 errorCode = FAILURE_EXIT_CODE; 1324 return true; 1325 } 1326 return false; 1327 } 1328 1329 @Override 1330 public void close() throws IOException { 1331 if (this.admin != null) { 1332 this.admin.close(); 1333 } 1334 } 1335 1336 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, 1337 ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1338 if (null == connection) { 1339 throw new IllegalArgumentException("connection shall not be null"); 1340 } 1341 1342 this.connection = connection; 1343 this.targets = monitorTargets; 1344 this.useRegExp = useRegExp; 1345 this.treatFailureAsError = treatFailureAsError; 1346 this.sink = sink; 1347 this.executor = executor; 1348 this.allowedFailures = allowedFailures; 1349 } 1350 1351 @Override 1352 public abstract void run(); 1353 1354 protected boolean initAdmin() { 1355 if (null == this.admin) { 1356 try { 1357 this.admin = this.connection.getAdmin(); 1358 } catch (Exception e) { 1359 LOG.error("Initial HBaseAdmin failed...", e); 1360 this.errorCode = INIT_ERROR_EXIT_CODE; 1361 } 1362 } else if (admin.isAborted()) { 1363 LOG.error("HBaseAdmin 
aborted"); 1364 this.errorCode = INIT_ERROR_EXIT_CODE; 1365 } 1366 return !this.hasError(); 1367 } 1368 } 1369 1370 /** 1371 * A monitor for region mode. 1372 */ 1373 private static class RegionMonitor extends Monitor { 1374 // 10 minutes 1375 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; 1376 // 1 days 1377 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; 1378 1379 private long lastCheckTime = -1; 1380 private boolean writeSniffing; 1381 private TableName writeTableName; 1382 private int writeDataTTL; 1383 private float regionsLowerLimit; 1384 private float regionsUpperLimit; 1385 private int checkPeriod; 1386 private boolean rawScanEnabled; 1387 private boolean readAllCF; 1388 1389 /** 1390 * This is a timeout per table. If read of each region in the table aggregated takes longer than 1391 * what is configured here, we log an ERROR rather than just an INFO. 1392 */ 1393 private HashMap<String, Long> configuredReadTableTimeouts; 1394 1395 private long configuredWriteTableTimeout; 1396 1397 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1398 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, 1399 boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, 1400 long configuredWriteTableTimeout, long allowedFailures) { 1401 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1402 allowedFailures); 1403 Configuration conf = connection.getConfiguration(); 1404 this.writeSniffing = writeSniffing; 1405 this.writeTableName = writeTableName; 1406 this.writeDataTTL = 1407 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); 1408 this.regionsLowerLimit = 1409 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); 1410 this.regionsUpperLimit = 1411 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); 1412 
this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, 1413 DEFAULT_WRITE_TABLE_CHECK_PERIOD); 1414 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1415 this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); 1416 this.configuredWriteTableTimeout = configuredWriteTableTimeout; 1417 this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true); 1418 } 1419 1420 private RegionStdOutSink getSink() { 1421 if (!(sink instanceof RegionStdOutSink)) { 1422 throw new RuntimeException("Can only write to Region sink"); 1423 } 1424 return ((RegionStdOutSink) sink); 1425 } 1426 1427 @Override 1428 public void run() { 1429 if (this.initAdmin()) { 1430 try { 1431 List<Future<Void>> taskFutures = new LinkedList<>(); 1432 RegionStdOutSink regionSink = this.getSink(); 1433 regionSink.resetFailuresCountDetails(); 1434 if (this.targets != null && this.targets.length > 0) { 1435 String[] tables = generateMonitorTables(this.targets); 1436 // Check to see that each table name passed in the -readTableTimeouts argument is also 1437 // passed as a monitor target. 
1438 if ( 1439 !new HashSet<>(Arrays.asList(tables)) 1440 .containsAll(this.configuredReadTableTimeouts.keySet()) 1441 ) { 1442 LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " 1443 + "passed via command line."); 1444 this.errorCode = USAGE_EXIT_CODE; 1445 return; 1446 } 1447 this.initialized = true; 1448 for (String table : tables) { 1449 LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); 1450 taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ, 1451 this.rawScanEnabled, readLatency, readAllCF)); 1452 } 1453 } else { 1454 taskFutures.addAll(sniff(TaskType.READ, regionSink)); 1455 } 1456 1457 if (writeSniffing) { 1458 if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) { 1459 try { 1460 checkWriteTableDistribution(); 1461 } catch (IOException e) { 1462 LOG.error("Check canary table distribution failed!", e); 1463 } 1464 lastCheckTime = EnvironmentEdgeManager.currentTime(); 1465 } 1466 // sniff canary table with write operation 1467 regionSink.initializeWriteLatency(); 1468 LongAdder writeTableLatency = regionSink.getWriteLatency(); 1469 taskFutures 1470 .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName), 1471 executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF)); 1472 } 1473 1474 for (Future<Void> future : taskFutures) { 1475 try { 1476 future.get(); 1477 } catch (ExecutionException e) { 1478 LOG.error("Sniff region failed!", e); 1479 } 1480 } 1481 Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap(); 1482 for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) { 1483 String tableName = entry.getKey(); 1484 if (actualReadTableLatency.containsKey(tableName)) { 1485 Long actual = actualReadTableLatency.get(tableName).longValue(); 1486 Long configured = entry.getValue(); 1487 if (actual > configured) { 1488 LOG.error("Read operation for {} took {}ms 
exceeded the configured read timeout." 1489 + "(Configured read timeout {}ms.", tableName, actual, configured); 1490 } else { 1491 LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.", 1492 tableName, actual, configured); 1493 } 1494 } else { 1495 LOG.error("Read operation for {} failed!", tableName); 1496 } 1497 } 1498 if (this.writeSniffing) { 1499 String writeTableStringName = this.writeTableName.getNameAsString(); 1500 long actualWriteLatency = regionSink.getWriteLatency().longValue(); 1501 LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.", 1502 writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout); 1503 // Check that the writeTable write operation latency does not exceed the configured 1504 // timeout. 1505 if (actualWriteLatency > this.configuredWriteTableTimeout) { 1506 LOG.error("Write operation for {} exceeded the configured write timeout.", 1507 writeTableStringName); 1508 } 1509 } 1510 } catch (Exception e) { 1511 LOG.error("Run regionMonitor failed", e); 1512 this.errorCode = ERROR_EXIT_CODE; 1513 } finally { 1514 this.done = true; 1515 } 1516 } 1517 this.done = true; 1518 } 1519 1520 /** Returns List of tables to use in test. 
*/ 1521 private String[] generateMonitorTables(String[] monitorTargets) throws IOException { 1522 String[] returnTables = null; 1523 1524 if (this.useRegExp) { 1525 Pattern pattern = null; 1526 List<TableDescriptor> tds = null; 1527 Set<String> tmpTables = new TreeSet<>(); 1528 try { 1529 LOG.debug(String.format("reading list of tables")); 1530 tds = this.admin.listTableDescriptors(pattern); 1531 if (tds == null) { 1532 tds = Collections.emptyList(); 1533 } 1534 for (String monitorTarget : monitorTargets) { 1535 pattern = Pattern.compile(monitorTarget); 1536 for (TableDescriptor td : tds) { 1537 if (pattern.matcher(td.getTableName().getNameAsString()).matches()) { 1538 tmpTables.add(td.getTableName().getNameAsString()); 1539 } 1540 } 1541 } 1542 } catch (IOException e) { 1543 LOG.error("Communicate with admin failed", e); 1544 throw e; 1545 } 1546 1547 if (tmpTables.size() > 0) { 1548 returnTables = tmpTables.toArray(new String[tmpTables.size()]); 1549 } else { 1550 String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); 1551 LOG.error(msg); 1552 this.errorCode = INIT_ERROR_EXIT_CODE; 1553 throw new TableNotFoundException(msg); 1554 } 1555 } else { 1556 returnTables = monitorTargets; 1557 } 1558 1559 return returnTables; 1560 } 1561 1562 /* 1563 * Canary entry point to monitor all the tables. 
1564 */ 1565 private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink) 1566 throws Exception { 1567 LOG.debug("Reading list of tables"); 1568 List<Future<Void>> taskFutures = new LinkedList<>(); 1569 for (TableDescriptor td : admin.listTableDescriptors()) { 1570 if ( 1571 admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) 1572 && (!td.getTableName().equals(writeTableName)) 1573 ) { 1574 LongAdder readLatency = 1575 regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString()); 1576 taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType, 1577 this.rawScanEnabled, readLatency, readAllCF)); 1578 } 1579 } 1580 return taskFutures; 1581 } 1582 1583 private void checkWriteTableDistribution() throws IOException { 1584 if (!admin.tableExists(writeTableName)) { 1585 int numberOfServers = admin.getRegionServers().size(); 1586 if (numberOfServers == 0) { 1587 throw new IllegalStateException("No live regionservers"); 1588 } 1589 createWriteTable(numberOfServers); 1590 } 1591 1592 if (!admin.isTableEnabled(writeTableName)) { 1593 admin.enableTable(writeTableName); 1594 } 1595 1596 ClusterMetrics status = 1597 admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER)); 1598 int numberOfServers = status.getServersName().size(); 1599 if (status.getServersName().contains(status.getMasterName())) { 1600 numberOfServers -= 1; 1601 } 1602 1603 List<Pair<RegionInfo, ServerName>> pairs = 1604 MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName); 1605 int numberOfRegions = pairs.size(); 1606 if ( 1607 numberOfRegions < numberOfServers * regionsLowerLimit 1608 || numberOfRegions > numberOfServers * regionsUpperLimit 1609 ) { 1610 admin.disableTable(writeTableName); 1611 admin.deleteTable(writeTableName); 1612 createWriteTable(numberOfServers); 1613 } 1614 HashSet<ServerName> serverSet = new HashSet<>(); 1615 for (Pair<RegionInfo, ServerName> pair : pairs) { 1616 
serverSet.add(pair.getSecond()); 1617 } 1618 int numberOfCoveredServers = serverSet.size(); 1619 if (numberOfCoveredServers < numberOfServers) { 1620 admin.balance(); 1621 } 1622 } 1623 1624 private void createWriteTable(int numberOfServers) throws IOException { 1625 int numberOfRegions = (int) (numberOfServers * regionsLowerLimit); 1626 LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " 1627 + "(current lower limit of regions per server is {} and you can change it with config {}).", 1628 numberOfServers, numberOfRegions, regionsLowerLimit, 1629 HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY); 1630 HTableDescriptor desc = new HTableDescriptor(writeTableName); 1631 HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME); 1632 family.setMaxVersions(1); 1633 family.setTimeToLive(writeDataTTL); 1634 1635 desc.addFamily(family); 1636 byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions); 1637 admin.createTable(desc, splits); 1638 } 1639 } 1640 1641 /** 1642 * Canary entry point for specified table. 1643 * @throws Exception exception 1644 */ 1645 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName, 1646 ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency, 1647 boolean readAllCF) throws Exception { 1648 LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName); 1649 if (admin.isTableEnabled(TableName.valueOf(tableName))) { 1650 return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)), 1651 executor, taskType, rawScanEnabled, readLatency, readAllCF); 1652 } else { 1653 LOG.warn("Table {} is not enabled", tableName); 1654 } 1655 return new LinkedList<>(); 1656 } 1657 1658 /* 1659 * Loops over regions of this table, and outputs information about the state. 
1660 */ 1661 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, 1662 TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled, 1663 LongAdder rwLatency, boolean readAllCF) throws Exception { 1664 LOG.debug("Reading list of regions for table {}", tableDesc.getTableName()); 1665 try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) { 1666 List<RegionTask> tasks = new ArrayList<>(); 1667 try (RegionLocator regionLocator = 1668 admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1669 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1670 if (location == null) { 1671 LOG.warn("Null location"); 1672 continue; 1673 } 1674 ServerName rs = location.getServerName(); 1675 RegionInfo region = location.getRegion(); 1676 tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, 1677 taskType, rawScanEnabled, rwLatency, readAllCF)); 1678 Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap(); 1679 regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>()); 1680 } 1681 return executor.invokeAll(tasks); 1682 } 1683 } catch (TableNotFoundException e) { 1684 return Collections.EMPTY_LIST; 1685 } 1686 } 1687 1688 // monitor for zookeeper mode 1689 private static class ZookeeperMonitor extends Monitor { 1690 private List<String> hosts; 1691 private final String znode; 1692 private final int timeout; 1693 1694 protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1695 Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1696 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1697 allowedFailures); 1698 Configuration configuration = connection.getConfiguration(); 1699 znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); 1700 timeout = 1701 
configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); 1702 ConnectStringParser parser = 1703 new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); 1704 hosts = Lists.newArrayList(); 1705 for (InetSocketAddress server : parser.getServerAddresses()) { 1706 hosts.add(inetSocketAddress2String(server)); 1707 } 1708 if (allowedFailures > (hosts.size() - 1) / 2) { 1709 LOG.warn( 1710 "Confirm allowable number of failed ZooKeeper nodes, as quorum will " 1711 + "already be lost. Setting of {} failures is unexpected for {} ensemble size.", 1712 allowedFailures, hosts.size()); 1713 } 1714 } 1715 1716 @Override 1717 public void run() { 1718 List<ZookeeperTask> tasks = Lists.newArrayList(); 1719 ZookeeperStdOutSink zkSink = null; 1720 try { 1721 zkSink = this.getSink(); 1722 } catch (RuntimeException e) { 1723 LOG.error("Run ZooKeeperMonitor failed!", e); 1724 this.errorCode = ERROR_EXIT_CODE; 1725 } 1726 this.initialized = true; 1727 for (final String host : hosts) { 1728 tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink)); 1729 } 1730 try { 1731 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1732 try { 1733 future.get(); 1734 } catch (ExecutionException e) { 1735 LOG.error("Sniff zookeeper failed!", e); 1736 this.errorCode = ERROR_EXIT_CODE; 1737 } 1738 } 1739 } catch (InterruptedException e) { 1740 this.errorCode = ERROR_EXIT_CODE; 1741 Thread.currentThread().interrupt(); 1742 LOG.error("Sniff zookeeper interrupted!", e); 1743 } 1744 this.done = true; 1745 } 1746 1747 private ZookeeperStdOutSink getSink() { 1748 if (!(sink instanceof ZookeeperStdOutSink)) { 1749 throw new RuntimeException("Can only write to zookeeper sink"); 1750 } 1751 return ((ZookeeperStdOutSink) sink); 1752 } 1753 } 1754 1755 /** 1756 * A monitor for regionserver mode 1757 */ 1758 private static class RegionServerMonitor extends Monitor { 1759 private boolean allRegions; 1760 1761 public 
RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1762 Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError, 1763 long allowedFailures) { 1764 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1765 allowedFailures); 1766 this.allRegions = allRegions; 1767 } 1768 1769 private RegionServerStdOutSink getSink() { 1770 if (!(sink instanceof RegionServerStdOutSink)) { 1771 throw new RuntimeException("Can only write to regionserver sink"); 1772 } 1773 return ((RegionServerStdOutSink) sink); 1774 } 1775 1776 @Override 1777 public void run() { 1778 if (this.initAdmin() && this.checkNoTableNames()) { 1779 RegionServerStdOutSink regionServerSink = null; 1780 try { 1781 regionServerSink = this.getSink(); 1782 } catch (RuntimeException e) { 1783 LOG.error("Run RegionServerMonitor failed!", e); 1784 this.errorCode = ERROR_EXIT_CODE; 1785 } 1786 Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName(); 1787 this.initialized = true; 1788 this.monitorRegionServers(rsAndRMap, regionServerSink); 1789 } 1790 this.done = true; 1791 } 1792 1793 private boolean checkNoTableNames() { 1794 List<String> foundTableNames = new ArrayList<>(); 1795 TableName[] tableNames = null; 1796 LOG.debug("Reading list of tables"); 1797 try { 1798 tableNames = this.admin.listTableNames(); 1799 } catch (IOException e) { 1800 LOG.error("Get listTableNames failed", e); 1801 this.errorCode = INIT_ERROR_EXIT_CODE; 1802 return false; 1803 } 1804 1805 if (this.targets == null || this.targets.length == 0) { 1806 return true; 1807 } 1808 1809 for (String target : this.targets) { 1810 for (TableName tableName : tableNames) { 1811 if (target.equals(tableName.getNameAsString())) { 1812 foundTableNames.add(target); 1813 } 1814 } 1815 } 1816 1817 if (foundTableNames.size() > 0) { 1818 System.err.println("Cannot pass a tablename when using the -regionserver " 1819 + "option, tablenames:" + 
foundTableNames.toString()); 1820 this.errorCode = USAGE_EXIT_CODE; 1821 } 1822 return foundTableNames.isEmpty(); 1823 } 1824 1825 private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap, 1826 RegionServerStdOutSink regionServerSink) { 1827 List<RegionServerTask> tasks = new ArrayList<>(); 1828 Map<String, AtomicLong> successMap = new HashMap<>(); 1829 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1830 String serverName = entry.getKey(); 1831 AtomicLong successes = new AtomicLong(0); 1832 successMap.put(serverName, successes); 1833 if (entry.getValue().isEmpty()) { 1834 LOG.error("Regionserver not serving any regions - {}", serverName); 1835 } else if (this.allRegions) { 1836 for (RegionInfo region : entry.getValue()) { 1837 tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink, 1838 successes)); 1839 } 1840 } else { 1841 // random select a region if flag not set 1842 RegionInfo region = 1843 entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size())); 1844 tasks.add( 1845 new RegionServerTask(this.connection, serverName, region, regionServerSink, successes)); 1846 } 1847 } 1848 try { 1849 for (Future<Void> future : this.executor.invokeAll(tasks)) { 1850 try { 1851 future.get(); 1852 } catch (ExecutionException e) { 1853 LOG.error("Sniff regionserver failed!", e); 1854 this.errorCode = ERROR_EXIT_CODE; 1855 } 1856 } 1857 if (this.allRegions) { 1858 for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) { 1859 String serverName = entry.getKey(); 1860 LOG.info("Successfully read {} regions out of {} on regionserver {}", 1861 successMap.get(serverName), entry.getValue().size(), serverName); 1862 } 1863 } 1864 } catch (InterruptedException e) { 1865 this.errorCode = ERROR_EXIT_CODE; 1866 LOG.error("Sniff regionserver interrupted!", e); 1867 } 1868 } 1869 1870 private Map<String, List<RegionInfo>> filterRegionServerByName() { 1871 Map<String, 
List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName(); 1872 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); 1873 return regionServerAndRegionsMap; 1874 } 1875 1876 private Map<String, List<RegionInfo>> getAllRegionServerByName() { 1877 Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>(); 1878 try { 1879 LOG.debug("Reading list of tables and locations"); 1880 List<TableDescriptor> tableDescs = this.admin.listTableDescriptors(); 1881 List<RegionInfo> regions = null; 1882 for (TableDescriptor tableDesc : tableDescs) { 1883 try (RegionLocator regionLocator = 1884 this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { 1885 for (HRegionLocation location : regionLocator.getAllRegionLocations()) { 1886 if (location == null) { 1887 LOG.warn("Null location"); 1888 continue; 1889 } 1890 ServerName rs = location.getServerName(); 1891 String rsName = rs.getHostname(); 1892 RegionInfo r = location.getRegion(); 1893 if (rsAndRMap.containsKey(rsName)) { 1894 regions = rsAndRMap.get(rsName); 1895 } else { 1896 regions = new ArrayList<>(); 1897 rsAndRMap.put(rsName, regions); 1898 } 1899 regions.add(r); 1900 } 1901 } 1902 } 1903 1904 // get any live regionservers not serving any regions 1905 for (ServerName rs : this.admin.getRegionServers()) { 1906 String rsName = rs.getHostname(); 1907 if (!rsAndRMap.containsKey(rsName)) { 1908 rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList()); 1909 } 1910 } 1911 } catch (IOException e) { 1912 LOG.error("Get HTables info failed", e); 1913 this.errorCode = INIT_ERROR_EXIT_CODE; 1914 } 1915 return rsAndRMap; 1916 } 1917 1918 private Map<String, List<RegionInfo>> 1919 doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) { 1920 1921 Map<String, List<RegionInfo>> filteredRsAndRMap = null; 1922 1923 if (this.targets != null && this.targets.length > 0) { 1924 filteredRsAndRMap = new HashMap<>(); 1925 Pattern pattern = null; 1926 Matcher 
matcher = null; 1927 boolean regExpFound = false; 1928 for (String rsName : this.targets) { 1929 if (this.useRegExp) { 1930 regExpFound = false; 1931 pattern = Pattern.compile(rsName); 1932 for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) { 1933 matcher = pattern.matcher(entry.getKey()); 1934 if (matcher.matches()) { 1935 filteredRsAndRMap.put(entry.getKey(), entry.getValue()); 1936 regExpFound = true; 1937 } 1938 } 1939 if (!regExpFound) { 1940 LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); 1941 } 1942 } else { 1943 if (fullRsAndRMap.containsKey(rsName)) { 1944 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); 1945 } else { 1946 LOG.info("No RegionServerInfo found, regionServerName {}", rsName); 1947 } 1948 } 1949 } 1950 } else { 1951 filteredRsAndRMap = fullRsAndRMap; 1952 } 1953 return filteredRsAndRMap; 1954 } 1955 } 1956 1957 public static void main(String[] args) throws Exception { 1958 final Configuration conf = HBaseConfiguration.create(); 1959 1960 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); 1961 LOG.info("Execution thread count={}", numThreads); 1962 1963 int exitCode; 1964 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); 1965 try { 1966 exitCode = ToolRunner.run(conf, new CanaryTool(executor), args); 1967 } finally { 1968 executor.shutdown(); 1969 } 1970 System.exit(exitCode); 1971 } 1972}