001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; 021import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; 022import static org.apache.hadoop.hbase.util.Addressing.inetSocketAddress2String; 023 024import java.io.Closeable; 025import java.io.IOException; 026import java.net.BindException; 027import java.net.InetSocketAddress; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.EnumSet; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.Callable; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.ScheduledThreadPoolExecutor; 046import java.util.concurrent.ThreadLocalRandom; 047import java.util.concurrent.atomic.AtomicLong; 048import java.util.concurrent.atomic.LongAdder; 049import java.util.regex.Matcher; 050import java.util.regex.Pattern; 051import org.apache.commons.lang3.time.StopWatch; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.AuthUtil; 054import org.apache.hadoop.hbase.ChoreService; 055import org.apache.hadoop.hbase.ClusterMetrics; 056import org.apache.hadoop.hbase.ClusterMetrics.Option; 057import org.apache.hadoop.hbase.DoNotRetryIOException; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HBaseInterfaceAudience; 060import org.apache.hadoop.hbase.HConstants; 061import org.apache.hadoop.hbase.HRegionLocation; 062import org.apache.hadoop.hbase.MetaTableAccessor; 063import org.apache.hadoop.hbase.NamespaceDescriptor; 064import org.apache.hadoop.hbase.ScheduledChore; 065import org.apache.hadoop.hbase.ServerName; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotEnabledException; 068import org.apache.hadoop.hbase.TableNotFoundException; 069import org.apache.hadoop.hbase.client.Admin; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 071import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 072import org.apache.hadoop.hbase.client.Connection; 073import org.apache.hadoop.hbase.client.ConnectionFactory; 074import org.apache.hadoop.hbase.client.Get; 075import org.apache.hadoop.hbase.client.Put; 076import org.apache.hadoop.hbase.client.RegionInfo; 077import 
org.apache.hadoop.hbase.client.RegionLocator; 078import org.apache.hadoop.hbase.client.ResultScanner; 079import org.apache.hadoop.hbase.client.Scan; 080import org.apache.hadoop.hbase.client.Table; 081import org.apache.hadoop.hbase.client.TableDescriptor; 082import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 083import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 084import org.apache.hadoop.hbase.http.InfoServer; 085import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType; 086import org.apache.hadoop.hbase.util.Bytes; 087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 088import org.apache.hadoop.hbase.util.Pair; 089import org.apache.hadoop.hbase.util.ReflectionUtils; 090import org.apache.hadoop.hbase.util.RegionSplitter; 091import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 092import org.apache.hadoop.hbase.zookeeper.ZKConfig; 093import org.apache.hadoop.util.Tool; 094import org.apache.hadoop.util.ToolRunner; 095import org.apache.yetus.audience.InterfaceAudience; 096import org.apache.zookeeper.KeeperException; 097import org.apache.zookeeper.ZooKeeper; 098import org.apache.zookeeper.client.ConnectStringParser; 099import org.apache.zookeeper.data.Stat; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 104 105/** 106 * HBase Canary Tool for "canary monitoring" of a running HBase cluster. There are three modes: 107 * <ol> 108 * <li>region mode (Default): For each region, try to get one row per column family outputting 109 * information on failure (ERROR) or else the latency.</li> 110 * <li>regionserver mode: For each regionserver try to get one row from one table selected randomly 111 * outputting information on failure (ERROR) or else the latency.</li> 112 * <li>zookeeper mode: for each zookeeper instance, selects a znode outputting information on 113 * failure (ERROR) or else the latency.</li> 114 * </ol> 115 */ 116@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 117public class CanaryTool implements Tool, Canary { 118 public static final String HBASE_CANARY_INFO_PORT = "hbase.canary.info.port"; 119 public static final String HBASE_CANARY_INFO_BINDADDRESS = "hbase.canary.info.bindAddress"; 120 121 private void putUpWebUI() throws IOException { 122 int port = conf.getInt(HBASE_CANARY_INFO_PORT, -1); 123 // -1 is for disabling info server 124 if (port < 0) { 125 return; 126 } 127 if (zookeeperMode) { 128 LOG.info("WebUI is not supported in Zookeeper mode"); 129 } else if (regionServerMode) { 130 LOG.info("WebUI is not supported in RegionServer mode"); 131 } else { 132 String addr = conf.get(HBASE_CANARY_INFO_BINDADDRESS, "0.0.0.0"); 133 try { 134 InfoServer infoServer = new InfoServer("canary", addr, port, false, conf); 135 infoServer.addUnprivilegedServlet("canary", "/canary-status", CanaryStatusServlet.class); 136 infoServer.setAttribute("sink", getSink(conf, RegionStdOutSink.class)); 137 infoServer.start(); 138 LOG.info("Bind Canary http info server to {}:{} ", addr, port); 139 } catch (BindException e) { 140 LOG.warn("Failed binding Canary http info server to {}:{}", addr, port, e); 141 } 142 } 143 } 144 145 @Override 146 public int checkRegions(String[] targets) throws Exception { 147 String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT); 148 try { 149 LOG.info("Canary tool is running in Region mode"); 150 if (configuredReadTableTimeoutsStr != null) { 151 
populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr); 152 } 153 } catch (IllegalArgumentException e) { 154 LOG.error("Constructing read table timeouts map failed ", e); 155 return USAGE_EXIT_CODE; 156 } 157 return runMonitor(targets); 158 } 159 160 @Override 161 public int checkRegionServers(String[] targets) throws Exception { 162 regionServerMode = true; 163 LOG.info("Canary tool is running in RegionServer mode"); 164 return runMonitor(targets); 165 } 166 167 @Override 168 public int checkZooKeeper() throws Exception { 169 zookeeperMode = true; 170 LOG.info("Canary tool is running in ZooKeeper mode"); 171 return runMonitor(null); 172 } 173 174 /** 175 * Sink interface used by the canary to output information 176 */ 177 public interface Sink { 178 long getReadFailureCount(); 179 180 long incReadFailureCount(); 181 182 Map<String, String> getReadFailures(); 183 184 void updateReadFailures(String regionName, String serverName); 185 186 long getWriteFailureCount(); 187 188 long incWriteFailureCount(); 189 190 Map<String, String> getWriteFailures(); 191 192 void updateWriteFailures(String regionName, String serverName); 193 194 long getReadSuccessCount(); 195 196 long incReadSuccessCount(); 197 198 long getWriteSuccessCount(); 199 200 long incWriteSuccessCount(); 201 202 void stop(); 203 204 boolean isStopped(); 205 } 206 207 /** 208 * Simple implementation of canary sink that allows plotting to a file or standard output. 209 */ 210 public static class StdOutSink implements Sink { 211 private AtomicLong readFailureCount = new AtomicLong(0), writeFailureCount = new AtomicLong(0), 212 readSuccessCount = new AtomicLong(0), writeSuccessCount = new AtomicLong(0); 213 private Map<String, String> readFailures = new ConcurrentHashMap<>(); 214 private Map<String, String> writeFailures = new ConcurrentHashMap<>(); 215 private volatile boolean stopped = false; 216 217 @Override 218 public long getReadFailureCount() { 219 return readFailureCount.get(); 220 } 221 222 @Override 223 public long incReadFailureCount() { 224 return readFailureCount.incrementAndGet(); 225 } 226 227 @Override 228 public Map<String, String> getReadFailures() { 229 return readFailures; 230 } 231 232 @Override 233 public void updateReadFailures(String regionName, String serverName) { 234 readFailures.put(regionName, serverName); 235 } 236 237 @Override 238 public long getWriteFailureCount() { 239 return writeFailureCount.get(); 240 } 241 242 @Override 243 public long incWriteFailureCount() { 244 return writeFailureCount.incrementAndGet(); 245 } 246 247 @Override 248 public Map<String, String> getWriteFailures() { 249 return writeFailures; 250 } 251 252 @Override 253 public void updateWriteFailures(String regionName, String serverName) { 254 writeFailures.put(regionName, serverName); 255 } 256 257 @Override 258 public long getReadSuccessCount() { 259 return readSuccessCount.get(); 260 } 261 262 @Override 263 public long incReadSuccessCount() { 264 return readSuccessCount.incrementAndGet(); 265 } 266 267 @Override 268 public long getWriteSuccessCount() { 269 return writeSuccessCount.get(); 270 } 271 272 @Override 273 public long incWriteSuccessCount() { 274 return writeSuccessCount.incrementAndGet(); 275 } 276 277 public void stop() { 278 stopped = true; 279 } 280 281 @Override 282 public boolean isStopped() { 283 return stopped; 284 } 285 } 286 287 /** 288 * By RegionServer, for 'regionserver' mode. 
289 */ 290 public static class RegionServerStdOutSink extends StdOutSink { 291 public void publishReadFailure(String table, String server) { 292 incReadFailureCount(); 293 LOG.error("Read from {} on {}", table, server); 294 } 295 296 public void publishReadTiming(String table, String server, long msTime) { 297 LOG.info("Read from {} on {} in {}ms", table, server, msTime); 298 } 299 } 300 301 /** 302 * Output for 'zookeeper' mode. 303 */ 304 public static class ZookeeperStdOutSink extends StdOutSink { 305 public void publishReadFailure(String znode, String server) { 306 incReadFailureCount(); 307 LOG.error("Read from {} on {}", znode, server); 308 } 309 310 public void publishReadTiming(String znode, String server, long msTime) { 311 LOG.info("Read from {} on {} in {}ms", znode, server, msTime); 312 } 313 } 314 315 /** 316 * By Region, for 'region' mode. 317 */ 318 public static class RegionStdOutSink extends StdOutSink { 319 private Map<String, LongAdder> perTableReadLatency = new HashMap<>(); 320 private LongAdder writeLatency = new LongAdder(); 321 private final ConcurrentMap<String, List<RegionTaskResult>> regionMap = 322 new ConcurrentHashMap<>(); 323 private ConcurrentMap<ServerName, LongAdder> perServerFailuresCount = new ConcurrentHashMap<>(); 324 private ConcurrentMap<String, LongAdder> perTableFailuresCount = new ConcurrentHashMap<>(); 325 326 public ConcurrentMap<ServerName, LongAdder> getPerServerFailuresCount() { 327 return perServerFailuresCount; 328 } 329 330 public ConcurrentMap<String, LongAdder> getPerTableFailuresCount() { 331 return perTableFailuresCount; 332 } 333 334 public void resetFailuresCountDetails() { 335 perServerFailuresCount.clear(); 336 perTableFailuresCount.clear(); 337 } 338 339 private void incFailuresCountDetails(ServerName serverName, RegionInfo region) { 340 if (serverName != null) { 341 perServerFailuresCount.compute(serverName, (server, count) -> { 342 if (count == null) { 343 count = new LongAdder(); 344 } 345 count.increment(); 346 return count; 347 }); 348 } 349 perTableFailuresCount.compute(region.getTable().getNameAsString(), (tableName, count) -> { 350 if (count == null) { 351 count = new LongAdder(); 352 } 353 count.increment(); 354 return count; 355 }); 356 } 357 358 public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { 359 LOG.error("Read from {} on serverName={} failed", region.getRegionNameAsString(), serverName, 360 e); 361 incReadFailureCount(); 362 incFailuresCountDetails(serverName, region); 363 } 364 365 public void publishReadFailure(ServerName serverName, RegionInfo region, 366 ColumnFamilyDescriptor column, Exception e) { 367 LOG.error("Read from {} on serverName={}, columnFamily={} failed", 368 region.getRegionNameAsString(), serverName, column.getNameAsString(), e); 369 incReadFailureCount(); 370 incFailuresCountDetails(serverName, region); 371 } 372 373 public void publishReadTiming(ServerName serverName, RegionInfo region, 374 ColumnFamilyDescriptor column, long msTime) { 375 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 376 rtr.setReadSuccess(); 377 rtr.setReadLatency(msTime); 378 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 379 rtrs.add(rtr); 380 // Note that read success count will be equal to total column family read successes. 
381 incReadSuccessCount(); 382 LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 383 column.getNameAsString(), msTime); 384 } 385 386 public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { 387 LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); 388 incWriteFailureCount(); 389 incFailuresCountDetails(serverName, region); 390 } 391 392 public void publishWriteFailure(ServerName serverName, RegionInfo region, 393 ColumnFamilyDescriptor column, Exception e) { 394 LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, 395 column.getNameAsString(), e); 396 incWriteFailureCount(); 397 incFailuresCountDetails(serverName, region); 398 } 399 400 public void publishWriteTiming(ServerName serverName, RegionInfo region, 401 ColumnFamilyDescriptor column, long msTime) { 402 RegionTaskResult rtr = new RegionTaskResult(region, region.getTable(), serverName, column); 403 rtr.setWriteSuccess(); 404 rtr.setWriteLatency(msTime); 405 List<RegionTaskResult> rtrs = regionMap.get(region.getRegionNameAsString()); 406 rtrs.add(rtr); 407 // Note that write success count will be equal to total column family write successes. 408 incWriteSuccessCount(); 409 LOG.info("Write to {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, 410 column.getNameAsString(), msTime); 411 } 412 413 public Map<String, LongAdder> getReadLatencyMap() { 414 return this.perTableReadLatency; 415 } 416 417 public LongAdder initializeAndGetReadLatencyForTable(String tableName) { 418 LongAdder initLatency = new LongAdder(); 419 this.perTableReadLatency.put(tableName, initLatency); 420 return initLatency; 421 } 422 423 public void initializeWriteLatency() { 424 this.writeLatency.reset(); 425 } 426 427 public LongAdder getWriteLatency() { 428 return this.writeLatency; 429 } 430 431 public ConcurrentMap<String, List<RegionTaskResult>> getRegionMap() { 432 return this.regionMap; 433 } 434 435 public int getTotalExpectedRegions() { 436 return this.regionMap.size(); 437 } 438 } 439 440 /** 441 * Run a single zookeeper Task and then exit. 442 */ 443 static class ZookeeperTask implements Callable<Void> { 444 private final Connection connection; 445 private final String host; 446 private String znode; 447 private final int timeout; 448 private ZookeeperStdOutSink sink; 449 450 public ZookeeperTask(Connection connection, String host, String znode, int timeout, 451 ZookeeperStdOutSink sink) { 452 this.connection = connection; 453 this.host = host; 454 this.znode = znode; 455 this.timeout = timeout; 456 this.sink = sink; 457 } 458 459 @Override 460 public Void call() throws Exception { 461 if (this.sink.isStopped()) { 462 return null; 463 } 464 ZooKeeper zooKeeper = null; 465 try { 466 zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); 467 Stat exists = zooKeeper.exists(znode, false); 468 StopWatch stopwatch = new StopWatch(); 469 stopwatch.start(); 470 zooKeeper.getData(znode, false, exists); 471 stopwatch.stop(); 472 sink.publishReadTiming(znode, host, stopwatch.getTime()); 473 } catch (KeeperException | InterruptedException e) { 474 sink.publishReadFailure(znode, host); 475 } finally { 476 if (zooKeeper != null) { 477 zooKeeper.close(); 478 } 479 } 480 return null; 481 } 482 } 483 484 /** 485 * Run a single Region Task and then exit. For each column family of the Region, get one row and 486 * output latency or failure. 
   */
  static class RegionTask implements Callable<Void> {
    public enum TaskType {
      READ,
      WRITE
    }

    private Connection connection;
    private RegionInfo region;
    private RegionStdOutSink sink;
    private TaskType taskType;
    private boolean rawScanEnabled;
    private ServerName serverName;
    private LongAdder readWriteLatency;
    private boolean readAllCF;

    RegionTask(Connection connection, RegionInfo region, ServerName serverName,
      RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency,
      boolean readAllCF) {
      this.connection = connection;
      this.region = region;
      this.serverName = serverName;
      this.sink = sink;
      this.taskType = taskType;
      this.rawScanEnabled = rawScanEnabled;
      this.readWriteLatency = rwLatency;
      this.readAllCF = readAllCF;
    }

    @Override
    public Void call() {
      if (this.sink.isStopped()) {
        return null;
      }
      switch (taskType) {
        case READ:
          return read();
        case WRITE:
          return write();
        default:
          return read();
      }
    }

    private Void readColumnFamily(Table table, ColumnFamilyDescriptor column) {
      byte[] startKey = null;
      Scan scan = null;
      ResultScanner rs = null;
      StopWatch stopWatch = new StopWatch();
      startKey = region.getStartKey();
      // Can't do a get on empty start row so do a Scan of first element if any instead.
      if (startKey.length > 0) {
        Get get = new Get(startKey);
        get.setCacheBlocks(false);
        get.setFilter(new FirstKeyOnlyFilter());
        get.addFamily(column.getName());
        // Converting get object to scan to enable RAW SCAN.
        // This will work for all the regions of the HBase tables except first region of the table.
        scan = new Scan(get);
        scan.setRaw(rawScanEnabled);
      } else {
        scan = new Scan();
        // The first region of an HBase table has no start key. For the region canary we only need
        // to scan a single row/cell in the region to make sure the region is accessible.
        //
        // When a table has more than one empty region at the start of the row-key space, the
        // canary would otherwise create one scan per region, walking the regions in sequence until
        // it finds the first available row.
        //
        // That could mean millions of scans depending on the table size and the number of
        // consecutive empty regions. In one test environment, a table with no data and 1100 empty
        // regions caused a single canary run to issue roughly half a million to one million scans.
        //
        // Since the first region has no start key, we instead use the region's end key as an
        // exclusive stop row to limit the scan to this single region.
        //
        // TODO: In the future we can streamline canary behaviour for all regions by scanning with
        // startRow inclusive and stopRow exclusive, instead of special-casing the first region.
        // That would simplify the implementation; for now the change is kept minimal to avoid any
        // unnecessary perf impact.
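        // Example with a hypothetical key: if this first region spans ['', 'row-0100'), the scan
        // below covers only that range because 'row-0100' becomes an exclusive stop row, and
        // setOneRowLimit()/setMaxResultSize(1) end the scan after the first cell found.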
        scan.withStopRow(region.getEndKey(), false);
        LOG.debug("rawScan {} for {}", rawScanEnabled, region.getTable());
        scan.setRaw(rawScanEnabled);
        scan.setCaching(1);
        scan.setCacheBlocks(false);
        scan.setFilter(new FirstKeyOnlyFilter());
        scan.addFamily(column.getName());
        scan.setMaxResultSize(1L);
        scan.setOneRowLimit();
      }
      LOG.debug("Reading from {} {} {} {}", region.getTable(), region.getRegionNameAsString(),
        column.getNameAsString(), Bytes.toStringBinary(startKey));
      try {
        stopWatch.start();
        rs = table.getScanner(scan);
        rs.next();
        stopWatch.stop();
        this.readWriteLatency.add(stopWatch.getTime());
        sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
      } catch (Exception e) {
        sink.publishReadFailure(serverName, region, column, e);
        sink.updateReadFailures(region.getRegionNameAsString(),
          serverName == null ? "NULL" : serverName.getHostname());
      } finally {
        if (rs != null) {
          rs.close();
        }
      }
      return null;
    }

    private ColumnFamilyDescriptor randomPickOneColumnFamily(ColumnFamilyDescriptor[] cfs) {
      int size = cfs.length;
      return cfs[ThreadLocalRandom.current().nextInt(size)];
    }

    public Void read() {
      Table table = null;
      TableDescriptor tableDesc = null;
      try {
        LOG.debug("Reading table descriptor for table {}", region.getTable());
        table = connection.getTable(region.getTable());
        tableDesc = table.getDescriptor();
      } catch (IOException e) {
        LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), region.getTable(), e);
        sink.publishReadFailure(serverName, region, e);
        if (table != null) {
          try {
            table.close();
          } catch (IOException ioe) {
            LOG.error("Close table failed", ioe);
          }
        }
        return null;
      }

      if (readAllCF) {
        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
          readColumnFamily(table, column);
        }
      } else {
        readColumnFamily(table, randomPickOneColumnFamily(tableDesc.getColumnFamilies()));
      }
      try {
        table.close();
      } catch (IOException e) {
        LOG.error("Close table failed", e);
      }
      return null;
    }

    /**
     * Check writes for the canary table.
     */
    private Void write() {
      Table table = null;
      TableDescriptor tableDesc = null;
      try {
        table = connection.getTable(region.getTable());
        tableDesc = table.getDescriptor();
        byte[] rowToCheck = region.getStartKey();
        if (rowToCheck.length == 0) {
          rowToCheck = new byte[] { 0x0 };
        }
        int writeValueSize =
          connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
        for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) {
          Put put = new Put(rowToCheck);
          byte[] value = new byte[writeValueSize];
          Bytes.random(value);
          put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
          LOG.debug("Writing to {} {} {} {}", tableDesc.getTableName(),
            region.getRegionNameAsString(), column.getNameAsString(),
            Bytes.toStringBinary(rowToCheck));
          try {
            long startTime = EnvironmentEdgeManager.currentTime();
            table.put(put);
            long time = EnvironmentEdgeManager.currentTime() - startTime;
            this.readWriteLatency.add(time);
            sink.publishWriteTiming(serverName, region, column, time);
          } catch (Exception e) {
            sink.publishWriteFailure(serverName, region, column, e);
          }
        }
        table.close();
      } catch (IOException e) {
        sink.publishWriteFailure(serverName,
region, e); 678 sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname()); 679 } 680 return null; 681 } 682 } 683 684 /** 685 * Run a single RegionServer Task and then exit. Get one row from a region on the regionserver and 686 * output latency or the failure. 687 */ 688 static class RegionServerTask implements Callable<Void> { 689 private Connection connection; 690 private String serverName; 691 private RegionInfo region; 692 private RegionServerStdOutSink sink; 693 private Boolean rawScanEnabled; 694 private AtomicLong successes; 695 696 RegionServerTask(Connection connection, String serverName, RegionInfo region, 697 RegionServerStdOutSink sink, Boolean rawScanEnabled, AtomicLong successes) { 698 this.connection = connection; 699 this.serverName = serverName; 700 this.region = region; 701 this.sink = sink; 702 this.rawScanEnabled = rawScanEnabled; 703 this.successes = successes; 704 } 705 706 @Override 707 public Void call() { 708 if (this.sink.isStopped()) { 709 return null; 710 } 711 TableName tableName = null; 712 Table table = null; 713 Get get = null; 714 byte[] startKey = null; 715 Scan scan = null; 716 StopWatch stopWatch = new StopWatch(); 717 // monitor one region on every region server 718 stopWatch.reset(); 719 try { 720 tableName = region.getTable(); 721 table = connection.getTable(tableName); 722 startKey = region.getStartKey(); 723 // Can't do a get on empty start row so do a Scan of first element if any instead. 724 LOG.debug("Reading from {} {} {} {}", serverName, region.getTable(), 725 region.getRegionNameAsString(), Bytes.toStringBinary(startKey)); 726 if (startKey.length > 0) { 727 get = new Get(startKey); 728 get.setCacheBlocks(false); 729 get.setFilter(new FirstKeyOnlyFilter()); 730 // Converting get object to scan to enable RAW SCAN. 731 // This will work for all the regions of the HBase tables except first region. 732 scan = new Scan(get); 733 734 } else { 735 scan = new Scan(); 736 // In case of first region of the HBase Table, we do not have start-key for the region. 737 // For Region Canary, we only need scan a single row/cell in the region to make sure that 738 // region is accessible. 739 // 740 // When HBase table has more than 1 empty regions at start of the row-key space, Canary 741 // will create multiple scan object to find first available row in the table by scanning 742 // all the regions in sequence until it can find first available row. 743 // 744 // Since First region of the table doesn't have any start key, We should set End Key as 745 // stop row and set inclusive=false to limit scan to first region only. 746 scan.withStopRow(region.getEndKey(), false); 747 scan.setCacheBlocks(false); 748 scan.setFilter(new FirstKeyOnlyFilter()); 749 scan.setCaching(1); 750 scan.setMaxResultSize(1L); 751 scan.setOneRowLimit(); 752 } 753 scan.setRaw(rawScanEnabled); 754 stopWatch.start(); 755 ResultScanner s = table.getScanner(scan); 756 s.next(); 757 s.close(); 758 stopWatch.stop(); 759 successes.incrementAndGet(); 760 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); 761 } catch (TableNotFoundException tnfe) { 762 LOG.error("Table may be deleted", tnfe); 763 // This is ignored because it doesn't imply that the regionserver is dead 764 } catch (TableNotEnabledException tnee) { 765 // This is considered a success since we got a response. 766 successes.incrementAndGet(); 767 LOG.debug("The targeted table was disabled. 
Assuming success.");
      } catch (DoNotRetryIOException dnrioe) {
        sink.publishReadFailure(tableName.getNameAsString(), serverName);
        LOG.error(dnrioe.toString(), dnrioe);
      } catch (IOException e) {
        sink.publishReadFailure(tableName.getNameAsString(), serverName);
        LOG.error(e.toString(), e);
      } finally {
        if (table != null) {
          try {
            table.close();
          } catch (IOException e) {
            LOG.error("Close table failed", e);
          }
        }
        scan = null;
        get = null;
        startKey = null;
      }
      return null;
    }
  }

  private static final int USAGE_EXIT_CODE = 1;
  private static final int INIT_ERROR_EXIT_CODE = 2;
  private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
  private static final int ERROR_EXIT_CODE = 4;
  private static final int FAILURE_EXIT_CODE = 5;

  private static final long DEFAULT_INTERVAL = 60000;

  private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
  private static final int MAX_THREADS_NUM = 16; // #threads to contact regions

  private static final Logger LOG = LoggerFactory.getLogger(Canary.class);

  public static final TableName DEFAULT_WRITE_TABLE_NAME =
    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");

  private static final String CANARY_TABLE_FAMILY_NAME = "Test";

  private Configuration conf = null;
  private long interval = 0;
  private Sink sink = null;

  /**
   * True if we are to run in 'regionServer' mode.
   */
  private boolean regionServerMode = false;

  /**
   * True if we are to run in 'zookeeper' mode.
   */
  private boolean zookeeperMode = false;

  /**
   * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e.
   * we aggregate the time to fetch each region and it needs to be less than this value, else we
   * log an ERROR.
826 */ 827 private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>(); 828 829 public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS = 830 "hbase.canary.regionserver_all_regions"; 831 832 public static final String HBASE_CANARY_REGION_WRITE_SNIFFING = 833 "hbase.canary.region.write.sniffing"; 834 public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT = 835 "hbase.canary.region.write.table.timeout"; 836 public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME = 837 "hbase.canary.region.write.table.name"; 838 public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT = 839 "hbase.canary.region.read.table.timeout"; 840 841 public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES = 842 "hbase.canary.zookeeper.permitted.failures"; 843 844 public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex"; 845 public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout"; 846 public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error"; 847 848 private ExecutorService executor; // threads to retrieve data from regionservers 849 850 public CanaryTool() { 851 this(new ScheduledThreadPoolExecutor(1)); 852 } 853 854 public CanaryTool(ExecutorService executor) { 855 this(executor, null); 856 } 857 858 @InterfaceAudience.Private 859 CanaryTool(ExecutorService executor, Sink sink) { 860 this.executor = executor; 861 this.sink = sink; 862 } 863 864 CanaryTool(Configuration conf, ExecutorService executor) { 865 this(conf, executor, null); 866 } 867 868 CanaryTool(Configuration conf, ExecutorService executor, Sink sink) { 869 this(executor, sink); 870 setConf(conf); 871 } 872 873 @Override 874 public Configuration getConf() { 875 return conf; 876 } 877 878 @Override 879 public void setConf(Configuration conf) { 880 if (conf == null) { 881 conf = HBaseConfiguration.create(); 882 } 883 this.conf = conf; 884 } 885 886 private int parseArgs(String[] args) { 887 int index = -1; 888 long permittedFailures = 0; 889 boolean regionServerAllRegions = false, writeSniffing = false; 890 String readTableTimeoutsStr = null; 891 // Process command line args 892 for (int i = 0; i < args.length; i++) { 893 String cmd = args[i]; 894 if (cmd.startsWith("-")) { 895 if (index >= 0) { 896 // command line args must be in the form: [opts] [table 1 [table 2 ...]] 897 System.err.println("Invalid command line options"); 898 printUsageAndExit(); 899 } 900 if (cmd.equals("-help") || cmd.equals("-h")) { 901 // user asked for help, print the help and quit. 
902 printUsageAndExit(); 903 } else if (cmd.equals("-daemon") && interval == 0) { 904 // user asked for daemon mode, set a default interval between checks 905 interval = DEFAULT_INTERVAL; 906 } else if (cmd.equals("-interval")) { 907 // user has specified an interval for canary breaths (-interval N) 908 i++; 909 910 if (i == args.length) { 911 System.err.println("-interval takes a numeric seconds value argument."); 912 printUsageAndExit(); 913 } 914 try { 915 interval = Long.parseLong(args[i]) * 1000; 916 } catch (NumberFormatException e) { 917 System.err.println("-interval needs a numeric value argument."); 918 printUsageAndExit(); 919 } 920 } else if (cmd.equals("-zookeeper")) { 921 this.zookeeperMode = true; 922 } else if (cmd.equals("-regionserver")) { 923 this.regionServerMode = true; 924 } else if (cmd.equals("-allRegions")) { 925 conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true); 926 regionServerAllRegions = true; 927 } else if (cmd.equals("-writeSniffing")) { 928 writeSniffing = true; 929 conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true); 930 } else if (cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { 931 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 932 } else if (cmd.equals("-e")) { 933 conf.setBoolean(HBASE_CANARY_USE_REGEX, true); 934 } else if (cmd.equals("-t")) { 935 i++; 936 937 if (i == args.length) { 938 System.err.println("-t takes a numeric milliseconds value argument."); 939 printUsageAndExit(); 940 } 941 long timeout = 0; 942 try { 943 timeout = Long.parseLong(args[i]); 944 } catch (NumberFormatException e) { 945 System.err.println("-t takes a numeric milliseconds value argument."); 946 printUsageAndExit(); 947 } 948 conf.setLong(HBASE_CANARY_TIMEOUT, timeout); 949 } else if (cmd.equals("-writeTableTimeout")) { 950 i++; 951 952 if (i == args.length) { 953 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 954 printUsageAndExit(); 955 } 956 long configuredWriteTableTimeout = 0; 957 try { 958 configuredWriteTableTimeout = Long.parseLong(args[i]); 959 } catch (NumberFormatException e) { 960 System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); 961 printUsageAndExit(); 962 } 963 conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout); 964 } else if (cmd.equals("-writeTable")) { 965 i++; 966 967 if (i == args.length) { 968 System.err.println("-writeTable takes a string tablename value argument."); 969 printUsageAndExit(); 970 } 971 conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]); 972 } else if (cmd.equals("-f")) { 973 i++; 974 if (i == args.length) { 975 System.err.println("-f needs a boolean value argument (true|false)."); 976 printUsageAndExit(); 977 } 978 979 conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i])); 980 } else if (cmd.equals("-readTableTimeouts")) { 981 i++; 982 if (i == args.length) { 983 System.err.println("-readTableTimeouts needs a comma-separated list of read " 984 + "millisecond timeouts per table (without spaces)."); 985 printUsageAndExit(); 986 } 987 readTableTimeoutsStr = args[i]; 988 conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr); 989 } else if (cmd.equals("-permittedZookeeperFailures")) { 990 i++; 991 992 if (i == args.length) { 993 System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 994 printUsageAndExit(); 995 } 996 try { 997 permittedFailures = Long.parseLong(args[i]); 998 } catch (NumberFormatException e) { 999 
System.err.println("-permittedZookeeperFailures needs a numeric value argument."); 1000 printUsageAndExit(); 1001 } 1002 conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures); 1003 } else { 1004 // no options match 1005 System.err.println(cmd + " options is invalid."); 1006 printUsageAndExit(); 1007 } 1008 } else if (index < 0) { 1009 // keep track of first table name specified by the user 1010 index = i; 1011 } 1012 } 1013 if (regionServerAllRegions && !this.regionServerMode) { 1014 System.err.println("-allRegions can only be specified in regionserver mode."); 1015 printUsageAndExit(); 1016 } 1017 if (this.zookeeperMode) { 1018 if (this.regionServerMode || regionServerAllRegions || writeSniffing) { 1019 System.err.println("-zookeeper is exclusive and cannot be combined with " + "other modes."); 1020 printUsageAndExit(); 1021 } 1022 } 1023 if (permittedFailures != 0 && !this.zookeeperMode) { 1024 System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); 1025 printUsageAndExit(); 1026 } 1027 if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) { 1028 System.err.println("-readTableTimeouts can only be configured in region mode."); 1029 printUsageAndExit(); 1030 } 1031 return index; 1032 } 1033 1034 @Override 1035 public int run(String[] args) throws Exception { 1036 int index = parseArgs(args); 1037 String[] monitorTargets = null; 1038 1039 if (index >= 0) { 1040 int length = args.length - index; 1041 monitorTargets = new String[length]; 1042 System.arraycopy(args, index, monitorTargets, 0, length); 1043 } 1044 if (interval > 0) { 1045 // Only show the web page in daemon mode 1046 putUpWebUI(); 1047 } 1048 if (zookeeperMode) { 1049 return checkZooKeeper(); 1050 } else if (regionServerMode) { 1051 return checkRegionServers(monitorTargets); 1052 } else { 1053 return checkRegions(monitorTargets); 1054 } 1055 } 1056 1057 private int runMonitor(String[] monitorTargets) throws Exception { 1058 ChoreService choreService = null; 1059 1060 // Launches chore for refreshing kerberos credentials if security is enabled. 1061 // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster 1062 // for more details. 1063 final ScheduledChore authChore = AuthUtil.getAuthChore(conf); 1064 if (authChore != null) { 1065 choreService = new ChoreService("CANARY_TOOL"); 1066 choreService.scheduleChore(authChore); 1067 } 1068 1069 // Start to prepare the stuffs 1070 Monitor monitor = null; 1071 Thread monitorThread; 1072 long startTime = 0; 1073 long currentTimeLength = 0; 1074 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1075 long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT); 1076 // Get a connection to use in below. 1077 try (Connection connection = ConnectionFactory.createConnection(this.conf)) { 1078 do { 1079 // Do monitor !! 
      try {
        monitor = this.newMonitor(connection, monitorTargets);
        startTime = EnvironmentEdgeManager.currentTime();
        monitorThread = new Thread(monitor, "CanaryMonitor-" + startTime);
        monitorThread.start();
        while (!monitor.isDone()) {
          // wait for 1 sec
          Thread.sleep(1000);
          // exit if any error occurs
          if (failOnError && monitor.hasError()) {
            monitorThread.interrupt();
            if (monitor.initialized) {
              return monitor.errorCode;
            } else {
              return INIT_ERROR_EXIT_CODE;
            }
          }
          currentTimeLength = EnvironmentEdgeManager.currentTime() - startTime;
          if (currentTimeLength > timeout) {
            LOG.error("The monitor has been running for " + currentTimeLength
              + "ms, exceeding the timeout limit of " + timeout + "ms; it will be interrupted.");
            monitorThread.interrupt();
            if (monitor.initialized) {
              return TIMEOUT_ERROR_EXIT_CODE;
            } else {
              return INIT_ERROR_EXIT_CODE;
            }
          }
        }

        if (failOnError && monitor.finalCheckForErrors()) {
          monitorThread.interrupt();
          return monitor.errorCode;
        }
      } finally {
        if (monitor != null) {
          monitor.close();
        }
      }

      Thread.sleep(interval);
    } while (interval > 0);
  } // try-with-resources close

  if (choreService != null) {
    choreService.shutdown();
  }
  return monitor.errorCode;
}

  @Override
  public Map<String, String> getReadFailures() {
    return sink.getReadFailures();
  }

  @Override
  public Map<String, String> getWriteFailures() {
    return sink.getWriteFailures();
  }

  /**
   * Return a CanaryTool.Sink object containing the detailed results of the canary run. The Sink
   * may not have been created if a Monitor thread is not yet running.
   * @return the active Sink if one exists, null otherwise.
   */
  public Sink getActiveSink() {
    return sink;
  }

  private void printUsageAndExit() {
    System.err.println(
      "Usage: canary [OPTIONS] [<TABLE1> [<TABLE2>]...] | [<REGIONSERVER1> [<REGIONSERVER2>]...]");
    System.err.println("Where [OPTIONS] are:");
    System.err.println(" -h,-help show this help and exit.");
    System.err.println(
      " -regionserver set 'regionserver mode'; gets row from random region on " + "server");
    System.err.println(
      " -allRegions get from ALL regions when 'regionserver mode', not just " + "random one.");
    System.err.println(" -zookeeper set 'zookeeper mode'; grab zookeeper.znode.parent on "
      + "each ensemble member");
    System.err.println(" -daemon continuous check at defined intervals.");
    System.err.println(" -interval <N> interval between checks in seconds");
    System.err
      .println(" -e consider table/regionserver argument as regular " + "expression");
    System.err.println(" -f <B> exit on first error; default=true");
    System.err.println(" -failureAsError treat read/write failure as error");
    System.err.println(" -t <N> timeout for canary-test run; default=600000ms");
    System.err.println(" -writeSniffing enable write sniffing");
    System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary");
    System.err.println(" -writeTableTimeout <N> timeout for writeTable; default=600000ms");
    System.err.println(
      " -readTableTimeouts <tableName>=<read timeout>," + "<tableName>=<read timeout>,...");
    System.err
      .println(" comma-separated list of table read timeouts " + "(no spaces);");
    System.err.println(" logs 'ERROR' if it takes longer. default=600000ms");
    System.err.println(" -permittedZookeeperFailures <N> Ignore first N failures attempting to ");
    System.err.println(" connect to individual zookeeper nodes in ensemble");
    System.err.println("");
    System.err.println(" -D<configProperty>=<value> to assign or override configuration params");
    System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Set to enable/disable "
      + "raw scan; default=false");
    System.err.println(
      " -Dhbase.canary.info.port=PORT_NUMBER Set for a Canary UI; " + "default=-1 (None)");
    System.err.println("");
    System.err.println(
      "Canary runs in one of three modes: region (default), regionserver, or " + "zookeeper.");
    System.err.println("To sniff/probe all regions, pass no arguments.");
    System.err.println("To sniff/probe all regions of a table, pass tablename.");
    System.err.println("To sniff/probe regionservers, pass -regionserver, etc.");
    System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation.");
    System.exit(USAGE_EXIT_CODE);
  }

  Sink getSink(Configuration configuration, Class clazz) {
    // In test context, this.sink might be set. Use it if non-null. For testing.
1195 if (this.sink == null) { 1196 this.sink = (Sink) ReflectionUtils 1197 .newInstance(configuration.getClass("hbase.canary.sink.class", clazz, Sink.class)); 1198 } 1199 return this.sink; 1200 } 1201 1202 /** 1203 * Canary region mode-specific data structure which stores information about each region to be 1204 * scanned 1205 */ 1206 public static class RegionTaskResult { 1207 private RegionInfo region; 1208 private TableName tableName; 1209 private ServerName serverName; 1210 private ColumnFamilyDescriptor column; 1211 private AtomicLong readLatency = null; 1212 private AtomicLong writeLatency = null; 1213 private boolean readSuccess = false; 1214 private boolean writeSuccess = false; 1215 1216 public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName, 1217 ColumnFamilyDescriptor column) { 1218 this.region = region; 1219 this.tableName = tableName; 1220 this.serverName = serverName; 1221 this.column = column; 1222 } 1223 1224 public RegionInfo getRegionInfo() { 1225 return this.region; 1226 } 1227 1228 public String getRegionNameAsString() { 1229 return this.region.getRegionNameAsString(); 1230 } 1231 1232 public TableName getTableName() { 1233 return this.tableName; 1234 } 1235 1236 public String getTableNameAsString() { 1237 return this.tableName.getNameAsString(); 1238 } 1239 1240 public ServerName getServerName() { 1241 return this.serverName; 1242 } 1243 1244 public String getServerNameAsString() { 1245 return this.serverName.getServerName(); 1246 } 1247 1248 public ColumnFamilyDescriptor getColumnFamily() { 1249 return this.column; 1250 } 1251 1252 public String getColumnFamilyNameAsString() { 1253 return this.column.getNameAsString(); 1254 } 1255 1256 public long getReadLatency() { 1257 if (this.readLatency == null) { 1258 return -1; 1259 } 1260 return this.readLatency.get(); 1261 } 1262 1263 public void setReadLatency(long readLatency) { 1264 if (this.readLatency != null) { 1265 this.readLatency.set(readLatency); 1266 } else { 1267 this.readLatency = new AtomicLong(readLatency); 1268 } 1269 } 1270 1271 public long getWriteLatency() { 1272 if (this.writeLatency == null) { 1273 return -1; 1274 } 1275 return this.writeLatency.get(); 1276 } 1277 1278 public void setWriteLatency(long writeLatency) { 1279 if (this.writeLatency != null) { 1280 this.writeLatency.set(writeLatency); 1281 } else { 1282 this.writeLatency = new AtomicLong(writeLatency); 1283 } 1284 } 1285 1286 public boolean isReadSuccess() { 1287 return this.readSuccess; 1288 } 1289 1290 public void setReadSuccess() { 1291 this.readSuccess = true; 1292 } 1293 1294 public boolean isWriteSuccess() { 1295 return this.writeSuccess; 1296 } 1297 1298 public void setWriteSuccess() { 1299 this.writeSuccess = true; 1300 } 1301 } 1302 1303 /** 1304 * A Factory method for {@link Monitor}. Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a 1305 * RegionMonitor. 
1306 * @return a Monitor instance 1307 */ 1308 private Monitor newMonitor(final Connection connection, String[] monitorTargets) { 1309 Monitor monitor; 1310 boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false); 1311 boolean regionServerAllRegions = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false); 1312 boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); 1313 int permittedFailures = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0); 1314 boolean writeSniffing = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false); 1315 String writeTableName = 1316 conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, DEFAULT_WRITE_TABLE_NAME.getNameAsString()); 1317 long configuredWriteTableTimeout = 1318 conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT); 1319 1320 if (this.regionServerMode) { 1321 monitor = new RegionServerMonitor(connection, monitorTargets, useRegExp, 1322 getSink(connection.getConfiguration(), RegionServerStdOutSink.class), this.executor, 1323 regionServerAllRegions, failOnError, permittedFailures); 1324 1325 } else if (this.zookeeperMode) { 1326 monitor = new ZookeeperMonitor(connection, monitorTargets, useRegExp, 1327 getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), this.executor, 1328 failOnError, permittedFailures); 1329 } else { 1330 monitor = new RegionMonitor(connection, monitorTargets, useRegExp, 1331 getSink(connection.getConfiguration(), RegionStdOutSink.class), this.executor, 1332 writeSniffing, TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts, 1333 configuredWriteTableTimeout, permittedFailures); 1334 } 1335 return monitor; 1336 } 1337 1338 private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) { 1339 String[] tableTimeouts = configuredReadTableTimeoutsStr.split(","); 1340 for (String tT : tableTimeouts) { 1341 String[] nameTimeout = tT.split("="); 1342 if (nameTimeout.length < 2) { 1343 throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " 1344 + "<tableName>=<read timeout> (without spaces)."); 1345 } 1346 long timeoutVal; 1347 try { 1348 timeoutVal = Long.parseLong(nameTimeout[1]); 1349 } catch (NumberFormatException e) { 1350 throw new IllegalArgumentException( 1351 "-readTableTimeouts read timeout for each table" + " must be a numeric value argument."); 1352 } 1353 configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); 1354 } 1355 } 1356 1357 /** 1358 * A Monitor super-class can be extended by users 1359 */ 1360 public static abstract class Monitor implements Runnable, Closeable { 1361 protected Connection connection; 1362 protected Admin admin; 1363 /** 1364 * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. Passed on the 1365 * command-line as arguments. 
1366 */ 1367 protected String[] targets; 1368 protected boolean useRegExp; 1369 protected boolean treatFailureAsError; 1370 protected boolean initialized = false; 1371 1372 protected boolean done = false; 1373 protected int errorCode = 0; 1374 protected long allowedFailures = 0; 1375 protected Sink sink; 1376 protected ExecutorService executor; 1377 1378 public boolean isDone() { 1379 return done; 1380 } 1381 1382 public boolean hasError() { 1383 return errorCode != 0; 1384 } 1385 1386 public boolean finalCheckForErrors() { 1387 if (errorCode != 0) { 1388 return true; 1389 } 1390 if ( 1391 treatFailureAsError && (sink.getReadFailureCount() > allowedFailures 1392 || sink.getWriteFailureCount() > allowedFailures) 1393 ) { 1394 LOG.error("Too many failures detected, treating failure as error, failing the Canary."); 1395 errorCode = FAILURE_EXIT_CODE; 1396 return true; 1397 } 1398 return false; 1399 } 1400 1401 @Override 1402 public void close() throws IOException { 1403 this.sink.stop(); 1404 if (this.admin != null) { 1405 this.admin.close(); 1406 } 1407 } 1408 1409 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, 1410 ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { 1411 if (null == connection) { 1412 throw new IllegalArgumentException("connection shall not be null"); 1413 } 1414 1415 this.connection = connection; 1416 this.targets = monitorTargets; 1417 this.useRegExp = useRegExp; 1418 this.treatFailureAsError = treatFailureAsError; 1419 this.sink = sink; 1420 this.executor = executor; 1421 this.allowedFailures = allowedFailures; 1422 } 1423 1424 @Override 1425 public abstract void run(); 1426 1427 protected boolean initAdmin() { 1428 if (null == this.admin) { 1429 try { 1430 this.admin = this.connection.getAdmin(); 1431 } catch (Exception e) { 1432 LOG.error("Initial HBaseAdmin failed...", e); 1433 this.errorCode = INIT_ERROR_EXIT_CODE; 1434 } 1435 } else if (admin.isAborted()) { 1436 LOG.error("HBaseAdmin aborted"); 1437 this.errorCode = INIT_ERROR_EXIT_CODE; 1438 } 1439 return !this.hasError(); 1440 } 1441 } 1442 1443 /** 1444 * A monitor for region mode. 1445 */ 1446 private static class RegionMonitor extends Monitor { 1447 // 10 minutes 1448 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; 1449 // 1 days 1450 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; 1451 1452 private long lastCheckTime = -1; 1453 private boolean writeSniffing; 1454 private TableName writeTableName; 1455 private int writeDataTTL; 1456 private float regionsLowerLimit; 1457 private float regionsUpperLimit; 1458 private int checkPeriod; 1459 private boolean rawScanEnabled; 1460 private boolean readAllCF; 1461 1462 /** 1463 * This is a timeout per table. If read of each region in the table aggregated takes longer than 1464 * what is configured here, we log an ERROR rather than just an INFO. 
1465 */ 1466 private HashMap<String, Long> configuredReadTableTimeouts; 1467 1468 private long configuredWriteTableTimeout; 1469 1470 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, 1471 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, 1472 boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, 1473 long configuredWriteTableTimeout, long allowedFailures) { 1474 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, 1475 allowedFailures); 1476 Configuration conf = connection.getConfiguration(); 1477 this.writeSniffing = writeSniffing; 1478 this.writeTableName = writeTableName; 1479 this.writeDataTTL = 1480 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); 1481 this.regionsLowerLimit = 1482 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); 1483 this.regionsUpperLimit = 1484 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); 1485 this.checkPeriod = conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, 1486 DEFAULT_WRITE_TABLE_CHECK_PERIOD); 1487 this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); 1488 this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); 1489 this.configuredWriteTableTimeout = configuredWriteTableTimeout; 1490 this.readAllCF = conf.getBoolean(HConstants.HBASE_CANARY_READ_ALL_CF, true); 1491 } 1492 1493 private RegionStdOutSink getSink() { 1494 if (!(sink instanceof RegionStdOutSink)) { 1495 throw new RuntimeException("Can only write to Region sink"); 1496 } 1497 return ((RegionStdOutSink) sink); 1498 } 1499 1500 @Override 1501 public void run() { 1502 if (this.initAdmin()) { 1503 try { 1504 List<Future<Void>> taskFutures = new LinkedList<>(); 1505 RegionStdOutSink regionSink = this.getSink(); 1506 regionSink.resetFailuresCountDetails(); 1507 if (this.targets != null && this.targets.length > 0) { 1508 String[] tables = generateMonitorTables(this.targets); 1509 // Check to see that each table name passed in the -readTableTimeouts argument is also 1510 // passed as a monitor target. 
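            // For example (hypothetical names): targets "t1 t2" together with
            // -readTableTimeouts "t1=60000,t2=60000" passes this check; a timeout entry for a
            // table that is not also a target fails it and the canary exits with USAGE_EXIT_CODE.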
            if (
              !new HashSet<>(Arrays.asList(tables))
                .containsAll(this.configuredReadTableTimeouts.keySet())
            ) {
              LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets "
                + "passed via command line.");
              this.errorCode = USAGE_EXIT_CODE;
              return;
            }
            this.initialized = true;
            for (String table : tables) {
              LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
              taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor,
                TaskType.READ, this.rawScanEnabled, readLatency, readAllCF));
            }
          } else {
            taskFutures.addAll(sniff(TaskType.READ, regionSink));
          }

          if (writeSniffing) {
            if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
              try {
                checkWriteTableDistribution();
              } catch (IOException e) {
                LOG.error("Check canary table distribution failed!", e);
              }
              lastCheckTime = EnvironmentEdgeManager.currentTime();
            }
            // sniff canary table with write operation
            regionSink.initializeWriteLatency();
            LongAdder writeTableLatency = regionSink.getWriteLatency();
            taskFutures
              .addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName),
                executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency, readAllCF));
          }

          for (Future<Void> future : taskFutures) {
            try {
              future.get();
            } catch (ExecutionException e) {
              LOG.error("Sniff region failed!", e);
            }
          }
          Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
          for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
            String tableName = entry.getKey();
            if (actualReadTableLatency.containsKey(tableName)) {
              Long actual = actualReadTableLatency.get(tableName).longValue();
              Long configured = entry.getValue();
              if (actual > configured) {
                LOG.error("Read operation for {} took {}ms, exceeding the configured read "
                  + "timeout of {}ms.", tableName, actual, configured);
              } else {
                LOG.info("Read operation for {} took {}ms (configured read timeout {}ms).",
                  tableName, actual, configured);
              }
            } else {
              LOG.error("Read operation for {} failed!", tableName);
            }
          }
          if (this.writeSniffing) {
            String writeTableStringName = this.writeTableName.getNameAsString();
            long actualWriteLatency = regionSink.getWriteLatency().longValue();
            LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.",
              writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout);
            // Check that the writeTable write operation latency does not exceed the configured
            // timeout.
            if (actualWriteLatency > this.configuredWriteTableTimeout) {
              LOG.error("Write operation for {} exceeded the configured write timeout.",
                writeTableStringName);
            }
          }
        } catch (Exception e) {
          LOG.error("Run regionMonitor failed", e);
          this.errorCode = ERROR_EXIT_CODE;
        } finally {
          this.done = true;
        }
      }
      this.done = true;
    }

    /** Returns List of tables to use in test.
     */
    private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
      String[] returnTables = null;

      if (this.useRegExp) {
        Pattern pattern = null;
        List<TableDescriptor> tds = null;
        Set<String> tmpTables = new TreeSet<>();
        try {
          LOG.debug("Reading list of tables");
          tds = this.admin.listTableDescriptors(pattern);
          if (tds == null) {
            tds = Collections.emptyList();
          }
          for (String monitorTarget : monitorTargets) {
            pattern = Pattern.compile(monitorTarget);
            for (TableDescriptor td : tds) {
              if (pattern.matcher(td.getTableName().getNameAsString()).matches()) {
                tmpTables.add(td.getTableName().getNameAsString());
              }
            }
          }
        } catch (IOException e) {
          LOG.error("Communicate with admin failed", e);
          throw e;
        }

        if (tmpTables.size() > 0) {
          returnTables = tmpTables.toArray(new String[tmpTables.size()]);
        } else {
          String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
          LOG.error(msg);
          this.errorCode = INIT_ERROR_EXIT_CODE;
          throw new TableNotFoundException(msg);
        }
      } else {
        returnTables = monitorTargets;
      }

      return returnTables;
    }

    /*
     * Canary entry point to monitor all the tables.
     */
    private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink)
      throws Exception {
      LOG.debug("Reading list of tables");
      List<Future<Void>> taskFutures = new LinkedList<>();
      for (TableDescriptor td : admin.listTableDescriptors()) {
        if (
          admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName())
            && (!td.getTableName().equals(writeTableName))
        ) {
          LongAdder readLatency =
            regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString());
          taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType,
            this.rawScanEnabled, readLatency, readAllCF));
        }
      }
      return taskFutures;
    }

    private void checkWriteTableDistribution() throws IOException {
      if (!admin.tableExists(writeTableName)) {
        int numberOfServers = admin.getRegionServers().size();
        if (numberOfServers == 0) {
          throw new IllegalStateException("No live regionservers");
        }
        createWriteTable(numberOfServers);
      }

      if (!admin.isTableEnabled(writeTableName)) {
        admin.enableTable(writeTableName);
      }

      ClusterMetrics status =
        admin.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER));
      int numberOfServers = status.getServersName().size();
      if (status.getServersName().contains(status.getMasterName())) {
        numberOfServers -= 1;
      }

      List<Pair<RegionInfo, ServerName>> pairs =
        MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
      int numberOfRegions = pairs.size();
      if (
        numberOfRegions < numberOfServers * regionsLowerLimit
          || numberOfRegions > numberOfServers * regionsUpperLimit
      ) {
        admin.disableTable(writeTableName);
        admin.deleteTable(writeTableName);
        createWriteTable(numberOfServers);
      }
      HashSet<ServerName> serverSet = new HashSet<>();
      for (Pair<RegionInfo, ServerName> pair : pairs) {
        serverSet.add(pair.getSecond());
      }
      int numberOfCoveredServers = serverSet.size();
      if (numberOfCoveredServers < numberOfServers) {
        admin.balance();
      }
    }
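    /**
     * Creates the canary write table, pre-split with {@code HexStringSplit} into roughly one
     * region per live region server (scaled by the configured per-server lower limit).
     */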
    private void createWriteTable(int numberOfServers) throws IOException {
      int numberOfRegions = (int) (numberOfServers * regionsLowerLimit);
      LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions "
        + "(current lower limit of regions per server is {} and you can change it with config {}).",
        numberOfServers, numberOfRegions, regionsLowerLimit,
        HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
      ColumnFamilyDescriptor family =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME))
          .setMaxVersions(1).setTimeToLive(writeDataTTL).build();
      TableDescriptor desc =
        TableDescriptorBuilder.newBuilder(writeTableName).setColumnFamily(family).build();
      byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
      admin.createTable(desc, splits);
    }
  }

  /**
   * Canary entry point for specified table.
   * @throws Exception exception
   */
  private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
    ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency,
    boolean readAllCF) throws Exception {
    LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName);
    if (admin.isTableEnabled(TableName.valueOf(tableName))) {
      return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)),
        executor, taskType, rawScanEnabled, readLatency, readAllCF);
    } else {
      LOG.warn("Table {} is not enabled", tableName);
    }
    return new LinkedList<>();
  }

  /*
   * Loops over regions of this table, and outputs information about the state.
   */
  private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
    TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, boolean rawScanEnabled,
    LongAdder rwLatency, boolean readAllCF) throws Exception {
    LOG.debug("Reading list of regions for table {}", tableDesc.getTableName());
    List<RegionTask> tasks = new ArrayList<>();
    try (RegionLocator regionLocator =
      admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
      for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
        if (location == null) {
          LOG.warn("Null location for table {}", tableDesc.getTableName());
          continue;
        }
        ServerName rs = location.getServerName();
        RegionInfo region = location.getRegion();
        tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink,
          taskType, rawScanEnabled, rwLatency, readAllCF));
        Map<String, List<RegionTaskResult>> regionMap = ((RegionStdOutSink) sink).getRegionMap();
        regionMap.put(region.getRegionNameAsString(), new ArrayList<RegionTaskResult>());
      }
      return executor.invokeAll(tasks);
    }
  }

  // monitor for zookeeper mode
  private static class ZookeeperMonitor extends Monitor {
    private List<String> hosts;
    private final String znode;
    private final int timeout;
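    /**
     * Resolves the base znode and ZooKeeper session timeout from the client configuration,
     * expands the configured quorum into individual host:port entries, and warns when the
     * allowed failure count already implies a lost quorum.
     */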
    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration configuration = connection.getConfiguration();
      znode = configuration.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
      timeout =
        configuration.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
      ConnectStringParser parser =
        new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
      hosts = Lists.newArrayList();
      for (InetSocketAddress server : parser.getServerAddresses()) {
        hosts.add(inetSocketAddress2String(server));
      }
      if (allowedFailures > (hosts.size() - 1) / 2) {
        LOG.warn(
          "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
            + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
          allowedFailures, hosts.size());
      }
    }

    @Override
    public void run() {
      List<ZookeeperTask> tasks = Lists.newArrayList();
      ZookeeperStdOutSink zkSink = null;
      try {
        zkSink = this.getSink();
      } catch (RuntimeException e) {
        LOG.error("Run ZooKeeperMonitor failed!", e);
        this.errorCode = ERROR_EXIT_CODE;
      }
      this.initialized = true;
      for (final String host : hosts) {
        tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff zookeeper failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        Thread.currentThread().interrupt();
        LOG.error("Sniff zookeeper interrupted!", e);
      }
      this.done = true;
    }

    private ZookeeperStdOutSink getSink() {
      if (!(sink instanceof ZookeeperStdOutSink)) {
        throw new RuntimeException("Can only write to zookeeper sink");
      }
      return ((ZookeeperStdOutSink) sink);
    }
  }

  /**
   * A monitor for regionserver mode
   */
  private static class RegionServerMonitor extends Monitor {
    private boolean rawScanEnabled;
    private boolean allRegions;

    public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
      Sink sink, ExecutorService executor, boolean allRegions, boolean treatFailureAsError,
      long allowedFailures) {
      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
        allowedFailures);
      Configuration conf = connection.getConfiguration();
      this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
      this.allRegions = allRegions;
    }

    private RegionServerStdOutSink getSink() {
      if (!(sink instanceof RegionServerStdOutSink)) {
        throw new RuntimeException("Can only write to regionserver sink");
      }
      return ((RegionServerStdOutSink) sink);
    }

    @Override
    public void run() {
      if (this.initAdmin() && this.checkNoTableNames()) {
        RegionServerStdOutSink regionServerSink = null;
        try {
          regionServerSink = this.getSink();
        } catch (RuntimeException e) {
          LOG.error("Run RegionServerMonitor failed!", e);
          this.errorCode = ERROR_EXIT_CODE;
        }
        Map<String, List<RegionInfo>> rsAndRMap = this.filterRegionServerByName();
        this.initialized = true;
        this.monitorRegionServers(rsAndRMap, regionServerSink);
      }
      this.done = true;
    }
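    /**
     * In regionserver mode the monitor targets must be server names; fails with a usage error
     * when any target matches an existing table name.
     */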
    private boolean checkNoTableNames() {
      List<String> foundTableNames = new ArrayList<>();
      TableName[] tableNames = null;
      LOG.debug("Reading list of tables");
      try {
        tableNames = this.admin.listTableNames();
      } catch (IOException e) {
        LOG.error("Get listTableNames failed", e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
        return false;
      }

      if (this.targets == null || this.targets.length == 0) {
        return true;
      }

      for (String target : this.targets) {
        for (TableName tableName : tableNames) {
          if (target.equals(tableName.getNameAsString())) {
            foundTableNames.add(target);
          }
        }
      }

      if (foundTableNames.size() > 0) {
        System.err.println("Cannot pass a tablename when using the -regionserver "
          + "option, tablenames:" + foundTableNames.toString());
        this.errorCode = USAGE_EXIT_CODE;
      }
      return foundTableNames.isEmpty();
    }

    private void monitorRegionServers(Map<String, List<RegionInfo>> rsAndRMap,
      RegionServerStdOutSink regionServerSink) {
      List<RegionServerTask> tasks = new ArrayList<>();
      Map<String, AtomicLong> successMap = new HashMap<>();
      for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
        String serverName = entry.getKey();
        AtomicLong successes = new AtomicLong(0);
        successMap.put(serverName, successes);
        if (entry.getValue().isEmpty()) {
          LOG.error("Regionserver not serving any regions - {}", serverName);
        } else if (this.allRegions) {
          for (RegionInfo region : entry.getValue()) {
            tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
              this.rawScanEnabled, successes));
          }
        } else {
          // randomly select one region when not checking all regions
          RegionInfo region =
            entry.getValue().get(ThreadLocalRandom.current().nextInt(entry.getValue().size()));
          tasks.add(new RegionServerTask(this.connection, serverName, region, regionServerSink,
            this.rawScanEnabled, successes));
        }
      }
      try {
        for (Future<Void> future : this.executor.invokeAll(tasks)) {
          try {
            future.get();
          } catch (ExecutionException e) {
            LOG.error("Sniff regionserver failed!", e);
            this.errorCode = ERROR_EXIT_CODE;
          }
        }
        if (this.allRegions) {
          for (Map.Entry<String, List<RegionInfo>> entry : rsAndRMap.entrySet()) {
            String serverName = entry.getKey();
            LOG.info("Successfully read {} regions out of {} on regionserver {}",
              successMap.get(serverName), entry.getValue().size(), serverName);
          }
        }
      } catch (InterruptedException e) {
        this.errorCode = ERROR_EXIT_CODE;
        LOG.error("Sniff regionserver interrupted!", e);
      }
    }

    private Map<String, List<RegionInfo>> filterRegionServerByName() {
      Map<String, List<RegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
      regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
      return regionServerAndRegionsMap;
    }
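    /**
     * Maps every region server hostname to the regions it currently hosts; live servers hosting
     * no regions are included with an empty region list.
     */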
    private Map<String, List<RegionInfo>> getAllRegionServerByName() {
      Map<String, List<RegionInfo>> rsAndRMap = new HashMap<>();
      try {
        LOG.debug("Reading list of tables and locations");
        List<TableDescriptor> tableDescs = this.admin.listTableDescriptors();
        List<RegionInfo> regions = null;
        for (TableDescriptor tableDesc : tableDescs) {
          try (RegionLocator regionLocator =
            this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) {
            for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
              if (location == null) {
                LOG.warn("Null location for table {}", tableDesc.getTableName());
                continue;
              }
              ServerName rs = location.getServerName();
              String rsName = rs.getHostname();
              RegionInfo r = location.getRegion();
              if (rsAndRMap.containsKey(rsName)) {
                regions = rsAndRMap.get(rsName);
              } else {
                regions = new ArrayList<>();
                rsAndRMap.put(rsName, regions);
              }
              regions.add(r);
            }
          }
        }

        // get any live regionservers not serving any regions
        for (ServerName rs : this.admin.getRegionServers()) {
          String rsName = rs.getHostname();
          if (!rsAndRMap.containsKey(rsName)) {
            rsAndRMap.put(rsName, Collections.<RegionInfo> emptyList());
          }
        }
      } catch (IOException e) {
        LOG.error("Get HTables info failed", e);
        this.errorCode = INIT_ERROR_EXIT_CODE;
      }
      return rsAndRMap;
    }

    private Map<String, List<RegionInfo>>
      doFilterRegionServerByName(Map<String, List<RegionInfo>> fullRsAndRMap) {

      Map<String, List<RegionInfo>> filteredRsAndRMap = null;

      if (this.targets != null && this.targets.length > 0) {
        filteredRsAndRMap = new HashMap<>();
        Pattern pattern = null;
        Matcher matcher = null;
        boolean regExpFound = false;
        for (String rsName : this.targets) {
          if (this.useRegExp) {
            regExpFound = false;
            pattern = Pattern.compile(rsName);
            for (Map.Entry<String, List<RegionInfo>> entry : fullRsAndRMap.entrySet()) {
              matcher = pattern.matcher(entry.getKey());
              if (matcher.matches()) {
                filteredRsAndRMap.put(entry.getKey(), entry.getValue());
                regExpFound = true;
              }
            }
            if (!regExpFound) {
              LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName);
            }
          } else {
            if (fullRsAndRMap.containsKey(rsName)) {
              filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
            } else {
              LOG.info("No RegionServerInfo found, regionServerName {}", rsName);
            }
          }
        }
      } else {
        filteredRsAndRMap = fullRsAndRMap;
      }
      return filteredRsAndRMap;
    }
  }

  public static void main(String[] args) throws Exception {
    final Configuration conf = HBaseConfiguration.create();

    int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
    LOG.info("Execution thread count={}", numThreads);

    int exitCode;
    ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
    try {
      exitCode = ToolRunner.run(conf, new CanaryTool(executor), args);
    } finally {
      executor.shutdown();
    }
    System.exit(exitCode);
  }
}