001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Comparator; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Objects; 026import java.util.Set; 027import java.util.TreeSet; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.ClusterManager.ServiceType; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.ClusterConnection; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.RegionLocator; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.Threads; 039import org.apache.yetus.audience.InterfaceAudience; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 044 045/** 046 * Manages the interactions with an already deployed distributed cluster (as opposed to a 047 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests. 048 */ 049@InterfaceAudience.Private 050public class DistributedHBaseCluster extends HBaseCluster { 051 private Admin admin; 052 private final Connection connection; 053 054 private ClusterManager clusterManager; 055 /** 056 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 057 * restarted instances of the same server will have different ServerName and will not coincide 058 * with past dead ones. So there's no need to cleanup this list. 059 */ 060 private Set<ServerName> killedRegionServers = new HashSet<>(); 061 062 public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager) 063 throws IOException { 064 super(conf); 065 this.clusterManager = clusterManager; 066 this.connection = ConnectionFactory.createConnection(conf); 067 this.admin = this.connection.getAdmin(); 068 this.initialClusterStatus = getClusterMetrics(); 069 } 070 071 public void setClusterManager(ClusterManager clusterManager) { 072 this.clusterManager = clusterManager; 073 } 074 075 public ClusterManager getClusterManager() { 076 return clusterManager; 077 } 078 079 /** 080 * Returns a ClusterStatus for this HBase cluster n 081 */ 082 @Override 083 public ClusterMetrics getClusterMetrics() throws IOException { 084 return admin.getClusterMetrics(); 085 } 086 087 @Override 088 public ClusterMetrics getInitialClusterMetrics() throws IOException { 089 return initialClusterStatus; 090 } 091 092 @Override 093 public void close() throws IOException { 094 if (this.admin != null) { 095 admin.close(); 096 } 097 if (this.connection != null && !this.connection.isClosed()) { 098 this.connection.close(); 099 } 100 } 101 102 @Override 103 public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName) 104 throws IOException { 105 return ((ClusterConnection) this.connection).getAdmin(serverName); 106 } 107 108 @Override 109 public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName) 110 throws IOException { 111 return ((ClusterConnection) this.connection).getClient(serverName); 112 } 113 114 @Override 115 public void startRegionServer(String hostname, int port) throws IOException { 116 LOG.info("Starting RS on: {}", hostname); 117 clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port); 118 } 119 120 @Override 121 public void killRegionServer(ServerName serverName) throws IOException { 122 LOG.info("Aborting RS: {}", serverName.getServerName()); 123 killedRegionServers.add(serverName); 124 clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(), 125 serverName.getPort()); 126 } 127 128 @Override 129 public boolean isKilledRS(ServerName serverName) { 130 return killedRegionServers.contains(serverName); 131 } 132 133 @Override 134 public void stopRegionServer(ServerName serverName) throws IOException { 135 LOG.info("Stopping RS: {}", serverName.getServerName()); 136 clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(), 137 serverName.getPort()); 138 } 139 140 @Override 141 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 142 waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout); 143 } 144 145 @Override 146 public void suspendRegionServer(ServerName serverName) throws IOException { 147 LOG.info("Suspend RS: {}", serverName.getServerName()); 148 clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(), 149 serverName.getPort()); 150 } 151 152 @Override 153 public void resumeRegionServer(ServerName serverName) throws IOException { 154 LOG.info("Resume RS: {}", serverName.getServerName()); 155 clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(), 156 serverName.getPort()); 157 } 158 159 @Override 160 public void startZkNode(String hostname, int port) throws IOException { 161 LOG.info("Starting ZooKeeper node on: {}", hostname); 162 clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port); 163 } 164 165 @Override 166 public void killZkNode(ServerName serverName) throws IOException { 167 LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName()); 168 clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(), 169 serverName.getPort()); 170 } 171 172 @Override 173 public void stopZkNode(ServerName serverName) throws IOException { 174 LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName()); 175 clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(), 176 serverName.getPort()); 177 } 178 179 @Override 180 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 181 waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout); 182 } 183 184 @Override 185 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 186 waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout); 187 } 188 189 @Override 190 public void startDataNode(ServerName serverName) throws IOException { 191 LOG.info("Starting data node on: {}", serverName.getServerName()); 192 clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(), 193 serverName.getPort()); 194 } 195 196 @Override 197 public void killDataNode(ServerName serverName) throws IOException { 198 LOG.info("Aborting data node on: {}", serverName.getServerName()); 199 clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(), 200 serverName.getPort()); 201 } 202 203 @Override 204 public void stopDataNode(ServerName serverName) throws IOException { 205 LOG.info("Stopping data node on: {}", serverName.getServerName()); 206 clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(), 207 serverName.getPort()); 208 } 209 210 @Override 211 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 212 waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout); 213 } 214 215 @Override 216 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 217 waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout); 218 } 219 220 @Override 221 public void startNameNode(ServerName serverName) throws IOException { 222 LOG.info("Starting name node on: {}", serverName.getServerName()); 223 clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(), 224 serverName.getPort()); 225 } 226 227 @Override 228 public void killNameNode(ServerName serverName) throws IOException { 229 LOG.info("Aborting name node on: {}", serverName.getServerName()); 230 clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(), 231 serverName.getPort()); 232 } 233 234 @Override 235 public void stopNameNode(ServerName serverName) throws IOException { 236 LOG.info("Stopping name node on: {}", serverName.getServerName()); 237 clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(), 238 serverName.getPort()); 239 } 240 241 @Override 242 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 243 waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout); 244 } 245 246 @Override 247 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 248 waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout); 249 } 250 251 private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout) 252 throws IOException { 253 LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName()); 254 long start = EnvironmentEdgeManager.currentTime(); 255 256 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 257 if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) { 258 return; 259 } 260 Threads.sleep(100); 261 } 262 throw new IOException("Timed-out waiting for service to stop: " + serverName); 263 } 264 265 private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout) 266 throws IOException { 267 LOG.info("Waiting for service: {} to start: ", service, serverName.getServerName()); 268 long start = EnvironmentEdgeManager.currentTime(); 269 270 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 271 if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) { 272 return; 273 } 274 Threads.sleep(100); 275 } 276 throw new IOException("Timed-out waiting for service to start: " + serverName); 277 } 278 279 @Override 280 public MasterService.BlockingInterface getMasterAdminService() throws IOException { 281 return ((ClusterConnection) this.connection).getMaster(); 282 } 283 284 @Override 285 public void startMaster(String hostname, int port) throws IOException { 286 LOG.info("Starting Master on: {}:{}", hostname, port); 287 clusterManager.start(ServiceType.HBASE_MASTER, hostname, port); 288 } 289 290 @Override 291 public void killMaster(ServerName serverName) throws IOException { 292 LOG.info("Aborting Master: {}", serverName.getServerName()); 293 clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort()); 294 } 295 296 @Override 297 public void stopMaster(ServerName serverName) throws IOException { 298 LOG.info("Stopping Master: {}", serverName.getServerName()); 299 clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort()); 300 } 301 302 @Override 303 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 304 waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout); 305 } 306 307 @Override 308 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 309 long start = EnvironmentEdgeManager.currentTime(); 310 while (EnvironmentEdgeManager.currentTime() - start < timeout) { 311 try { 312 getMasterAdminService(); 313 return true; 314 } catch (MasterNotRunningException m) { 315 LOG.warn("Master not started yet " + m); 316 } catch (ZooKeeperConnectionException e) { 317 LOG.warn("Failed to connect to ZK " + e); 318 } 319 Threads.sleep(1000); 320 } 321 return false; 322 } 323 324 @Override 325 public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException { 326 byte[] startKey = RegionInfo.getStartKey(regionName); 327 HRegionLocation regionLoc = null; 328 try (RegionLocator locator = connection.getRegionLocator(tn)) { 329 regionLoc = locator.getRegionLocation(startKey, true); 330 } 331 if (regionLoc == null) { 332 LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName)); 333 return null; 334 } 335 return regionLoc.getServerName(); 336 } 337 338 @Override 339 public void waitUntilShutDown() { 340 // Simply wait for a few seconds for now (after issuing serverManager.kill 341 throw new RuntimeException(HConstants.NOT_IMPLEMENTED); 342 } 343 344 @Override 345 public void shutdown() throws IOException { 346 // not sure we want this 347 throw new RuntimeException(HConstants.NOT_IMPLEMENTED); 348 } 349 350 @Override 351 public boolean isDistributedCluster() { 352 return true; 353 } 354 355 @Override 356 public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException { 357 ClusterMetrics current = getClusterMetrics(); 358 359 LOG.info("Restoring cluster - started"); 360 361 // do a best effort restore 362 boolean success = true; 363 success = restoreMasters(initial, current) && success; 364 success = restoreRegionServers(initial, current) && success; 365 success = restoreAdmin() && success; 366 367 LOG.info("Restoring cluster - done"); 368 return success; 369 } 370 371 protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) { 372 List<IOException> deferred = new ArrayList<>(); 373 // check whether current master has changed 374 final ServerName initMaster = initial.getMasterName(); 375 if (!ServerName.isSameAddress(initMaster, current.getMasterName())) { 376 LOG.info("Restoring cluster - Initial active master : {} has changed to : {}", 377 initMaster.getAddress(), current.getMasterName().getAddress()); 378 // If initial master is stopped, start it, before restoring the state. 379 // It will come up as a backup master, if there is already an active master. 380 try { 381 if ( 382 !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(), 383 initMaster.getPort()) 384 ) { 385 LOG.info("Restoring cluster - starting initial active master at:{}", 386 initMaster.getAddress()); 387 startMaster(initMaster.getHostname(), initMaster.getPort()); 388 } 389 390 // master has changed, we would like to undo this. 391 // 1. Kill the current backups 392 // 2. Stop current master 393 // 3. Start backup masters 394 for (ServerName currentBackup : current.getBackupMasterNames()) { 395 if (!ServerName.isSameAddress(currentBackup, initMaster)) { 396 LOG.info("Restoring cluster - stopping backup master: {}", currentBackup); 397 stopMaster(currentBackup); 398 } 399 } 400 LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName()); 401 stopMaster(current.getMasterName()); 402 waitForActiveAndReadyMaster(); // wait so that active master takes over 403 } catch (IOException ex) { 404 // if we fail to start the initial active master, we do not want to continue stopping 405 // backup masters. Just keep what we have now 406 deferred.add(ex); 407 } 408 409 // start backup masters 410 for (ServerName backup : initial.getBackupMasterNames()) { 411 try { 412 // these are not started in backup mode, but we should already have an active master 413 if ( 414 !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(), 415 backup.getPort()) 416 ) { 417 LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress()); 418 startMaster(backup.getHostname(), backup.getPort()); 419 } 420 } catch (IOException ex) { 421 deferred.add(ex); 422 } 423 } 424 } else { 425 // current master has not changed, match up backup masters 426 Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator()); 427 Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator()); 428 toStart.addAll(initial.getBackupMasterNames()); 429 toKill.addAll(current.getBackupMasterNames()); 430 431 for (ServerName server : current.getBackupMasterNames()) { 432 toStart.remove(server); 433 } 434 for (ServerName server : initial.getBackupMasterNames()) { 435 toKill.remove(server); 436 } 437 438 for (ServerName sn : toStart) { 439 try { 440 if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) { 441 LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress()); 442 startMaster(sn.getHostname(), sn.getPort()); 443 } 444 } catch (IOException ex) { 445 deferred.add(ex); 446 } 447 } 448 449 for (ServerName sn : toKill) { 450 try { 451 if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) { 452 LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress()); 453 stopMaster(sn); 454 } 455 } catch (IOException ex) { 456 deferred.add(ex); 457 } 458 } 459 } 460 if (!deferred.isEmpty()) { 461 LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size()); 462 for (int i = 0; i < deferred.size() && i < 3; i++) { 463 LOG.warn(Objects.toString(deferred.get(i))); 464 } 465 } 466 467 return deferred.isEmpty(); 468 } 469 470 private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> { 471 @Override 472 public int compare(ServerName o1, ServerName o2) { 473 int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname()); 474 if (compare != 0) return compare; 475 compare = o1.getPort() - o2.getPort(); 476 if (compare != 0) return compare; 477 return 0; 478 } 479 } 480 481 protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) { 482 Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator()); 483 Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator()); 484 toStart.addAll(initial.getLiveServerMetrics().keySet()); 485 toKill.addAll(current.getLiveServerMetrics().keySet()); 486 487 ServerName master = initial.getMasterName(); 488 489 for (ServerName server : current.getLiveServerMetrics().keySet()) { 490 toStart.remove(server); 491 } 492 for (ServerName server : initial.getLiveServerMetrics().keySet()) { 493 toKill.remove(server); 494 } 495 496 List<IOException> deferred = new ArrayList<>(); 497 498 for (ServerName sn : toStart) { 499 try { 500 if ( 501 !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort()) 502 && master.getPort() != sn.getPort() 503 ) { 504 LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress()); 505 startRegionServer(sn.getHostname(), sn.getPort()); 506 } 507 } catch (IOException ex) { 508 deferred.add(ex); 509 } 510 } 511 512 for (ServerName sn : toKill) { 513 try { 514 if ( 515 clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort()) 516 && master.getPort() != sn.getPort() 517 ) { 518 LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress()); 519 stopRegionServer(sn); 520 } 521 } catch (IOException ex) { 522 deferred.add(ex); 523 } 524 } 525 if (!deferred.isEmpty()) { 526 LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size()); 527 for (int i = 0; i < deferred.size() && i < 3; i++) { 528 LOG.warn(Objects.toString(deferred.get(i))); 529 } 530 } 531 532 return deferred.isEmpty(); 533 } 534 535 protected boolean restoreAdmin() throws IOException { 536 // While restoring above, if the HBase Master which was initially the Active one, was down 537 // and the restore put the cluster back to Initial configuration, HAdmin instance will need 538 // to refresh its connections (otherwise it will return incorrect information) or we can 539 // point it to new instance. 540 try { 541 admin.close(); 542 } catch (IOException ioe) { 543 LOG.warn("While closing the old connection", ioe); 544 } 545 this.admin = this.connection.getAdmin(); 546 LOG.info("Added new HBaseAdmin"); 547 return true; 548 } 549}