/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.zookeeper;

import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.BindException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

047/** 048 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead of redoing it, we 049 * should contribute updates to their code which let us more easily access testing helper objects. 050 */ 051@InterfaceAudience.Public 052public class MiniZooKeeperCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class); 054 private static final int TICK_TIME = 2000; 055 private static final int TIMEOUT = 1000; 056 private static final int DEFAULT_CONNECTION_TIMEOUT = 30000; 057 private int connectionTimeout; 058 public static final String LOOPBACK_HOST = InetAddress.getLoopbackAddress().getHostName(); 059 public static final String HOST = LOOPBACK_HOST; 060 061 private boolean started; 062 063 /** 064 * The default port. If zero, we use a random port. 065 */ 066 private int defaultClientPort = 0; 067 068 private final List<NIOServerCnxnFactory> standaloneServerFactoryList; 069 private final List<ZooKeeperServer> zooKeeperServers; 070 private final List<Integer> clientPortList; 071 072 private int activeZKServerIndex; 073 private int tickTime = 0; 074 075 private final Configuration configuration; 076 077 public MiniZooKeeperCluster() { 078 this(new Configuration()); 079 } 080 081 public MiniZooKeeperCluster(Configuration configuration) { 082 this.started = false; 083 this.configuration = configuration; 084 activeZKServerIndex = -1; 085 zooKeeperServers = new ArrayList<>(); 086 clientPortList = new ArrayList<>(); 087 standaloneServerFactoryList = new ArrayList<>(); 088 connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 089 DEFAULT_CONNECTION_TIMEOUT); 090 } 091 092 /** 093 * Add a client port to the list. 094 * @param clientPort the specified port 095 */ 096 public void addClientPort(int clientPort) { 097 clientPortList.add(clientPort); 098 } 099 100 /** 101 * Get the list of client ports. 
102 * @return clientPortList the client port list 103 */ 104 @InterfaceAudience.Private 105 public List<Integer> getClientPortList() { 106 return clientPortList; 107 } 108 109 /** 110 * Check whether the client port in a specific position of the client port list is valid. 111 * @param index the specified position 112 */ 113 private boolean hasValidClientPortInList(int index) { 114 return (clientPortList.size() > index && clientPortList.get(index) > 0); 115 } 116 117 public void setDefaultClientPort(int clientPort) { 118 if (clientPort <= 0) { 119 throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort); 120 } 121 this.defaultClientPort = clientPort; 122 } 123 124 /** 125 * Selects a ZK client port. 126 * @param seedPort the seed port to start with; -1 means first time. 127 * @return a valid and unused client port 128 */ 129 private int selectClientPort(int seedPort) { 130 int i; 131 int returnClientPort = seedPort + 1; 132 if (returnClientPort == 0) { 133 // If the new port is invalid, find one - starting with the default client port. 134 // If the default client port is not specified, starting with a random port. 135 // The random port is selected from the range between 49152 to 65535. These ports cannot be 136 // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports). 137 if (defaultClientPort > 0) { 138 returnClientPort = defaultClientPort; 139 } else { 140 returnClientPort = 0xc000 + ThreadLocalRandom.current().nextInt(0x3f00); 141 } 142 } 143 // Make sure that the port is unused. 144 // break when an unused port is found 145 do { 146 for (i = 0; i < clientPortList.size(); i++) { 147 if (returnClientPort == clientPortList.get(i)) { 148 // Already used. Update the port and retry. 
149 returnClientPort++; 150 break; 151 } 152 } 153 } while (i != clientPortList.size()); 154 return returnClientPort; 155 } 156 157 public void setTickTime(int tickTime) { 158 this.tickTime = tickTime; 159 } 160 161 public int getBackupZooKeeperServerNum() { 162 return zooKeeperServers.size() - 1; 163 } 164 165 public int getZooKeeperServerNum() { 166 return zooKeeperServers.size(); 167 } 168 169 // / XXX: From o.a.zk.t.ClientBase 170 private static void setupTestEnv() { 171 // during the tests we run with 100K prealloc in the logs. 172 // on windows systems prealloc of 64M was seen to take ~15seconds 173 // resulting in test failure (client timeout on first session). 174 // set env and directly in order to handle static init/gc issues 175 System.setProperty("zookeeper.preAllocSize", "100"); 176 FileTxnLog.setPreallocSize(100 * 1024); 177 // allow all 4 letter words 178 System.setProperty("zookeeper.4lw.commands.whitelist", "*"); 179 } 180 181 public int startup(File baseDir) throws IOException, InterruptedException { 182 int numZooKeeperServers = clientPortList.size(); 183 if (numZooKeeperServers == 0) { 184 numZooKeeperServers = 1; // need at least 1 ZK server for testing 185 } 186 return startup(baseDir, numZooKeeperServers); 187 } 188 189 /** 190 * @param baseDir the base directory to use 191 * @param numZooKeeperServers the number of ZooKeeper servers 192 * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick 193 * another port. 
194 * @throws IOException if an operation fails during the startup 195 * @throws InterruptedException if the startup fails 196 */ 197 public int startup(File baseDir, int numZooKeeperServers) 198 throws IOException, InterruptedException { 199 if (numZooKeeperServers <= 0) { 200 return -1; 201 } 202 203 setupTestEnv(); 204 shutdown(); 205 206 int tentativePort = -1; // the seed port 207 int currentClientPort; 208 209 // running all the ZK servers 210 for (int i = 0; i < numZooKeeperServers; i++) { 211 File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile(); 212 createDir(dir); 213 int tickTimeToUse; 214 if (this.tickTime > 0) { 215 tickTimeToUse = this.tickTime; 216 } else { 217 tickTimeToUse = TICK_TIME; 218 } 219 220 // Set up client port - if we have already had a list of valid ports, use it. 221 if (hasValidClientPortInList(i)) { 222 currentClientPort = clientPortList.get(i); 223 } else { 224 tentativePort = selectClientPort(tentativePort); // update the seed 225 currentClientPort = tentativePort; 226 } 227 228 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); 229 // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper 230 server.setMinSessionTimeout( 231 configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1)); 232 server.setMaxSessionTimeout( 233 configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1)); 234 NIOServerCnxnFactory standaloneServerFactory; 235 while (true) { 236 try { 237 standaloneServerFactory = new NIOServerCnxnFactory(); 238 standaloneServerFactory.configure(new InetSocketAddress(LOOPBACK_HOST, currentClientPort), 239 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 240 HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS)); 241 } catch (BindException e) { 242 LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e); 243 // We're told to use some port but it's occupied, fail 244 if (hasValidClientPortInList(i)) { 245 return -1; 246 } 247 // 
This port is already in use, try to use another. 248 tentativePort = selectClientPort(tentativePort); 249 currentClientPort = tentativePort; 250 continue; 251 } 252 break; 253 } 254 255 // Start up this ZK server. Dump its stats. 256 standaloneServerFactory.startup(server); 257 LOG.info("Started connectionTimeout={}, dir={}, {}", connectionTimeout, dir, 258 getServerConfigurationOnOneLine(server)); 259 // Runs a 'stat' against the servers. 260 if (!waitForServerUp(currentClientPort, connectionTimeout)) { 261 Threads.printThreadInfo(System.out, "Why is zk standalone server not coming up?"); 262 throw new IOException( 263 "Waiting for startup of standalone server; " + "server isRunning=" + server.isRunning()); 264 } 265 266 // We have selected a port as a client port. Update clientPortList if necessary. 267 if (clientPortList.size() <= i) { // it is not in the list, add the port 268 clientPortList.add(currentClientPort); 269 } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port 270 clientPortList.remove(i); 271 clientPortList.add(i, currentClientPort); 272 } 273 274 standaloneServerFactoryList.add(standaloneServerFactory); 275 zooKeeperServers.add(server); 276 } 277 278 // set the first one to be active ZK; Others are backups 279 activeZKServerIndex = 0; 280 started = true; 281 int clientPort = clientPortList.get(activeZKServerIndex); 282 LOG.info("Started MiniZooKeeperCluster and ran 'stat' on client port={}", clientPort); 283 return clientPort; 284 } 285 286 private String getServerConfigurationOnOneLine(ZooKeeperServer server) { 287 StringWriter sw = new StringWriter(); 288 try (PrintWriter pw = new PrintWriter(sw) { 289 @Override 290 public void println(int x) { 291 super.print(x); 292 super.print(", "); 293 } 294 295 @Override 296 public void println(String x) { 297 super.print(x); 298 super.print(", "); 299 } 300 }) { 301 server.dumpConf(pw); 302 } 303 return sw.toString(); 304 } 305 306 private void createDir(File dir) 
throws IOException { 307 try { 308 if (!dir.exists()) { 309 dir.mkdirs(); 310 } 311 } catch (SecurityException e) { 312 throw new IOException("creating dir: " + dir, e); 313 } 314 } 315 316 /** 317 * @throws IOException if waiting for the shutdown of a server fails 318 */ 319 public void shutdown() throws IOException { 320 // shut down all the zk servers 321 for (int i = 0; i < standaloneServerFactoryList.size(); i++) { 322 NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i); 323 int clientPort = clientPortList.get(i); 324 standaloneServerFactory.shutdown(); 325 if (!waitForServerDown(clientPort, connectionTimeout)) { 326 throw new IOException("Waiting for shutdown of standalone server at port=" + clientPort 327 + ", timeout=" + this.connectionTimeout); 328 } 329 } 330 standaloneServerFactoryList.clear(); 331 332 for (ZooKeeperServer zkServer : zooKeeperServers) { 333 // Explicitly close ZKDatabase since ZookeeperServer does not close them 334 zkServer.getZKDatabase().close(); 335 } 336 zooKeeperServers.clear(); 337 338 // clear everything 339 if (started) { 340 started = false; 341 activeZKServerIndex = 0; 342 clientPortList.clear(); 343 LOG.info("Shutdown MiniZK cluster with all ZK servers"); 344 } 345 } 346 347 /** 348 * @return clientPort return clientPort if there is another ZK backup can run when killing the 349 * current active; return -1, if there is no backups. 
350 * @throws IOException if waiting for the shutdown of a server fails 351 */ 352 public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException { 353 if (!started || activeZKServerIndex < 0) { 354 return -1; 355 } 356 357 // Shutdown the current active one 358 NIOServerCnxnFactory standaloneServerFactory = 359 standaloneServerFactoryList.get(activeZKServerIndex); 360 int clientPort = clientPortList.get(activeZKServerIndex); 361 362 standaloneServerFactory.shutdown(); 363 if (!waitForServerDown(clientPort, connectionTimeout)) { 364 throw new IOException("Waiting for shutdown of standalone server"); 365 } 366 367 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close(); 368 369 // remove the current active zk server 370 standaloneServerFactoryList.remove(activeZKServerIndex); 371 clientPortList.remove(activeZKServerIndex); 372 zooKeeperServers.remove(activeZKServerIndex); 373 LOG.info("Kill the current active ZK servers in the cluster on client port: {}", clientPort); 374 375 if (standaloneServerFactoryList.isEmpty()) { 376 // there is no backup servers; 377 return -1; 378 } 379 clientPort = clientPortList.get(activeZKServerIndex); 380 LOG.info("Activate a backup zk server in the cluster on client port: {}", clientPort); 381 // return the next back zk server's port 382 return clientPort; 383 } 384 385 /** 386 * Kill one back up ZK servers. 
387 * @throws IOException if waiting for the shutdown of a server fails 388 */ 389 public void killOneBackupZooKeeperServer() throws IOException, InterruptedException { 390 if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) { 391 return; 392 } 393 394 int backupZKServerIndex = activeZKServerIndex + 1; 395 // Shutdown the current active one 396 NIOServerCnxnFactory standaloneServerFactory = 397 standaloneServerFactoryList.get(backupZKServerIndex); 398 int clientPort = clientPortList.get(backupZKServerIndex); 399 400 standaloneServerFactory.shutdown(); 401 if (!waitForServerDown(clientPort, connectionTimeout)) { 402 throw new IOException("Waiting for shutdown of standalone server"); 403 } 404 405 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close(); 406 407 // remove this backup zk server 408 standaloneServerFactoryList.remove(backupZKServerIndex); 409 clientPortList.remove(backupZKServerIndex); 410 zooKeeperServers.remove(backupZKServerIndex); 411 LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort); 412 } 413 414 // XXX: From o.a.zk.t.ClientBase. We just dropped the check for ssl/secure. 415 private static boolean waitForServerDown(int port, long timeout) throws IOException { 416 long start = EnvironmentEdgeManager.currentTime(); 417 while (true) { 418 try { 419 send4LetterWord(HOST, port, "stat", false, (int) timeout); 420 } catch (IOException | X509Exception.SSLContextException e) { 421 return true; 422 } 423 if (EnvironmentEdgeManager.currentTime() > start + timeout) { 424 break; 425 } 426 try { 427 Thread.sleep(TIMEOUT); 428 } catch (InterruptedException e) { 429 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 430 } 431 } 432 return false; 433 } 434 435 // XXX: From o.a.zk.t.ClientBase. Its in the test jar but we don't depend on zk test jar. 436 // We remove the SSL/secure bit. Not used in here. 
437 private static boolean waitForServerUp(int port, long timeout) throws IOException { 438 long start = EnvironmentEdgeManager.currentTime(); 439 while (true) { 440 try { 441 String result = send4LetterWord(HOST, port, "stat", false, (int) timeout); 442 if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) { 443 return true; 444 } else { 445 LOG.debug("Read {}", result); 446 } 447 } catch (ConnectException e) { 448 // ignore as this is expected, do not log stacktrace 449 LOG.info("{}:{} not up: {}", HOST, port, e.toString()); 450 } catch (IOException | X509Exception.SSLContextException e) { 451 // ignore as this is expected 452 LOG.info("{}:{} not up", HOST, port, e); 453 } 454 455 if (EnvironmentEdgeManager.currentTime() > start + timeout) { 456 break; 457 } 458 try { 459 Thread.sleep(TIMEOUT); 460 } catch (InterruptedException e) { 461 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 462 } 463 } 464 return false; 465 } 466 467 public int getClientPort() { 468 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() 469 ? -1 470 : clientPortList.get(activeZKServerIndex); 471 } 472 473 /** Returns Address for this cluster instance. */ 474 public Address getAddress() { 475 return Address.fromParts(HOST, getClientPort()); 476 } 477 478 List<ZooKeeperServer> getZooKeeperServers() { 479 return zooKeeperServers; 480 } 481}