/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String;

import java.io.Closeable;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes:
 * <ol>
 * <li>region mode (default): For each region, try to get one row per column family, outputting
 * information on failure (ERROR) or else the latency.</li>
 * <li>regionserver mode: For each regionserver, try to get one row from one randomly selected
 * table, outputting information on failure (ERROR) or else the latency.</li>
 * <li>zookeeper mode: For each zookeeper instance, read a znode, outputting information on
 * failure (ERROR) or else the latency.</li>
 * </ol>
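 * <p>
 * Example invocations (illustrative only; run with <code>-help</code> for the authoritative option
 * list, and note that the table names below are placeholders):
 *
 * <pre>
 * hbase canary                          # region mode: probe every region of every table
 * hbase canary MyTable1 MyTable2        # region mode: probe only the listed tables
 * hbase canary -regionserver            # regionserver mode: one row per regionserver
 * hbase canary -zookeeper               # zookeeper mode: read a znode on each ensemble member
 * hbase canary -daemon -interval 30     # repeat the checks every 30 seconds
 * </pre>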
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CanaryTool implements Tool, Canary {
  public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port";
  public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress";

  private void putUpWebUI() throws IOException {
    int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1);
    // -1 is for disabling info server
    if (port < 0) {
      return;
    }
    if (zookeeperMode) {
      LOG.info("WebUI is not supported in Zookeeper mode");
    } else if (regionServerMode) {
      LOG.info("WebUI is not supported in RegionServer mode");
    } else {
      String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0");
      try {
        InfoServer infoServer = new InfoServer("canary", addr, port, false, conf);
        infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class);
        infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class));
        infoServer.start();
        LOG.info("Bind Canary http info server to {}:{} ", addr, port);
      } catch (BindException e) {
        LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e);
      }
    }
  }

  @Override
  public int checkRegions(String[] targets) throws Exception {
    String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT);
    try {
      if (configuredReadTableTimeoutsStr != null) {
        populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr);
      }
    } catch (IllegalArgumentException e) {
      LOG.error("Constructing read table timeouts map failed", e);
      return USAGE_EXIT_CODE;
    }
    return runMonitor(targets);
  }

  @Override
  public int checkRegionServers(String[] targets) throws Exception {
    regionServerMode = true;
    return runMonitor(targets);
  }

  @Override
  public int checkZooKeeper() throws Exception {
    zookeeperMode = true;
    return runMonitor(null);
  }

  /**
   * Sink interface used by the canary to output information
   */
  public interface Sink {
    long getReadFailureCount();

    long incReadFailureCount();

    Map<String, String> getReadFailures();

    void updateReadFailures(String regionName, String serverName);

    long getWriteFailureCount();

    long incWriteFailureCount();

    Map<String, String> getWriteFailures();

    void updateWriteFailures(String regionName, String serverName);

    long getReadSuccessCount();

    long incReadSuccessCount();

    long getWriteSuccessCount();

    long incWriteSuccessCount();
  }

  /**
   * Simple implementation of canary sink that allows plotting to a file or standard output.
   */
  public static class StdOutSink implements Sink {
    private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0),
      readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0);
    private Map<String, String> readFailures = new ConcurrentHashMap<>();
    private Map<String, String> writeFailures = new ConcurrentHashMap<>();

    @Override
    public long getReadFailureCount() {
      return readFailureCount.get();
    }

    @Override
    public long incReadFailureCount() {
      return readFailureCount.incrementAndGet();
    }

    @Override
    public Map<String, String> getReadFailures() {
      return readFailures;
    }

    @Override
    public void updateReadFailures(String regionName, String serverName) {
      readFailures.put(regionName, serverName);
    }

    @Override
    public long getWriteFailureCount() {
      return writeFailureCount.get();
    }

    @Override
    public long incWriteFailureCount() {
      return writeFailureCount.incrementAndGet();
    }

    @Override
    public Map<String, String> getWriteFailures() {
      return writeFailures;
    }

    @Override
    public void updateWriteFailures(String regionName, String serverName) {
      writeFailures.put(regionName, serverName);
    }

    @Override
    public long getReadSuccessCount() {
      return readSuccessCount.get();
    }

    @Override
    public long incReadSuccessCount() {
      return readSuccessCount.incrementAndGet();
    }

    @Override
    public long getWriteSuccessCount() {
      return writeSuccessCount.get();
    }

    @Override
    public long incWriteSuccessCount() {
      return writeSuccessCount.incrementAndGet();
    }
  }

  /**
   * By RegionServer, for 'regionserver' mode.
   */
  public static class RegionServerStdOutSink extends StdOutSink {
    public void publishReadFailure(String table, String server) {
      incReadFailureCount();
      LOG.error("Read from {} on {} failed", table, server);
    }

    public void publishReadTiming(String table, String server, long msTime) {
      LOG.info("Read from {} on {} in {}ms", table, server, msTime);
    }
  }

  /**
   * Output for 'zookeeper' mode.
   */
  public static class ZookeeperStdOutSink extends StdOutSink {
    public void publishReadFailure(String znode, String server) {
      incReadFailureCount();
      LOG.error("Read from {} on {} failed", znode, server);
    }

    public void publishReadTiming(String znode, String server, long msTime) {
      LOG.info("Read from {} on {} in {}ms", znode, server, msTime);
    }
  }

  /**
   * By Region, for 'region' mode.
   */
  public static class RegionStdOutSink extends StdOutSink {
    private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
    private LongAdder writeLatency = new LongAdder();
    private final ConcurrentMap<String, List<RegionTaskResult>> regionMap =
      new ConcurrentHashMap<>();
    private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>();
    private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>();

    public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() {
      return perServerFailuresCount;
    }

    public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() {
      return perTableFailuresCount;
    }

    public void resetFailuresCountDetails() {
      perServerFailuresCount.clear();
      perTableFailuresCount.clear();
    }

    private void incFailuresCountDetails(ServerName serverName, RegionInfo region) {
      perServerFailuresCount.compute(serverName, (server, count) -> {
        if (count == null) {
          count = new LongAdder();
        }
        count.increment();
        return count;
      });
      perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> {
        if (count == null) {
          count = new LongAdder();
        }
        count.increment();
        return count;
      });
    }

    public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
      incReadFailureCount();
      incFailuresCountDetails(serverName, region);
      LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName,
        e);
    }

    public void publishReadFailure(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, Exception e) {
      incReadFailureCount();
      incFailuresCountDetails(serverName, region);
      LOG.error("Read from {} on serverName={}, columnFamily={} failed",
        region.getRegionNameAsString(), serverName, column.getNameAsString(), e);
    }

    public void publishReadTiming(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, long msTime) {
      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
      rtr.setReadSuccess();
      rtr.setReadLatency(msTime);
      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
      rtrs.add(rtr);
      // Note that read success count will be equal to total column family read successes.
      incReadSuccessCount();
      LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
        column.getNameAsString(), msTime);
    }

    public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) {
      incWriteFailureCount();
      incFailuresCountDetails(serverName, region);
      LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e);
    }

    public void publishWriteFailure(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, Exception e) {
      incWriteFailureCount();
      incFailuresCountDetails(serverName, region);
      LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName,
        column.getNameAsString(), e);
    }

    public void publishWriteTiming(ServerName serverName, RegionInfo region,
      ColumnFamilyDescriptor column, long msTime) {
      RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column);
      rtr.setWriteSuccess();
      rtr.setWriteLatency(msTime);
      List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString());
      rtrs.add(rtr);
      // Note that write success count will be equal to total column family write successes.
      incWriteSuccessCount();
      LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
        column.getNameAsString(), msTime);
    }

    public Map<String, LongAdder> getReadLatencyMap() {
      return this.perTableReadLatency;
    }

    public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
      LongAdder initLatency = new LongAdder();
      this.perTableReadLatency.put(tableName, initLatency);
      return initLatency;
    }

    public void initializeWriteLatency() {
      this.writeLatency.reset();
    }

    public LongAdder getWriteLatency() {
      return this.writeLatency;
    }

    public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() {
      return this.regionMap;
    }

    public int getTotalExpectedRegions() {
      return this.regionMap.size();
    }
  }

  /**
   * Run a single zookeeper Task and then exit.
   */
  static class ZookeeperTask implements Callable<Void> {
    private final Connection connection;
    private final String host;
    private String znode;
    private final int timeout;
    private ZookeeperStdOutSink sink;

    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
      ZookeeperStdOutSink sink) {
      this.connection = connection;
      this.host = host;
      this.znode = znode;
      this.timeout = timeout;
      this.sink = sink;
    }

    @Override
    public Void call() throws Exception {
      ZooKeeper zooKeeper = null;
      try {
        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
        Stat exists = zooKeeper.exists(znode, false);
        StopWatch stopwatch = new StopWatch();
        stopwatch.start();
        zooKeeper.getData(znode, false, exists);
        stopwatch.stop();
        sink.publishReadTiming(znode, host, stopwatch.getTime());
      } catch (KeeperException | InterruptedException e) {
        sink.publishReadFailure(znode, host);
      } finally {
        if (zooKeeper != null) {
          zooKeeper.close();
        }
      }
      return null;
    }
  }

  /**
   * Run a single Region Task and then exit. For each column family of the Region, get one row and
   * output latency or failure.
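   * If the region has a non-empty start key, the probe is a single filtered Get of that key; for a
   * region whose start key is empty (the first region of a table), a one-row Scan is used instead,
   * optionally as a raw scan when so configured.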
   */
  static class RegionTask implements Callable<Void> {
    public enum TaskType {
      READ,
      WRITE
    }

    private Connection connection;
    private RegionInfo region;
    private RegionStdOutSink sink;
    private TaskType taskType;
    private boolean rawScanEnabled;
    private ServerName serverName;
    private LongAdder readWriteLatency;
    private boolean readAllCF;

    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
      RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
      boolean readAllCF) {
      this.connection = connection;
      this.region = region;
      this.serverName = serverName;
      this.sink = sink;
      this.taskType = taskType;
      this.rawScanEnabled = rawScanEnabled;
      this.readWriteLatency = rwLatency;
      this.readAllCF = readAllCF;
    }

    @Override
    public Void call() {
      switch (taskType) {
        case READ:
          return read();
        case WRITE:
          return write();
        default:
          return read();
      }
    }

    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
      byte[] startKey = null;
      Get get = null;
      Scan scan = null;
      ResultScanner rs = null;
      StopWatch stopWatch = new StopWatch();
      startKey = region.getStartKey();
      // Can't do a get on empty start row so do a Scan of first element if any instead.
      if (startKey.length > 0) {
        get = new Get(startKey);
        get.setCacheBlocks(false);
        get.setFilter(new FirstKeyOnlyFilter());
        get.addFamily(column.getName());
      } else {
        scan = new Scan();
        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
        scan.setRaw(rawScanEnabled);
        scan.setCaching(1);
        scan.setCacheBlocks(false);
        scan.setFilter(new FirstKeyOnlyFilter());
        scan.addFamily(column.getName());
        scan.setMaxResultSize(1L);
        scan.setOneRowLimit();
      }
      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
        column.getNameAsString(), Bytes.toStringBinary(startKey));
      try {
        stopWatch.start();
        if (startKey.length > 0) {
          table.get(get);
        } else {
          rs = table.getScanner(scan);
          rs.next();
        }
        stopWatch.stop();
        this.readWriteLatency.add(stopWatch.getTime());
        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
      } catch (Exception e) {
        sink.publishReadFailure(serverName, region, column, e);
        sink.updateReadFailures(region.getRegionNameAsString(),
          serverName == null ? "NULL" : serverName.getHostname());
"NULL" : serverName.getHostname()); 547 } finally { 548 if (rs != null) { 549 rs.close(); 550 } 551 } 552 return null; 553 } 554 555 private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) { 556 int size = cfs.length; 557 return cfs[ThreadLocalRandom.current().nextInt(size)]; 558 559 } 560 561 public Void read() { 562 Table table = null; 563 TableDescriptor tableDesc = null; 564 try { 565 LOG.debug("Reading table descriptor for table {}", region.getTable()); 566 table = connection.getTable(region.getTable()); 567 tableDesc = table.getDescriptor(); 568 } catch (IOException e) { 569 LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); 570 sink.publishReadFailure(serverName, region, e); 571 if (table != null) { 572 try { 573 table.close(); 574 } catch (IOException ioe) { 575 LOG.error("Close table failed", e); 576 } 577 } 578 return null; 579 } 580 581 if (readAllCF) { 582 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 583 readColumnFamily(table, column); 584 } 585 } else { 586 readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies())); 587 } 588 try { 589 table.close(); 590 } catch (IOException e) { 591 LOG.error("Close table failed", e); 592 } 593 return null; 594 } 595 596 /** 597 * Check writes for the canary table 598 */ 599 private Void write() { 600 Table table = null; 601 TableDescriptor tableDesc = null; 602 try { 603 table = connection.getTable(region.getTable()); 604 tableDesc = table.getDescriptor(); 605 byte[] rowToCheck = region.getStartKey(); 606 if (rowToCheck.length == 0) { 607 rowToCheck = new byte[] { 0x0 }; 608 } 609 int writeValueSize = 610 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); 611 for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { 612 Put put = new Put(rowToCheck); 613 byte[] value = new byte[writeValueSize]; 614 Bytes.random(value); 615 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); 616 LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(), 617 region.getRegionNameAsString(), column.getNameAsString(), 618 Bytes.toStringBinary(rowToCheck)); 619 try { 620 long startTime = EnvironmentEdgeManager.currentTime(); 621 table.put(put); 622 long time = EnvironmentEdgeManager.currentTime() - startTime; 623 this.readWriteLatency.add(time); 624 sink.publishWriteTiming(serverName, region, column, time); 625 } catch (Exception e) { 626 sink.publishWriteFailure(serverName, region, column, e); 627 } 628 } 629 table.close(); 630 } catch (IOException e) { 631 sink.publishWriteFailure(serverName, region, e); 632 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname()); 633 } 634 return null; 635 } 636 } 637 638 /** 639 * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and 640 * output latency or the failure. 
   */
  static class RegionServerTask implements Callable<Void> {
    private Connection connection;
    private String serverName;
    private RegionInfo region;
    private RegionServerStdOutSink sink;
    private AtomicLong successes;

    RegionServerTask(Connection connection, String serverName, RegionInfo region,
      RegionServerStdOutSink sink, AtomicLong successes) {
      this.connection = connection;
      this.serverName = serverName;
      this.region = region;
      this.sink = sink;
      this.successes = successes;
    }

    @Override
    public Void call() {
      TableName tableName = null;
      Table table = null;
      Get get = null;
      byte[] startKey = null;
      Scan scan = null;
      StopWatch stopWatch = new StopWatch();
      // monitor one region on every region server
      stopWatch.reset();
      try {
        tableName = region.getTable();
        table = connection.getTable(tableName);
        startKey = region.getStartKey();
        // Can't do a get on empty start row so do a Scan of first element if any instead.
        LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(),
          region.getRegionNameAsString(), Bytes.toStringBinary(startKey));
        if (startKey.length > 0) {
          get = new Get(startKey);
          get.setCacheBlocks(false);
          get.setFilter(new FirstKeyOnlyFilter());
          stopWatch.start();
          table.get(get);
          stopWatch.stop();
        } else {
          scan = new Scan();
          scan.setCacheBlocks(false);
          scan.setFilter(new FirstKeyOnlyFilter());
          scan.setCaching(1);
          scan.setMaxResultSize(1L);
          scan.setOneRowLimit();
          stopWatch.start();
          ResultScanner s = table.getScanner(scan);
          s.next();
          s.close();
          stopWatch.stop();
        }
        successes.incrementAndGet();
        sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
      } catch (TableNotFoundException tnfe) {
        LOG.error("Table may be deleted", tnfe);
        // This is ignored because it doesn't imply that the regionserver is dead
      } catch (TableNotEnabledException tnee) {
        // This is considered a success since we got a response.
        successes.incrementAndGet();
        LOG.debug("The targeted table was disabled. Assuming success.");
      } catch (DoNotRetryIOException dnrioe) {
        sink.publishReadFailure(tableName.getNameAsString(), serverName);
        LOG.error(dnrioe.toString(), dnrioe);
      } catch (IOException e) {
        sink.publishReadFailure(tableName.getNameAsString(), serverName);
        LOG.error(e.toString(), e);
      } finally {
        if (table != null) {
          try {
            table.close();
          } catch (IOException e) {
            LOG.error("Close table failed", e);
          }
        }
        scan = null;
        get = null;
        startKey = null;
      }
      return null;
    }
  }

  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  private static final long DEFAULT_INTERVAL = 60000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

  public static final TableName DEFAULT_WRITE_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  private long interval = 0;
  private Sink sink = null;

  /**
   * True if we are to run in 'regionServer' mode.
   */
  private boolean regionServerMode = false;

  /**
   * True if we are to run in zookeeper 'mode'.
   */
  private boolean zookeeperMode = false;

  /**
   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
   * we aggregate the time to fetch each region and it needs to be less than this value, else we
   * log an ERROR.
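   * For example, <code>-readTableTimeouts TableA=1500,TableB=6000</code> (placeholder table names)
   * populates this map with a 1500ms budget for TableA and a 6000ms budget for TableB.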
   */
  private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();

  public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS =
    "hbase.canary.regionserver_all_regions";

  public static final String HBASE_CANARY_REGION_WRITE_SNIFFING =
    "hbase.canary.region.write.sniffing";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT =
    "hbase.canary.region.write.table.timeout";
  public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME =
    "hbase.canary.region.write.table.name";
  public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT =
    "hbase.canary.region.read.table.timeout";

  public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES =
    "hbase.canary.zookeeper.permitted.failures";

  public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex";
  public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout";
  public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error";

  private ExecutorService executor; // threads to retrieve data from regionservers

  public CanaryTool() {
    this(new ScheduledThreadPoolExecutor(1));
  }

  public CanaryTool(ExecutorService executor) {
    this(executor, null);
  }

  @InterfaceAudience.Private
  CanaryTool(ExecutorService executor, Sink sink) {
    this.executor = executor;
    this.sink = sink;
  }

  CanaryTool(Configuration conf, ExecutorService executor) {
    this(conf, executor, null);
  }

  CanaryTool(Configuration conf, ExecutorService executor, Sink sink) {
    this(executor, sink);
    setConf(conf);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    if (conf == null) {
      conf = HBaseConfiguration.create();
    }
    this.conf = conf;
  }

  private int parseArgs(String[] args) {
    int index = -1;
    long permittedFailures = 0;
    boolean regionServerAllRegions = false, writeSniffing = false;
    String readTableTimeoutsStr = null;
    // Process command line args
    for (int i = 0; i < args.length; i++) {
      String cmd = args[i];
      if (cmd.startsWith("-")) {
        if (index >= 0) {
          // command line args must be in the form: [opts] [table 1 [table 2 ...]]
          System.err.println("Invalid command line options");
          printUsageAndExit();
        }
        if (cmd.equals("-help") || cmd.equals("-h")) {
          // user asked for help, print the help and quit.
          printUsageAndExit();
        } else if (cmd.equals("-daemon") && interval == 0) {
          // user asked for daemon mode, set a default interval between checks
          interval = DEFAULT_INTERVAL;
        } else if (cmd.equals("-interval")) {
          // user has specified an interval for canary breaths (-interval N)
          i++;

          if (i == args.length) {
            System.err.println("-interval takes a numeric seconds value argument.");
            printUsageAndExit();
          }
          try {
            interval = Long.parseLong(args[i]) * 1000;
          } catch (NumberFormatException e) {
            System.err.println("-interval needs a numeric value argument.");
            printUsageAndExit();
          }
        } else if (cmd.equals("-zookeeper")) {
          this.zookeeperMode = true;
        } else if (cmd.equals("-regionserver")) {
          this.regionServerMode = true;
        } else if (cmd.equals("-allRegions")) {
          conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true);
          regionServerAllRegions = true;
        } else if (cmd.equals("-writeSniffing")) {
          writeSniffing = true;
          conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true);
        } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) {
          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
        } else if (cmd.equals("-e")) {
          conf.setBoolean(HBASE_CANARY_USE_REGEX, true);
        } else if (cmd.equals("-t")) {
          i++;

          if (i == args.length) {
            System.err.println("-t takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          long timeout = 0;
          try {
            timeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-t takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_TIMEOUT, timeout);
        } else if (cmd.equals("-writeTableTimeout")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          long configuredWriteTableTimeout = 0;
          try {
            configuredWriteTableTimeout = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-writeTableTimeout takes a numeric milliseconds value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout);
        } else if (cmd.equals("-writeTable")) {
          i++;

          if (i == args.length) {
            System.err.println("-writeTable takes a string tablename value argument.");
            printUsageAndExit();
          }
          conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]);
        } else if (cmd.equals("-f")) {
          i++;
          if (i == args.length) {
            System.err.println("-f needs a boolean value argument (true|false).");
            printUsageAndExit();
          }

          conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i]));
        } else if (cmd.equals("-readTableTimeouts")) {
          i++;
          if (i == args.length) {
            System.err.println("-readTableTimeouts needs a comma-separated list of read "
              + "millisecond timeouts per table (without spaces).");
            printUsageAndExit();
          }
          readTableTimeoutsStr = args[i];
          conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr);
        } else if (cmd.equals("-permittedZookeeperFailures")) {
          i++;

          if (i == args.length) {
            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
            printUsageAndExit();
          }
          try {
            permittedFailures = Long.parseLong(args[i]);
          } catch (NumberFormatException e) {
            System.err.println("-permittedZookeeperFailures needs a numeric value argument.");
            printUsageAndExit();
          }
          conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures);
        } else {
          // no options match
          System.err.println(cmd + " option is invalid.");
          printUsageAndExit();
        }
      } else if (index < 0) {
        // keep track of first table name specified by the user
        index = i;
      }
    }
    if (regionServerAllRegions && !this.regionServerMode) {
      System.err.println("-allRegions can only be specified in regionserver mode.");
      printUsageAndExit();
    }
    if (this.zookeeperMode) {
      if (this.regionServerMode || regionServerAllRegions || writeSniffing) {
        System.err.println("-zookeeper is exclusive and cannot be combined with other modes.");
        printUsageAndExit();
      }
    }
    if (permittedFailures != 0 && !this.zookeeperMode) {
      System.err.println("-permittedZookeeperFailures requires -zookeeper mode.");
      printUsageAndExit();
    }
    if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) {
      System.err.println("-readTableTimeouts can only be configured in region mode.");
      printUsageAndExit();
    }
    return index;
  }

  @Override
  public int run(String[] args) throws Exception {
    int index = parseArgs(args);
    String[] monitorTargets = null;

    if (index >= 0) {
      int length = args.length - index;
      monitorTargets = new String[length];
      System.arraycopy(args, index, monitorTargets, 0, length);
    }
    if (interval > 0) {
      // Only show the web page in daemon mode
      putUpWebUI();
    }
    if (zookeeperMode) {
      return checkZooKeeper();
    } else if (regionServerMode) {
      return checkRegionServers(monitorTargets);
    } else {
      return checkRegions(monitorTargets);
    }
  }

  private int runMonitor(String[] monitorTargets) throws Exception {
    ChoreService choreService = null;

    // Launches chore for refreshing kerberos credentials if security is enabled.
    // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
    // for more details.
    final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
    if (authChore != null) {
      choreService = new ChoreService("CANARY_TOOL");
      choreService.scheduleChore(authChore);
    }

    // Prepare the monitor
    Monitor monitor = null;
    Thread monitorThread;
    long startTime = 0;
    long currentTimeLength = 0;
    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
    long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT);
    // Get a connection to use below.
    try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
      do {
        // Do monitor !!
        try {
          monitor = this.newMonitor(connection, monitorTargets);
          startTime = EnvironmentEdgeManager.currentTime();
          monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime);
          monitorThread.start();
          while (!monitor.isDone()) {
            // wait for 1 sec
            Thread.sleep(1000);
            // exit if any error occurs
            if (failOnError && monitor.hasError()) {
              monitorThread.interrupt();
              if (monitor.initialized) {
                return monitor.errorCode;
              } else {
                return INIT_ERROR_EXIT_CODE;
              }
            }
            currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime;
            if (currentTimeLength > timeout) {
              LOG.error("The monitor has been running for " + currentTimeLength
                + "ms, exceeding the configured timeout of " + timeout + "ms; aborting this run.");
              if (monitor.initialized) {
                return TIMEOUT_ERROR_EXIT_CODE;
              } else {
                return INIT_ERROR_EXIT_CODE;
              }
            }
          }

          if (failOnError && monitor.finalCheckForErrors()) {
            monitorThread.interrupt();
            return monitor.errorCode;
          }
        } finally {
          if (monitor != null) {
            monitor.close();
          }
        }

        Thread.sleep(interval);
      } while (interval > 0);
    } // try-with-resources close

    if (choreService != null) {
      choreService.shutdown();
    }
    return monitor.errorCode;
  }

  @Override
  public Map<String, String> getReadFailures() {
    return sink.getReadFailures();
  }

  @Override
  public Map<String, String> getWriteFailures() {
    return sink.getWriteFailures();
  }

  private void printUsageAndExit() {
    System.err.println(
      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2>]...] | [<REGIONSERVER1> [<REGIONSERVER2>]...]");
    System.err.println("Where [OPTIONS] are:");
    System.err.println(" -h,-help show this help and exit.");
    System.err.println(
      " -regionserver set 'regionserver mode'; gets row from random region on " + "server");
    System.err.println(
      " -allRegions get from ALL regions when 'regionserver mode', not just " + "random one.");
    System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on "
      + "each ensemble member");
    System.err.println(" -daemon continuous check at defined intervals.");
    System.err.println(" -interval <N> interval between checks in seconds");
    System.err
      .println(" -e consider table/regionserver argument as regular " + "expression");
    System.err.println(" -f <B> exit on first error; default=true");
    System.err.println(" -failureAsError treat read/write failure as error");
    System.err.println(" -t <N> timeout for canary-test run; default=600000ms");
    System.err.println(" -writeSniffing enable write sniffing");
    System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary");
    System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms");
    System.err.println(
      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
    System.err
      .println(" comma-separated list of table read timeouts " + "(no spaces);");
    System.err.println(" logs 'ERROR' if the read takes longer; default=600000ms");
    System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures attempting to ");
    System.err.println(" connect to individual zookeeper nodes in ensemble");
    System.err.println("");
    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
      + "raw scan; default=false");
    System.err.println(
      " -Dhbase.canary.info.port=PORT_NUMBER Set for a Canary UI; " + "default=-1 (None)");
    System.err.println("");
    System.err.println(
      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
    System.err.println("To sniff/probe all regions, pass no arguments.");
    System.err.println("To sniff/probe all regions of a table, pass tablename.");
    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
    System.exit(USAGE_EXIT_CODE);
  }

  Sink getSink(Configuration configuration, Class clazz) {
    // In test context, this.sink might be set. Use it if non-null.
    return this.sink != null
      ? this.sink
      : (Sink) ReflectionUtils
        .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class));
  }

  /**
   * Canary region mode-specific data structure which stores information about each region to be
   * scanned
   */
  public static class RegionTaskResult {
    private RegionInfo region;
    private TableName tableName;
    private ServerName serverName;
    private ColumnFamilyDescriptor column;
    private AtomicLong readLatency = null;
    private AtomicLong writeLatency = null;
    private boolean readSuccess = false;
    private boolean writeSuccess = false;

    public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName,
      ColumnFamilyDescriptor column) {
      this.region = region;
      this.tableName = tableName;
      this.serverName = serverName;
      this.column = column;
    }

    public RegionInfo getRegionInfo() {
      return this.region;
    }

    public String getRegionNameAsString() {
      return this.region.getRegionNameAsString();
    }

    public TableName getTableName() {
      return this.tableName;
    }

    public String getTableNameAsString() {
      return this.tableName.getNameAsString();
    }

    public ServerName getServerName() {
      return this.serverName;
    }

    public String getServerNameAsString() {
      return this.serverName.getServerName();
    }

    public ColumnFamilyDescriptor getColumnFamily() {
      return this.column;
    }

    public String getColumnFamilyNameAsString() {
      return this.column.getNameAsString();
    }

    public long getReadLatency() {
      if (this.readLatency == null) {
        return -1;
      }
      return this.readLatency.get();
    }

    public void setReadLatency(long readLatency) {
      if (this.readLatency != null) {
        this.readLatency.set(readLatency);
      } else {
        this.readLatency = new AtomicLong(readLatency);
      }
    }

    public long getWriteLatency() {
      if (this.writeLatency == null) {
        return -1;
      }
      return this.writeLatency.get();
    }

    public void setWriteLatency(long writeLatency) {
      if (this.writeLatency != null) {
        this.writeLatency.set(writeLatency);
      } else {
        this.writeLatency = new AtomicLong(writeLatency);
      }
    }

    public boolean isReadSuccess() {
      return this.readSuccess;
    }

    public void setReadSuccess() {
      this.readSuccess = true;
    }

    public boolean isWriteSuccess() {
      return this.writeSuccess;
    }

    public void setWriteSuccess() {
      this.writeSuccess = true;
    }
  }

  /**
   * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a
   * RegionMonitor.
   * @return a Monitor instance
   */
  private Monitor newMonitor(final Connection connection, String[] monitorTargets) {
    Monitor monitor;
    boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false);
    boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false);
    boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true);
    int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0);
    boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false);
    String writeTableName =
      conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString());
    long configuredWriteTableTimeout =
      conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT);

    if (this.regionServerMode) {
      monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor,
        regionServerAllRegions, failOnError, permittedFailures);

    } else if (this.zookeeperMode) {
      monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor,
        failOnError, permittedFailures);
    } else {
      monitor = new RegionMonitor(connection, monitorTargets, useRegExp,
        getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor,
        writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts,
        configuredWriteTableTimeout, permittedFailures);
    }
    return monitor;
  }

  private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) {
    String[] tableTimeouts = configuredReadTableTimeoutsStr.split(",");
    for (String tT : tableTimeouts) {
      String[] nameTimeout = tT.split("=");
      if (nameTimeout.length < 2) {
        throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form "
          + "<tableName>=<read timeout> (without spaces).");
      }
      long timeoutVal;
      try {
        timeoutVal = Long.parseLong(nameTimeout[1]);
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
          "-readTableTimeouts read timeout for each table" + " must be a numeric value argument.");
      }
      configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
    }
  }

  /**
   * A Monitor super-class that can be extended by users.
   */
  public static abstract class Monitor implements Runnable, Closeable {
    protected Connection connection;
    protected Admin admin;
    /**
     * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the
     * command-line as arguments.
     */
    protected String[] targets;
    protected boolean useRegExp;
    protected boolean treatFailureAsError;
    protected boolean initialized = false;

    protected boolean done = false;
    protected int errorCode = 0;
    protected long allowedFailures = 0;
    protected Sink sink;
    protected ExecutorService executor;

    public boolean isDone() {
      return done;
    }

    public boolean hasError() {
      return errorCode != 0;
    }

    public boolean finalCheckForErrors() {
      if (errorCode != 0) {
        return true;
      }
      if (
        treatFailureAsError && (sink.getReadFailureCount() > allowedFailures
          || sink.getWriteFailureCount() > allowedFailures)
      ) {
        LOG.error("Too many failures detected, treating failure as error, failing the Canary.");
        errorCode = FAILURE_EXIT_CODE;
        return true;
      }
      return false;
    }

    @Override
    public void close() throws IOException {
      if (this.admin != null) {
        this.admin.close();
      }
    }

    protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
      ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
      if (null == connection) {
        throw new IllegalArgumentException("connection shall not be null");
      }

      this.connection = connection;
      this.targets = monitorTargets;
      this.useRegExp = useRegExp;
      this.treatFailureAsError = treatFailureAsError;
      this.sink = sink;
      this.executor = executor;
      this.allowedFailures = allowedFailures;
    }

    @Override
    public abstract void run();

    protected boolean initAdmin() {
      if (null == this.admin) {
        try {
          this.admin = this.connection.getAdmin();
        } catch (Exception e) {
          LOG.error("Initializing HBaseAdmin failed...", e);
          this.errorCode = INIT_ERROR_EXIT_CODE;
        }
      } else if (admin.isAborted()) {
        LOG.error("HBaseAdmin aborted");
        this.errorCode = INIT_ERROR_EXIT_CODE;
      }
      return !this.hasError();
    }
  }

  /**
   * A monitor for region mode.
   */
  private static class RegionMonitor extends Monitor {
    // 10 minutes
    private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
    // 1 day
    private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;

    private long lastCheckTime = -1;
    private boolean writeSniffing;
    private TableName writeTableName;
    private int writeDataTTL;
    private float regionsLowerLimit;
    private float regionsUpperLimit;
    private int checkPeriod;
    private boolean rawScanEnabled;
    private boolean readAllCF;

    /**
     * This is a timeout per table. If reading all regions of the table, aggregated, takes longer
     * than what is configured here, we log an ERROR rather than just an INFO.
     */
    private HashMap<String, Long> configuredReadTableTimeouts;

    private long configuredWriteTableTimeout;

    public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
      boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
      long configuredWriteTableTimeout, long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration conf = connection.getConfiguration();
      this.writeSniffing = writeSniffing;
      this.writeTableName = writeTableName;
      this.writeDataTTL =
        conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
      this.regionsLowerLimit =
        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
      this.regionsUpperLimit =
        conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
      this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
        DEFAULT_WRITE_TABLE_CHECK_PERIOD);
      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
      this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
      this.configuredWriteTableTimeout = configuredWriteTableTimeout;
      this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true);
    }

    private RegionStdOutSink getSink() {
      if (!(sink instanceof RegionStdOutSink)) {
        throw new RuntimeException("Can only write to Region sink");
      }
      return ((RegionStdOutSink) sink);
    }

    @Override
    public void run() {
      if (this.initAdmin()) {
        try {
          List<Future<Void>> taskFutures = new LinkedList<>();
          RegionStdOutSink regionSink = this.getSink();
          regionSink.resetFailuresCountDetails();
          if (this.targets != null && this.targets.length > 0) {
            String[] tables = generateMonitorTables(this.targets);
            // Check to see that each table name passed in the -readTableTimeouts argument is also
            // passed as a monitor target.
            if (
              !new HashSet<>(Arrays.asList(tables))
                .containsAll(this.configuredReadTableTimeouts.keySet())
            ) {
              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
                + "passed via command line.");
              this.errorCode = USAGE_EXIT_CODE;
              return;
            }
            this.initialized = true;
            for (String table : tables) {
              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ,
                this.rawScanEnabled, readLatency, readAllCF));
            }
          } else {
            taskFutures.addAll(sniff(TaskType.READ, regionSink));
          }

          if (writeSniffing) {
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            regionSink.initializeWriteLatency();
            LongAdder writeTableLatency = regionSink.getWriteLatency();
            taskFutures
              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
          }

          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
            String tableName = entry.getKey();
            if (actualReadTableLatency.containsKey(tableName)) {
              Long actual = actualReadTableLatency.get(tableName).longValue();
              Long configured = entry.getValue();
              if (actual > configured) {
                LOG.error("Read operation for {} took {}ms, exceeding the configured read "
                  + "timeout of {}ms.", tableName, actual, configured);
              } else {
                LOG.info("Read operation for {} took {}ms (configured read timeout {}ms).",
                  tableName, actual, configured);
              }
            } else {
              LOG.error("Read operation for {} failed!", tableName);
            }
          }
          if (this.writeSniffing) {
            String writeTableStringName = this.writeTableName.getNameAsString();
            long actualWriteLatency = regionSink.getWriteLatency().longValue();
            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
            // Check that the writeTable write operation latency does not exceed the configured
            // timeout.
            if (actualWriteLatency > this.configuredWriteTableTimeout) {
              LOG.error("Write operation for {} exceeded the configured write timeout.",
                writeTableStringName);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        } finally {
          this.done = true;
        }
      }
      this.done = true;
    }

    /**
     * @return List of tables to use in test.
     */
    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
      String[] returnTables = null;

      if (this.useRegExp) {
        Pattern pattern = null;
        List<TableDescriptor> tds = null;
        Set<String> tmpTables = new TreeSet<>();
        try {
          LOG.debug("Reading list of tables");
          tds = this.admin.listTableDescriptors(pattern);
          if (tds == null) {
            tds = Collections.emptyList();
          }
          for (String monitorTarget : monitorTargets) {
            pattern = Pattern.compile(monitorTarget);
            for (TableDescriptor td : tds) {
              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
                tmpTables.add(td.getTableName().getNameAsString());
              }
            }
          }
        } catch (IOException e) {
          LOG.error("Communicate with admin failed", e);
          throw e;
        }

        if (tmpTables.size() > 0) {
          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
        } else {
          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
          LOG.error(msg);
          this.errorCode = INIT_ERROR_EXIT_CODE;
          throw new TableNotFoundException(msg);
        }
      } else {
        returnTables = monitorTargets;
      }

      return returnTables;
    }

    /*
     * Canary entry point to monitor all the tables.
     */
    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
      throws Exception {
      LOG.debug("Reading list of tables");
      List<Future<Void>> taskFutures = new LinkedList<>();
      for (TableDescriptor td : admin.listTableDescriptors()) {
        if (
          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
            && (!td.getTableName().equals(writeTableName))
        ) {
          LongAdder readLatency =
            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
            this.rawScanEnabled, readLatency, readAllCF));
        }
      }
      return taskFutures;
    }

    private void checkWriteTableDistribution() throws IOException {
      if (!admin.tableExists(writeTableName)) {
        int numberOfServers = admin.getRegionServers().size();
        if (numberOfServers == 0) {
          throw new IllegalStateException("No live regionservers");
        }
        createWriteTable(numberOfServers);
      }

      if (!admin.isTableEnabled(writeTableName)) {
        admin.enableTable(writeTableName);
      }

      ClusterMetrics status =
        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
      int numberOfServers = status.getServersName().size();
      if (status.getServersName().contains(status.getMasterName())) {
        numberOfServers -= 1;
      }

      List<Pair<RegionInfo, ServerName>> pairs =
        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
      int numberOfRegions = pairs.size();
      if (
        numberOfRegions < numberOfServers * regionsLowerLimit
          || numberOfRegions > numberOfServers * regionsUpperLimit
      ) {
        admin.disableTable(writeTableName);
        admin.deleteTable(writeTableName);
        createWriteTable(numberOfServers);
      }
      HashSet<ServerName> serverSet = new HashSet<>();
      for (Pair<RegionInfo, ServerName> pair : pairs) {
        serverSet.add(pair.getSecond());
      }
      int numberOfCoveredServers = serverSet.size();
      if (numberOfCoveredServers < numberOfServers) {
        admin.balance();
      }
    }

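    // Creates the canary write table pre-split so that, on average, each live regionserver is
    // expected to host at least regionsLowerLimit regions of it.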
  /**
   * Canary entry point for a specified table.
   * @throws Exception if the table cannot be sniffed
   */
  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
    boolean readAllCF) throws Exception {
    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
        executor, taskType, rawScanEnabled, readLatency, readAllCF);
    } else {
      LOG.warn("Table {} is not enabled", tableName);
    }
    return new LinkedList<>();
  }

  /*
   * Loops over regions of this table, and outputs information about the state.
   */
  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
    LongAdder rwLatency, boolean readAllCF) throws Exception {
    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
    try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) {
      List<RegionTask> tasks = new ArrayList<>();
      try (RegionLocator regionLocator =
        admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
        for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
          if (location == null) {
            LOG.warn("Null location");
            continue;
          }
          ServerName rs = location.getServerName();
          RegionInfo region = location.getRegion();
          tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
            taskType, rawScanEnabled, rwLatency, readAllCF));
          Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
          regionMap.put(region.getRegionNameAsString(), new ArrayList<>());
        }
        return executor.invokeAll(tasks);
      }
    } catch (TableNotFoundException e) {
      return Collections.emptyList();
    }
  }

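  /*
   * A minimal sketch of the fan-out pattern the sniff helpers above rely on: tasks are submitted
   * with invokeAll, and a failing task only surfaces when its Future is resolved, so one bad
   * region does not prevent the remaining regions from being probed. Names below are
   * illustrative, and invokeAll itself may throw InterruptedException, which a caller must handle.
   *
   *   ExecutorService pool = new ScheduledThreadPoolExecutor(2);
   *   List<Callable<Void>> demoTasks = new ArrayList<>();   // stands in for per-region tasks
   *   demoTasks.add(() -> null);
   *   for (Future<Void> f : pool.invokeAll(demoTasks)) {
   *     try {
   *       f.get();
   *     } catch (ExecutionException e) {
   *       // record the single failed probe; remaining futures are still checked
   *     }
   *   }
   *   pool.shutdown();
   */
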
  /**
   * A monitor for ZooKeeper mode.
   */
  private static class ZookeeperMonitor extends Monitor {
    private List<String> hosts;
    private final String znode;
    private final int timeout;

    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration configuration = connection.getConfiguration();
      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
      timeout =
        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
      ConnectStringParser parser =
        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
      hosts = Lists.newArrayList();
      for (InetSocketAddress server : parser.getServerAddresses()) {
        hosts.add(inetSocketAddress2String(server));
      }
      if (allowedFailures > (hosts.size() - 1) / 2) {
        LOG.warn(
          "Allowing {} failed ZooKeeper nodes is unexpected for an ensemble of {}: quorum "
            + "would already be lost at that point. Confirm the allowable number of failures.",
          allowedFailures, hosts.size());
      }
    }

    @Override
    public void run() {
      List<ZookeeperTask> tasks = Lists.newArrayList();
      ZookeeperStdOutSink zkSink = null;
      try {
        zkSink = this.getSink();
      } catch (RuntimeException e) {
        LOG.error("Run ZookeeperMonitor failed!", e);
        this.errorCode = ERROR_EXIT_CODE;
      }
      this.initialized = true;
      for (final String host : hosts) {
        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff zookeeper failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        Thread.currentThread().interrupt();
        LOG.error("Sniff zookeeper interrupted!", e);
      }
      this.done = true;
    }

    private ZookeeperStdOutSink getSink() {
      if (!(sink instanceof ZookeeperStdOutSink)) {
        throw new RuntimeException("Can only write to zookeeper sink");
      }
      return ((ZookeeperStdOutSink) sink);
    }
  }

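  /*
   * For reference, a minimal znode probe using the same settings ZookeeperMonitor derives above
   * (quorum host, znode parent, session timeout). This is an assumed illustration of what a
   * per-host probe involves, not a copy of the ZookeeperTask implementation; the host string is
   * hypothetical, and the ZooKeeper constructor and close() also throw checked exceptions a
   * caller must handle.
   *
   *   ZooKeeper zk = new ZooKeeper("zk-host.example.com:2181", timeout, EmptyWatcher.instance);
   *   try {
   *     Stat stat = zk.exists("/hbase", false);   // the latency of this round trip is what gets reported
   *   } catch (KeeperException | InterruptedException e) {
   *     // counted as a failure for that ZooKeeper host
   *   } finally {
   *     zk.close();
   *   }
   */
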
  /**
   * A monitor for regionserver mode.
   */
  private static class RegionServerMonitor extends Monitor {
    private boolean allRegions;

    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError,
      long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      this.allRegions = allRegions;
    }

    private RegionServerStdOutSink getSink() {
      if (!(sink instanceof RegionServerStdOutSink)) {
        throw new RuntimeException("Can only write to regionserver sink");
      }
      return ((RegionServerStdOutSink) sink);
    }

    @Override
    public void run() {
      if (this.initAdmin() && this.checkNoTableNames()) {
        RegionServerStdOutSink regionServerSink = null;
        try {
          regionServerSink = this.getSink();
        } catch (RuntimeException e) {
          LOG.error("Run RegionServerMonitor failed!", e);
          this.errorCode = ERROR_EXIT_CODE;
        }
        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
        this.initialized = true;
        this.monitorRegionServers(rsAndRMap, regionServerSink);
      }
      this.done = true;
    }

    private boolean checkNoTableNames() {
      List<String> foundTableNames = new ArrayList<>();
      TableName[] tableNames = null;
      LOG.debug("Reading list of tables");
      try {
        tableNames = this.admin.listTableNames();
      } catch (IOException e) {
        LOG.error("Get listTableNames failed", e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
        return false;
      }

      if (this.targets == null || this.targets.length == 0) {
        return true;
      }

      for (String target : this.targets) {
        for (TableName tableName : tableNames) {
          if (target.equals(tableName.getNameAsString())) {
            foundTableNames.add(target);
          }
        }
      }

      if (foundTableNames.size() > 0) {
        System.err.println("Cannot pass a table name when using the -regionserver "
          + "option; table names: " + foundTableNames.toString());
        this.errorCode = USAGE_EXIT_CODE;
      }
      return foundTableNames.isEmpty();
    }

    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
      RegionServerStdOutSink regionServerSink) {
      List<RegionServerTask> tasks = new ArrayList<>();
      Map<String, AtomicLong> successMap = new HashMap<>();
      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
        String serverName = entry.getKey();
        AtomicLong successes = new AtomicLong(0);
        successMap.put(serverName, successes);
        if (entry.getValue().isEmpty()) {
          LOG.error("Regionserver not serving any regions - {}", serverName);
        } else if (this.allRegions) {
          for (RegionInfo region : entry.getValue()) {
            tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
              successes));
          }
        } else {
          // randomly select one region if the -allRegions flag is not set
          RegionInfo region =
            entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size()));
          tasks.add(
            new RegionServerTask(this.connection, serverName, region, regionServerSink, successes));
        }
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff regionserver failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
        if (this.allRegions) {
          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
            String serverName = entry.getKey();
            LOG.info("Successfully read {} regions out of {} on regionserver {}",
              successMap.get(serverName), entry.getValue().size(), serverName);
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        Thread.currentThread().interrupt();
        LOG.error("Sniff regionserver interrupted!", e);
      }
    }

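    // Illustration of the selection above (counts assumed): with three regionservers and
    // allRegions == false, exactly three RegionServerTask instances are created, each probing one
    // randomly chosen region hosted by its server; with -allRegions every hosted region is probed
    // and per-server success counts are summarized once the futures complete. The filtering
    // methods below decide which servers take part at all.
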
    private Map<String, List<RegionInfo>> filterRegionServerByName() {
      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
      return regionServerAndRegionsMap;
    }

    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
      try {
        LOG.debug("Reading list of tables and locations");
        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
        List<RegionInfo> regions = null;
        for (TableDescriptor tableDesc : tableDescs) {
          try (RegionLocator regionLocator =
            this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
              if (location == null) {
                LOG.warn("Null location");
                continue;
              }
              ServerName rs = location.getServerName();
              String rsName = rs.getHostname();
              RegionInfo r = location.getRegion();
              if (rsAndRMap.containsKey(rsName)) {
                regions = rsAndRMap.get(rsName);
              } else {
                regions = new ArrayList<>();
                rsAndRMap.put(rsName, regions);
              }
              regions.add(r);
            }
          }
        }

        // get any live regionservers not serving any regions
        for (ServerName rs : this.admin.getRegionServers()) {
          String rsName = rs.getHostname();
          if (!rsAndRMap.containsKey(rsName)) {
            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
          }
        }
      } catch (IOException e) {
        LOG.error("Get HTables info failed", e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
      }
      return rsAndRMap;
    }

    private Map<String, List<RegionInfo>>
      doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) {

      Map<String, List<RegionInfo>> filteredRsAndRMap = null;

      if (this.targets != null && this.targets.length > 0) {
        filteredRsAndRMap = new HashMap<>();
        Pattern pattern = null;
        Matcher matcher = null;
        boolean regExpFound = false;
        for (String rsName : this.targets) {
          if (this.useRegExp) {
            regExpFound = false;
            pattern = Pattern.compile(rsName);
            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
              matcher = pattern.matcher(entry.getKey());
              if (matcher.matches()) {
                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
                regExpFound = true;
              }
            }
            if (!regExpFound) {
              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
            }
          } else {
            if (fullRsAndRMap.containsKey(rsName)) {
              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
            } else {
              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
            }
          }
        }
      } else {
        filteredRsAndRMap = fullRsAndRMap;
      }
      return filteredRsAndRMap;
    }
  }

  public static void main(String[] args) throws Exception {
    final Configuration conf = HBaseConfiguration.create();

    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
    LOG.info("Execution thread count={}", numThreads);

    int exitCode;
    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
    try {
      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
    } finally {
      executor.shutdown();
    }
    System.exit(exitCode);
  }
}
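// A minimal sketch of driving the canary programmatically, mirroring main() above. The
// "-regionserver" argument is one of the modes referenced in this class; the thread count of 4
// is an assumed value, not a default.
//
//   Configuration conf = HBaseConfiguration.create();
//   ExecutorService executor = new ScheduledThreadPoolExecutor(4);
//   try {
//     int exitCode = ToolRunner.run(conf, new CanaryTool(executor), new String[] { "-regionserver" });
//   } finally {
//     executor.shutdown();
//   }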