/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to
 * a pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
 * <p>
 * Process control (start/stop/kill/isRunning) is delegated to a {@link ClusterManager}; cluster
 * state is read through an {@link Admin} obtained from a single {@link Connection}.
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster {
  /** Polling interval, in milliseconds, while waiting for a service to start or stop. */
  private static final long SERVICE_POLL_INTERVAL_MS = 100;

  /** Polling interval, in milliseconds, while waiting for an active master. */
  private static final long MASTER_POLL_INTERVAL_MS = 1000;

  /** Upper bound on how many deferred restore errors are written to the log individually. */
  private static final int MAX_LOGGED_RESTORE_ERRORS = 3;

  /** Replaced by {@link #restoreAdmin()}, hence not final. */
  private Admin admin;
  private final Connection connection;

  private ClusterManager clusterManager;

  /**
   * List of RegionServers killed so far. ServerName also comprises startCode of a server,
   * so any restarted instances of the same server will have different ServerName and will not
   * coincide with past dead ones. So there's no need to cleanup this list.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();

  /**
   * Opens a connection to the cluster described by {@code conf}, obtains an {@link Admin} from
   * it and records the cluster metrics seen at construction time as the initial state.
   *
   * @param conf configuration used to create the connection
   * @param clusterManager manager used to start, stop, kill and probe cluster processes
   * @throws IOException if the connection, the admin or the initial metrics cannot be obtained
   */
  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
      throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.connection = ConnectionFactory.createConnection(conf);
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterMetrics();
  }

  /**
   * Replaces the manager used for process control.
   *
   * @param clusterManager the manager to use from now on
   */
  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  /**
   * @return the manager currently used for process control
   */
  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns the current {@link ClusterMetrics} of this HBase cluster, as reported by the admin.
   *
   * @throws IOException if the admin call fails
   */
  @Override
  public ClusterMetrics getClusterMetrics() throws IOException {
    return admin.getClusterMetrics();
  }

  /**
   * Returns the metrics captured when this object was constructed.
   */
  @Override
  public ClusterMetrics getInitialClusterMetrics() throws IOException {
    return initialClusterStatus;
  }

  /**
   * Closes the admin and then the underlying connection.
   * <p>
   * The connection is closed in a {@code finally} block so that a failure while closing the
   * admin does not leave the connection open.
   *
   * @throws IOException if closing the admin or the connection fails
   */
  @Override
  public void close() throws IOException {
    try {
      if (this.admin != null) {
        admin.close();
      }
    } finally {
      if (this.connection != null && !this.connection.isClosed()) {
        this.connection.close();
      }
    }
  }

  /**
   * Returns the blocking admin-service stub for the given server, obtained from the connection.
   */
  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
      throws IOException {
    return ((ClusterConnection) this.connection).getAdmin(serverName);
  }

  /**
   * Returns the blocking client-service stub for the given server, obtained from the connection.
   */
  @Override
  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
      throws IOException {
    return ((ClusterConnection) this.connection).getClient(serverName);
  }

  /** Asks the cluster manager to start a region server on {@code hostname:port}. */
  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
    LOG.info("Starting RS on: " + hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
  }

  /**
   * Records the server as killed (see {@link #isKilledRS(ServerName)}) and asks the cluster
   * manager to kill the region server process.
   */
  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: " + serverName.getServerName());
    // Recorded before the kill is issued, so the server counts as killed even if kill() throws.
    killedRegionServers.add(serverName);
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER,
      serverName.getHostname(), serverName.getPort());
  }

  /**
   * @return true if {@link #killRegionServer(ServerName)} was called for exactly this
   *         {@code serverName}
   */
  @Override
  public boolean isKilledRS(ServerName serverName) {
    return killedRegionServers.contains(serverName);
  }

  /** Asks the cluster manager to stop the given region server. */
  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: " + serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER,
      serverName.getHostname(), serverName.getPort());
  }

  /**
   * Blocks until the cluster manager reports the region server as not running.
   *
   * @throws IOException if the server is still running after {@code timeout} milliseconds
   */
  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  /** Asks the cluster manager to start a ZooKeeper server on {@code hostname:port}. */
  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.info("Starting ZooKeeper node on: " + hostname);
    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
  }

  /** Asks the cluster manager to kill the given ZooKeeper server. */
  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.info("Aborting ZooKeeper node on: " + serverName.getServerName());
    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER,
      serverName.getHostname(), serverName.getPort());
  }

  /** Asks the cluster manager to stop the given ZooKeeper server. */
  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.info("Stopping ZooKeeper node: " + serverName.getServerName());
    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER,
      serverName.getHostname(), serverName.getPort());
  }

  /**
   * Blocks until the cluster manager reports the ZooKeeper server as running.
   *
   * @throws IOException if the server is not running after {@code timeout} milliseconds
   */
  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  /**
   * Blocks until the cluster manager reports the ZooKeeper server as not running.
   *
   * @throws IOException if the server is still running after {@code timeout} milliseconds
   */
  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  /** Asks the cluster manager to start the given HDFS data node. */
  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.info("Starting data node on: " + serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_DATANODE,
      serverName.getHostname(), serverName.getPort());
  }

  /** Asks the cluster manager to kill the given HDFS data node. */
  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.info("Aborting data node on: " + serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_DATANODE,
      serverName.getHostname(), serverName.getPort());
  }

  /** Asks the cluster manager to stop the given HDFS data node. */
  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.info("Stopping data node on: " + serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_DATANODE,
      serverName.getHostname(), serverName.getPort());
  }

  /**
   * Blocks until the cluster manager reports the data node as running.
   *
   * @throws IOException if the data node is not running after {@code timeout} milliseconds
   */
  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  /**
   * Blocks until the cluster manager reports the data node as not running.
   *
   * @throws IOException if the data node is still running after {@code timeout} milliseconds
   */
  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  /** Waits for {@code service} on {@code serverName} to be reported as not running. */
  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
      throws IOException {
    waitForService(service, serverName, timeout, false);
  }

  /** Waits for {@code service} on {@code serverName} to be reported as running. */
  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
      throws IOException {
    waitForService(service, serverName, timeout, true);
  }

  /**
   * Polls the cluster manager until the service reaches the wanted running state.
   * <p>
   * The state is probed only while the elapsed wall-clock time is below {@code timeout}; a
   * non-positive timeout therefore fails without probing at all.
   *
   * @param service the kind of process to probe
   * @param serverName host and port of the process
   * @param timeout maximum time to wait, in milliseconds
   * @param wantRunning true to wait for the service to start, false to wait for it to stop
   * @throws IOException if the wanted state is not observed within {@code timeout}, or if the
   *         cluster manager fails while probing
   */
  private void waitForService(ServiceType service, ServerName serverName, long timeout,
      boolean wantRunning) throws IOException {
    final String action = wantRunning ? "start" : "stop";
    LOG.info("Waiting for service: " + service + " to " + action + ": "
      + serverName.getServerName());
    final long begin = System.currentTimeMillis();

    while ((System.currentTimeMillis() - begin) < timeout) {
      boolean running =
        clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort());
      if (running == wantRunning) {
        return;
      }
      Threads.sleep(SERVICE_POLL_INTERVAL_MS);
    }
    throw new IOException("did timeout waiting for service to " + action + ":" + serverName);
  }

  /**
   * Returns the blocking master-service stub obtained from the connection.
   */
  @Override
  public MasterService.BlockingInterface getMasterAdminService()
      throws IOException {
    return ((ClusterConnection) this.connection).getMaster();
  }

  /** Asks the cluster manager to start a master on {@code hostname:port}. */
  @Override
  public void startMaster(String hostname, int port) throws IOException {
    LOG.info("Starting Master on: " + hostname + ":" + port);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
  }

  /** Asks the cluster manager to kill the given master. */
  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: " + serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  /** Asks the cluster manager to stop the given master. */
  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: " + serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  /**
   * Blocks until the cluster manager reports the master as not running.
   *
   * @throws IOException if the master is still running after {@code timeout} milliseconds
   */
  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  /**
   * Repeatedly tries to obtain the master service stub until it succeeds or the timeout elapses.
   * <p>
   * {@link MasterNotRunningException} and {@link ZooKeeperConnectionException} are logged and
   * retried after a one second pause; any other {@link IOException} propagates to the caller.
   *
   * @param timeout maximum time to keep trying, in milliseconds
   * @return true once the master service stub could be obtained, false on timeout
   */
  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    final long begin = System.currentTimeMillis();
    while (System.currentTimeMillis() - begin < timeout) {
      try {
        getMasterAdminService();
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet " + m);
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK " + e);
      }
      Threads.sleep(MASTER_POLL_INTERVAL_MS);
    }
    return false;
  }

  /**
   * Looks up the server currently hosting the region identified by {@code regionName}.
   *
   * @param tn table the region belongs to
   * @param regionName full region name; its start key is used for the lookup
   * @return the hosting server, or null if the locator returned no location
   */
  @Override
  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
    byte[] startKey = RegionInfo.getStartKey(regionName);
    HRegionLocation location;
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      // NOTE(review): the 'true' flag presumably forces a fresh (non-cached) lookup - confirm
      // against the RegionLocator API.
      location = locator.getRegionLocation(startKey, true);
    }
    if (location == null) {
      LOG.warn("Cannot find region server holding region " + Bytes.toStringBinary(regionName));
      return null;
    }
    return location.getServerName();
  }

  /**
   * Not implemented for distributed clusters.
   *
   * @throws RuntimeException always
   */
  @Override
  public void waitUntilShutDown() {
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  /**
   * Not implemented for distributed clusters.
   *
   * @throws RuntimeException always
   */
  @Override
  public void shutdown() throws IOException {
    // not sure we want this
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  /** @return always true */
  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  /**
   * Best-effort attempt to bring masters and region servers back to the layout described by
   * {@code initial}, followed by a refresh of the admin handle.
   * <p>
   * All three steps are always attempted, even if an earlier one reports failure.
   *
   * @param initial the cluster metrics to restore to
   * @return true if every step succeeded
   * @throws IOException if the current metrics cannot be read or the admin cannot be re-created
   */
  @Override
  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
    ClusterMetrics current = getClusterMetrics();

    LOG.info("Restoring cluster - started");

    // do a best effort restore: run every step, then combine the results
    boolean mastersOk = restoreMasters(initial, current);
    boolean regionServersOk = restoreRegionServers(initial, current);
    boolean adminOk = restoreAdmin();

    LOG.info("Restoring cluster - done");
    return mastersOk && regionServersOk && adminOk;
  }

  /**
   * Restores the active master and the backup masters to the addresses found in
   * {@code initial}. Addresses are compared by host and port; start codes are ignored.
   *
   * @param initial the metrics to restore to
   * @param current the metrics as they are now
   * @return true if no {@link IOException} occurred while starting/stopping masters
   */
  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
    List<IOException> deferred = new ArrayList<>();
    //check whether current master has changed
    final ServerName initMaster = initial.getMasterName();
    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
      restoreChangedActiveMaster(initial, current, initMaster, deferred);
    } else {
      reconcileBackupMasters(initial, current, deferred);
    }
    return reportDeferred("masters", deferred);
  }

  /**
   * Handles the case where the active master moved: brings the initial active master back,
   * stops the other current masters, then starts the initial backup masters.
   */
  private void restoreChangedActiveMaster(ClusterMetrics initial, ClusterMetrics current,
      ServerName initMaster, List<IOException> deferred) {
    LOG.info("Restoring cluster - Initial active master : " + initMaster.getAddress() +
      " has changed to : " + current.getMasterName().getAddress());
    // If initial master is stopped, start it, before restoring the state.
    // It will come up as a backup master, if there is already an active master.
    try {
      if (!clusterManager.isRunning(ServiceType.HBASE_MASTER,
        initMaster.getHostname(), initMaster.getPort())) {
        LOG.info("Restoring cluster - starting initial active master at:"
          + initMaster.getAddress());
        startMaster(initMaster.getHostname(), initMaster.getPort());
      }

      // master has changed, we would like to undo this.
      // 1. Kill the current backups
      // 2. Stop current master
      // 3. Start backup masters
      for (ServerName currentBackup : current.getBackupMasterNames()) {
        if (!ServerName.isSameAddress(currentBackup, initMaster)) {
          LOG.info("Restoring cluster - stopping backup master: " + currentBackup);
          stopMaster(currentBackup);
        }
      }
      LOG.info("Restoring cluster - stopping active master: " + current.getMasterName());
      stopMaster(current.getMasterName());
      waitForActiveAndReadyMaster(); // wait so that active master takes over
    } catch (IOException ex) {
      // if we fail to start the initial active master, we do not want to continue stopping
      // backup masters. Just keep what we have now
      deferred.add(ex);
    }

    //start backup masters
    //these are not started in backup mode, but we should already have an active master
    for (ServerName backup : initial.getBackupMasterNames()) {
      startBackupMasterIfDown(backup, deferred);
    }
  }

  /**
   * Handles the case where the active master is unchanged: starts initial backup masters that
   * are missing now and stops backup masters that were not part of the initial layout.
   */
  private void reconcileBackupMasters(ClusterMetrics initial, ClusterMetrics current,
      List<IOException> deferred) {
    //current master has not changed, match up backup masters
    Set<ServerName> toStart =
      addressesOnlyIn(initial.getBackupMasterNames(), current.getBackupMasterNames());
    Set<ServerName> toKill =
      addressesOnlyIn(current.getBackupMasterNames(), initial.getBackupMasterNames());

    for (ServerName sn : toStart) {
      startBackupMasterIfDown(sn, deferred);
    }

    for (ServerName sn : toKill) {
      try {
        if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
          LOG.info("Restoring cluster - stopping backup master: " + sn.getAddress());
          stopMaster(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }
  }

  /**
   * Starts a master at the address of {@code sn} unless the cluster manager already reports one
   * running there; any {@link IOException} is appended to {@code deferred} instead of thrown.
   */
  private void startBackupMasterIfDown(ServerName sn, List<IOException> deferred) {
    try {
      if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
        LOG.info("Restoring cluster - starting initial backup master: " + sn.getAddress());
        startMaster(sn.getHostname(), sn.getPort());
      }
    } catch (IOException ex) {
      deferred.add(ex);
    }
  }

  /**
   * Returns the servers of {@code wanted} whose host/port does not occur in {@code excluded}.
   * <p>
   * Elements are removed one by one on purpose: {@code Set.removeAll} may consult the argument's
   * {@code contains}, which would compare with {@code ServerName.equals} and no longer ignore
   * start codes.
   */
  private static Set<ServerName> addressesOnlyIn(Iterable<ServerName> wanted,
      Iterable<ServerName> excluded) {
    Set<ServerName> result = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    for (ServerName sn : wanted) {
      result.add(sn);
    }
    for (ServerName sn : excluded) {
      result.remove(sn);
    }
    return result;
  }

  /**
   * Logs a summary of the deferred errors (at most the first
   * {@value #MAX_LOGGED_RESTORE_ERRORS} individually).
   *
   * @param what the kind of servers that was being restored, used in the log message
   * @param deferred errors collected during the restore
   * @return true if {@code deferred} is empty
   */
  private boolean reportDeferred(String what, List<IOException> deferred) {
    if (!deferred.isEmpty()) {
      LOG.warn("Restoring cluster - restoring " + what + " reported "
        + deferred.size() + " errors:");
      for (int i = 0; i < deferred.size() && i < MAX_LOGGED_RESTORE_ERRORS; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }
    return deferred.isEmpty();
  }

  /**
   * Orders {@link ServerName}s by hostname (case-insensitive) and then by port, ignoring the
   * start code, so that a restarted server compares equal to its previous incarnation.
   */
  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int byHost = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (byHost != 0) {
        return byHost;
      }
      // Integer.compare instead of subtraction: cannot overflow and states the intent.
      return Integer.compare(o1.getPort(), o2.getPort());
    }
  }

  /**
   * Starts region servers that were live in {@code initial} but are not live now, and stops
   * region servers that are live now but were not in {@code initial}. Addresses are compared by
   * host and port; start codes are ignored.
   *
   * @param initial the metrics to restore to
   * @param current the metrics as they are now
   * @return true if no {@link IOException} occurred while starting/stopping region servers
   */
  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
    Set<ServerName> toStart = addressesOnlyIn(initial.getLiveServerMetrics().keySet(),
      current.getLiveServerMetrics().keySet());
    Set<ServerName> toKill = addressesOnlyIn(current.getLiveServerMetrics().keySet(),
      initial.getLiveServerMetrics().keySet());

    // NOTE(review): servers whose port equals the initial master's port are skipped below,
    // presumably so a master process is not treated as a region server - confirm.
    ServerName master = initial.getMasterName();

    List<IOException> deferred = new ArrayList<>();

    for (ServerName sn : toStart) {
      try {
        if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
          sn.getPort()) && master.getPort() != sn.getPort()) {
          LOG.info("Restoring cluster - starting initial region server: " + sn.getAddress());
          startRegionServer(sn.getHostname(), sn.getPort());
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    for (ServerName sn : toKill) {
      try {
        if (clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(),
          sn.getPort()) && master.getPort() != sn.getPort()) {
          LOG.info("Restoring cluster - stopping initial region server: " + sn.getAddress());
          stopRegionServer(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    return reportDeferred("region servers", deferred);
  }

  /**
   * Closes the current admin (a failure to close is only logged) and obtains a fresh one from
   * the connection.
   *
   * @return always true
   * @throws IOException if a new admin cannot be obtained from the connection
   */
  protected boolean restoreAdmin() throws IOException {
    // While restoring above, if the HBase Master which was initially the Active one, was down
    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
    // to refresh its connections (otherwise it will return incorrect information) or we can
    // point it to new instance.
    try {
      admin.close();
    } catch (IOException ioe) {
      LOG.warn("While closing the old connection", ioe);
    }
    this.admin = this.connection.getAdmin();
    LOG.info("Added new HBaseAdmin");
    return true;
  }
}