/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to
 * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
 * <p>
 * Process-level operations (start/stop/kill/suspend/resume) are delegated to the configured
 * {@link ClusterManager}; cluster state is read through an {@link Admin} obtained from a
 * {@link Connection} that this class creates and owns.
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster {
  /** Maximum number of deferred restore errors that are written to the log individually. */
  private static final int MAX_LOGGED_ERRORS = 3;

  /** Replaced by {@link #restoreAdmin()}, hence not final. */
  private Admin admin;
  private final Connection connection;

  private ClusterManager clusterManager;
  /**
   * List of RegionServers killed so far. ServerName also comprises startCode of a server,
   * so any restarted instances of the same server will have different ServerName and will not
   * coincide with past dead ones. So there's no need to cleanup this list.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();

  /**
   * Creates the cluster handle, opens a connection using {@code conf} and records the cluster
   * metrics observed at construction time as the initial state.
   * @param conf configuration used to create the connection
   * @param clusterManager manager used to start/stop/kill the remote services
   * @throws IOException if the connection, the admin or the initial metrics cannot be obtained
   */
  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
      throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.connection = ConnectionFactory.createConnection(conf);
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterMetrics();
  }

  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns the current {@link ClusterMetrics} for this HBase cluster, as reported by the
   * {@link Admin}.
   * @throws IOException if the admin call fails
   */
  @Override
  public ClusterMetrics getClusterMetrics() throws IOException {
    return admin.getClusterMetrics();
  }

  /**
   * Returns the metrics captured when this object was constructed.
   */
  @Override
  public ClusterMetrics getInitialClusterMetrics() throws IOException {
    return initialClusterStatus;
  }

  /**
   * Closes the admin and then the connection. The connection is closed even when closing the
   * admin fails, so that a failing admin does not leak the underlying connection.
   * @throws IOException if closing the admin or the connection fails
   */
  @Override
  public void close() throws IOException {
    try {
      if (this.admin != null) {
        admin.close();
      }
    } finally {
      if (this.connection != null && !this.connection.isClosed()) {
        this.connection.close();
      }
    }
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
      throws IOException {
    return ((ClusterConnection) this.connection).getAdmin(serverName);
  }

  @Override
  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
      throws IOException {
    return ((ClusterConnection) this.connection).getClient(serverName);
  }

  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
    LOG.info("Starting RS on: {}", hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
  }

  /**
   * Kills the given region server through the cluster manager and remembers it so that
   * {@link #isKilledRS(ServerName)} reports it afterwards.
   */
  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: {}", serverName.getServerName());
    killedRegionServers.add(serverName);
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public boolean isKilledRS(ServerName serverName) {
    return killedRegionServers.contains(serverName);
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void suspendRegionServer(ServerName serverName) throws IOException {
    LOG.info("Suspend RS: {}", serverName.getServerName());
    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void resumeRegionServer(ServerName serverName) throws IOException {
    LOG.info("Resume RS: {}", serverName.getServerName());
    clusterManager.resume(ServiceType.HBASE_REGIONSERVER,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.info("Starting ZooKeeper node on: {}", hostname);
    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
  }

  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.info("Starting data node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_DATANODE,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.info("Aborting data node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_DATANODE,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.info("Stopping data node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_DATANODE,
      serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.info("Starting name node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.info("Aborting name node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.info("Stopping name node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  /**
   * Polls the cluster manager until it reports that the service is no longer running.
   * @param service the kind of service to watch
   * @param serverName host and port of the service instance
   * @param timeout maximum time to wait, in milliseconds
   * @throws IOException if the service is still running when the timeout elapses, or if the
   *           cluster manager check itself fails
   */
  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
      throws IOException {
    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
    long start = System.currentTimeMillis();

    while ((System.currentTimeMillis() - start) < timeout) {
      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("did timeout waiting for service to stop:" + serverName);
  }

  /**
   * Polls the cluster manager until it reports that the service is running.
   * @param service the kind of service to watch
   * @param serverName host and port of the service instance
   * @param timeout maximum time to wait, in milliseconds
   * @throws IOException if the service is still not running when the timeout elapses, or if the
   *           cluster manager check itself fails
   */
  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
      throws IOException {
    // Two placeholders for two arguments; with a single placeholder the server name is dropped.
    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
    long start = System.currentTimeMillis();

    while ((System.currentTimeMillis() - start) < timeout) {
      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("did timeout waiting for service to start:" + serverName);
  }

  @Override
  public MasterService.BlockingInterface getMasterAdminService()
      throws IOException {
    return ((ClusterConnection) this.connection).getMaster();
  }

  @Override
  public void startMaster(String hostname, int port) throws IOException {
    LOG.info("Starting Master on: {}:{}", hostname, port);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  /**
   * Repeatedly tries to obtain the master service until it succeeds or the timeout elapses.
   * {@link MasterNotRunningException} and {@link ZooKeeperConnectionException} are logged and
   * retried; any other {@link IOException} propagates to the caller.
   * @param timeout maximum time to keep retrying, in milliseconds
   * @return true if the master service could be obtained within the timeout, false otherwise
   */
  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < timeout) {
      try {
        getMasterAdminService();
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet " + m);
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK " + e);
      }
      Threads.sleep(1000);
    }
    return false;
  }

  /**
   * Looks up the server currently hosting the region by locating the region's start key with a
   * forced reload of the location.
   * @return the hosting server, or null if no location could be found
   */
  @Override
  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
    byte[] startKey = RegionInfo.getStartKey(regionName);
    HRegionLocation regionLoc = null;
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      regionLoc = locator.getRegionLocation(startKey, true);
    }
    if (regionLoc == null) {
      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
      return null;
    }
    return regionLoc.getServerName();
  }

  @Override
  public void waitUntilShutDown() {
    // Not implemented for a distributed cluster; always throws.
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public void shutdown() throws IOException {
    // not sure we want this
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  /**
   * Makes a best effort to bring masters and region servers back to the given initial state and
   * then refreshes the admin. Every step is attempted even when an earlier one reported errors.
   * @param initial the metrics describing the state to restore
   * @return true only if every step completed without errors
   */
  @Override
  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
    ClusterMetrics current = getClusterMetrics();

    LOG.info("Restoring cluster - started");

    // do a best effort restore; each step runs unconditionally and failures are accumulated
    boolean success = true;
    success = restoreMasters(initial, current) & success;
    success = restoreRegionServers(initial, current) & success;
    success = restoreAdmin() & success;

    LOG.info("Restoring cluster - done");
    return success;
  }

  /**
   * Restores the active master and the backup masters to the addresses recorded in
   * {@code initial}. IOExceptions are collected rather than thrown so that the remaining
   * restore work still runs.
   * @return true if no error was collected
   */
  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
    List<IOException> deferred = new ArrayList<>();
    //check whether current master has changed
    final ServerName initMaster = initial.getMasterName();
    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
        initMaster.getAddress(), current.getMasterName().getAddress());
      // If initial master is stopped, start it, before restoring the state.
      // It will come up as a backup master, if there is already an active master.
      try {
        if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
          initMaster.getHostname(), initMaster.getPort())) {
          LOG.info("Restoring cluster - starting initial active master at:{}",
            initMaster.getAddress());
          startMaster(initMaster.getHostname(), initMaster.getPort());
        }

        // master has changed, we would like to undo this.
        // 1. Kill the current backups
        // 2. Stop current master
        // 3. Start backup masters
        for (ServerName currentBackup : current.getBackupMasterNames()) {
          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
            stopMaster(currentBackup);
          }
        }
        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
        stopMaster(current.getMasterName());
        waitForActiveAndReadyMaster(); // wait so that active master takes over
      } catch (IOException ex) {
        // if we fail to start the initial active master, we do not want to continue stopping
        // backup masters. Just keep what we have now
        deferred.add(ex);
      }

      //start backup masters
      for (ServerName backup : initial.getBackupMasterNames()) {
        try {
          //these are not started in backup mode, but we should already have an active master
          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
            backup.getHostname(), backup.getPort())) {
            LOG.info("Restoring cluster - starting initial backup master: {}",
              backup.getAddress());
            startMaster(backup.getHostname(), backup.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    } else {
      //current master has not changed, match up backup masters
      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      toStart.addAll(initial.getBackupMasterNames());
      toKill.addAll(current.getBackupMasterNames());

      for (ServerName server : current.getBackupMasterNames()) {
        toStart.remove(server);
      }
      for (ServerName server : initial.getBackupMasterNames()) {
        toKill.remove(server);
      }

      for (ServerName sn : toStart) {
        try {
          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(),
            sn.getPort())) {
            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
            startMaster(sn.getHostname(), sn.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }

      for (ServerName sn : toKill) {
        try {
          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(),
            sn.getPort())) {
            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
            stopMaster(sn);
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    }
    return reportDeferredErrors("masters", deferred);
  }

  /**
   * Orders {@link ServerName}s by hostname (case-insensitively) and then by port, ignoring the
   * start code, so that a restarted instance of a server compares equal to the earlier one.
   */
  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (compare != 0) {
        return compare;
      }
      // Integer.compare instead of subtraction: same ordering, no overflow hazard.
      return Integer.compare(o1.getPort(), o2.getPort());
    }
  }

  /**
   * Starts region servers that were live in {@code initial} but are not live in {@code current},
   * and stops those that are live in {@code current} but were not in {@code initial}. Servers are
   * matched by hostname and port only. IOExceptions are collected rather than thrown.
   * @return true if no error was collected
   */
  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    toStart.addAll(initial.getLiveServerMetrics().keySet());
    toKill.addAll(current.getLiveServerMetrics().keySet());

    ServerName master = initial.getMasterName();

    for (ServerName server : current.getLiveServerMetrics().keySet()) {
      toStart.remove(server);
    }
    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
      toKill.remove(server);
    }

    List<IOException> deferred = new ArrayList<>();

    for (ServerName sn : toStart) {
      try {
        // Entries on the initial master's port are skipped
        // NOTE(review): presumably a master listed among the live servers - confirm.
        if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
          sn.getPort()) && master.getPort() != sn.getPort()) {
          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
          startRegionServer(sn.getHostname(), sn.getPort());
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    for (ServerName sn : toKill) {
      try {
        if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
          sn.getPort()) && master.getPort() != sn.getPort()) {
          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
          stopRegionServer(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }
    return reportDeferredErrors("region servers", deferred);
  }

  /**
   * Logs a summary of the errors collected while restoring one kind of component, followed by
   * at most {@link #MAX_LOGGED_ERRORS} of the individual errors.
   * @param component human readable name of what was being restored, used in the log message
   * @param deferred the errors collected during the restore
   * @return true if {@code deferred} is empty, i.e. the restore step had no errors
   */
  private boolean reportDeferredErrors(String component, List<IOException> deferred) {
    if (deferred.isEmpty()) {
      return true;
    }
    LOG.warn("Restoring cluster - restoring {} reported {} errors:", component, deferred.size());
    for (int i = 0; i < deferred.size() && i < MAX_LOGGED_ERRORS; i++) {
      LOG.warn(Objects.toString(deferred.get(i)));
    }
    return false;
  }

  /**
   * Closes the current admin (a failure to close is only logged) and obtains a fresh one from
   * the connection.
   * @return always true
   */
  protected boolean restoreAdmin() throws IOException {
    // While restoring above, if the HBase Master which was initially the Active one, was down
    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
    // to refresh its connections (otherwise it will return incorrect information) or we can
    // point it to new instance.
    try {
      admin.close();
    } catch (IOException ioe) {
      LOG.warn("While closing the old connection", ioe);
    }
    this.admin = this.connection.getAdmin();
    LOG.info("Added new HBaseAdmin");
    return true;
  }
}