/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.zookeeper;

import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.BindException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead of redoing it, we
 * should contribute updates to their code which let us more easily access testing helper objects.
 */
@InterfaceAudience.Public
public class MiniZooKeeperCluster {
  private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);

  /** Tick time handed to each {@link ZooKeeperServer} when {@link #setTickTime(int)} was not used. */
  private static final int TICK_TIME = 2000;

  /** Pause, in milliseconds, between two consecutive 'stat' probes in the wait loops. */
  private static final int TIMEOUT = 1000;

  /** Fallback for {@link #connectionTimeout} when the configuration does not provide a value. */
  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;

  private static final byte[] STATIC_BYTES = Bytes.toBytes("stat");

  /** How long the wait loops keep probing a server before giving up. */
  private final int connectionTimeout;

  public static final String LOOPBACK_HOST = InetAddress.getLoopbackAddress().getHostName();
  public static final String HOST = LOOPBACK_HOST;

  private boolean started;

  /**
   * The default port. If zero, we use a random port.
   */
  private int defaultClientPort = 0;

  // The three lists below are index-aligned: entry i of each list belongs to the same server.
  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
  private final List<ZooKeeperServer> zooKeeperServers;
  private final List<Integer> clientPortList;

  private int activeZKServerIndex;
  private int tickTime = 0;

  private final Configuration configuration;

  /**
   * Creates a cluster backed by a freshly constructed {@link Configuration}.
   */
  public MiniZooKeeperCluster() {
    this(new Configuration());
  }

  /**
   * Creates a cluster that reads its settings from the given configuration. No server is started
   * until one of the {@code startup} methods is called.
   * @param configuration the configuration to read the connection timeout, session timeouts, bind
   *                      address and max client connections from
   */
  public MiniZooKeeperCluster(Configuration configuration) {
    this.started = false;
    this.configuration = configuration;
    activeZKServerIndex = -1;
    zooKeeperServers = new ArrayList<>();
    clientPortList = new ArrayList<>();
    standaloneServerFactoryList = new ArrayList<>();
    connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
      DEFAULT_CONNECTION_TIMEOUT);
  }

  /**
   * Add a client port to the list.
   * @param clientPort the specified port
   */
  public void addClientPort(int clientPort) {
    clientPortList.add(clientPort);
  }

  /**
   * Get the list of client ports.
   * @return clientPortList the client port list (the internal list itself, not a copy)
   */
  @InterfaceAudience.Private
  public List<Integer> getClientPortList() {
    return clientPortList;
  }

  /**
   * Check whether the client port in a specific position of the client port list is valid.
   * @param index the specified position
   * @return true if the list has an entry at {@code index} and that entry is greater than zero
   */
  private boolean hasValidClientPortInList(int index) {
    return clientPortList.size() > index && clientPortList.get(index) > 0;
  }

  /**
   * Sets the port that port selection starts from when no explicit port was supplied.
   * @param clientPort the default client port; must be greater than zero
   * @throws IllegalArgumentException if {@code clientPort} is zero or negative
   */
  public void setDefaultClientPort(int clientPort) {
    if (clientPort <= 0) {
      throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort);
    }
    this.defaultClientPort = clientPort;
  }

  /**
   * Selects a ZK client port.
   * @param seedPort the seed port to start with; -1 means first time.
   * @return a port that is not present in the client port list
   */
  private int selectClientPort(int seedPort) {
    int returnClientPort = seedPort + 1;
    if (returnClientPort == 0) {
      // If the new port is invalid, find one - starting with the default client port.
      // If the default client port is not specified, starting with a random port.
      // The random port is selected from the range between 49152 to 65535. These ports cannot be
      // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
      if (defaultClientPort > 0) {
        returnClientPort = defaultClientPort;
      } else {
        returnClientPort = 0xc000 + ThreadLocalRandom.current().nextInt(0x3f00);
      }
    }
    // Walk upwards until we reach a port that no server of this cluster has been assigned.
    while (clientPortList.contains(returnClientPort)) {
      returnClientPort++;
    }
    return returnClientPort;
  }

  /**
   * Overrides the tick time used for servers started afterwards.
   * @param tickTime the tick time; values that are zero or negative make startup fall back to the
   *                 built-in default
   */
  public void setTickTime(int tickTime) {
    this.tickTime = tickTime;
  }

  /** Returns the number of tracked ZooKeeper servers minus one (the active server). */
  public int getBackupZooKeeperServerNum() {
    return zooKeeperServers.size() - 1;
  }

  /** Returns the number of ZooKeeper servers currently tracked by this cluster. */
  public int getZooKeeperServerNum() {
    return zooKeeperServers.size();
  }

  // / XXX: From o.a.zk.t.ClientBase
  private static void setupTestEnv() {
    // during the tests we run with 100K prealloc in the logs.
    // on windows systems prealloc of 64M was seen to take ~15seconds
    // resulting in test failure (client timeout on first session).
    // set env and directly in order to handle static init/gc issues
    System.setProperty("zookeeper.preAllocSize", "100");
    FileTxnLog.setPreallocSize(100 * 1024);
    // allow all 4 letter words
    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
  }

  /**
   * Starts one server per entry of the client port list, or a single server if the list is empty.
   * @param baseDir the base directory to use
   * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
   *         another port.
   * @throws IOException          if an operation fails during the startup
   * @throws InterruptedException declared for callers; see {@link #startup(File, int)}
   */
  public int startup(File baseDir) throws IOException, InterruptedException {
    int numZooKeeperServers = clientPortList.size();
    if (numZooKeeperServers == 0) {
      numZooKeeperServers = 1; // need at least 1 ZK server for testing
    }
    return startup(baseDir, numZooKeeperServers);
  }

  /**
   * Shuts down whatever is currently running and then starts the requested number of servers,
   * each in its own {@code zookeeper_<i>} sub-directory of {@code baseDir}.
   * @param baseDir             the base directory to use
   * @param numZooKeeperServers the number of ZooKeeper servers
   * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
   *         another port.
   * @throws IOException          if an operation fails during the startup
   * @throws InterruptedException declared in the signature; NOTE(review): nothing in this method
   *                              visibly throws it, interruption of the wait loop surfaces as
   *                              {@link InterruptedIOException}
   */
  public int startup(File baseDir, int numZooKeeperServers)
    throws IOException, InterruptedException {
    if (numZooKeeperServers <= 0) {
      return -1;
    }

    setupTestEnv();
    shutdown();

    // The tick time does not change while we start servers, so decide it once.
    final int tickTimeToUse = this.tickTime > 0 ? this.tickTime : TICK_TIME;

    int tentativePort = -1; // the seed port
    int currentClientPort;

    // running all the ZK servers
    for (int i = 0; i < numZooKeeperServers; i++) {
      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
      createDir(dir);

      // Set up client port - if we have already had a list of valid ports, use it.
      if (hasValidClientPortInList(i)) {
        currentClientPort = clientPortList.get(i);
      } else {
        tentativePort = selectClientPort(tentativePort); // update the seed
        currentClientPort = tentativePort;
      }

      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
      // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
      server.setMinSessionTimeout(
        configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
      server.setMaxSessionTimeout(
        configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
      NIOServerCnxnFactory standaloneServerFactory;
      while (true) {
        try {
          standaloneServerFactory = new NIOServerCnxnFactory();
          String bindAddr =
            configuration.get("hbase.zookeeper.property.clientPortAddress", LOOPBACK_HOST);
          standaloneServerFactory.configure(new InetSocketAddress(bindAddr, currentClientPort),
            configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
              HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
        } catch (BindException e) {
          LOG.debug("Failed binding ZK Server to client port: {}", currentClientPort, e);
          // We're told to use some port but it's occupied, fail
          if (hasValidClientPortInList(i)) {
            return -1;
          }
          // This port is already in use, try to use another.
          tentativePort = selectClientPort(tentativePort);
          currentClientPort = tentativePort;
          continue;
        }
        break;
      }

      // Start up this ZK server. Dump its stats.
      standaloneServerFactory.startup(server);
      LOG.info("Started connectionTimeout={}, dir={}, {}", connectionTimeout, dir,
        getServerConfigurationOnOneLine(server));
      // Runs a 'stat' against the servers.
      boolean serverUp = false;
      try {
        serverUp = waitForServerUp(currentClientPort, connectionTimeout);
        if (!serverUp) {
          Threads.printThreadInfo(System.out, "Why is zk standalone server not coming up?");
          throw new IOException(
            "Waiting for startup of standalone server; " + "server isRunning=" + server.isRunning());
        }
      } finally {
        if (!serverUp) {
          // The factory is not tracked in standaloneServerFactoryList yet, so shutdown() would
          // never reach it; stop it here so a failed startup does not leave the port bound.
          standaloneServerFactory.shutdown();
        }
      }

      // We have selected a port as a client port. Update clientPortList if necessary.
      if (clientPortList.size() <= i) { // it is not in the list, add the port
        clientPortList.add(currentClientPort);
      } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
        clientPortList.set(i, currentClientPort);
      }

      standaloneServerFactoryList.add(standaloneServerFactory);
      zooKeeperServers.add(server);
    }

    // set the first one to be active ZK; Others are backups
    activeZKServerIndex = 0;
    started = true;
    int clientPort = clientPortList.get(activeZKServerIndex);
    LOG.info("Started MiniZooKeeperCluster and ran 'stat' on client port={}", clientPort);
    return clientPort;
  }

  /**
   * Renders the output of {@link ZooKeeperServer#dumpConf(PrintWriter)} on a single line by
   * replacing the line breaks of {@code println(int)} and {@code println(String)} with ", ".
   */
  private String getServerConfigurationOnOneLine(ZooKeeperServer server) {
    StringWriter sw = new StringWriter();
    try (PrintWriter pw = new PrintWriter(sw) {
      @Override
      public void println(int x) {
        super.print(x);
        super.print(", ");
      }

      @Override
      public void println(String x) {
        super.print(x);
        super.print(", ");
      }
    }) {
      server.dumpConf(pw);
    }
    return sw.toString();
  }

  /**
   * Creates {@code dir} (including missing parents) unless the path already exists.
   * @param dir the directory to create
   * @throws IOException if the directory could not be created or a security manager denied access
   */
  private void createDir(File dir) throws IOException {
    try {
      // mkdirs() also returns false when someone else created the directory in the meantime, so
      // only treat it as a failure if the directory is still missing afterwards.
      if (!dir.exists() && !dir.mkdirs() && !dir.isDirectory()) {
        throw new IOException("Failed to create dir: " + dir);
      }
    } catch (SecurityException e) {
      throw new IOException("creating dir: " + dir, e);
    }
  }

  /**
   * Shuts down every tracked server and closes its ZKDatabase. The client port list is only
   * cleared if the cluster had been started.
   * @throws IOException if waiting for the shutdown of a server fails
   */
  public void shutdown() throws IOException {
    // shut down all the zk servers
    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
      int clientPort = clientPortList.get(i);
      standaloneServerFactory.shutdown();
      if (!waitForServerDown(clientPort, connectionTimeout)) {
        throw new IOException("Waiting for shutdown of standalone server at port=" + clientPort
          + ", timeout=" + this.connectionTimeout);
      }
    }
    standaloneServerFactoryList.clear();

    for (ZooKeeperServer zkServer : zooKeeperServers) {
      // Explicitly close ZKDatabase since ZookeeperServer does not close them
      zkServer.getZKDatabase().close();
    }
    zooKeeperServers.clear();

    // clear everything
    if (started) {
      started = false;
      activeZKServerIndex = 0;
      clientPortList.clear();
      LOG.info("Shutdown MiniZK cluster with all ZK servers");
    }
  }

  /**
   * Shuts down the active server and removes it from the cluster, which makes the next server in
   * the list (if any) the active one.
   * @return clientPort return clientPort if there is another ZK backup can run when killing the
   *         current active; return -1, if there is no backups.
   * @throws IOException if waiting for the shutdown of a server fails
   */
  public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException {
    if (!started || activeZKServerIndex < 0) {
      return -1;
    }

    // Shutdown the current active one
    NIOServerCnxnFactory standaloneServerFactory =
      standaloneServerFactoryList.get(activeZKServerIndex);
    int clientPort = clientPortList.get(activeZKServerIndex);

    standaloneServerFactory.shutdown();
    if (!waitForServerDown(clientPort, connectionTimeout)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }

    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();

    // remove the current active zk server
    standaloneServerFactoryList.remove(activeZKServerIndex);
    clientPortList.remove(activeZKServerIndex);
    zooKeeperServers.remove(activeZKServerIndex);
    LOG.info("Kill the current active ZK servers in the cluster on client port: {}", clientPort);

    if (standaloneServerFactoryList.isEmpty()) {
      // there is no backup servers;
      return -1;
    }
    // The former first backup has moved into the active slot after the removals above.
    clientPort = clientPortList.get(activeZKServerIndex);
    LOG.info("Activate a backup zk server in the cluster on client port: {}", clientPort);
    // return the next back zk server's port
    return clientPort;
  }

  /**
   * Kill one back up ZK servers. Does nothing if the cluster is not started or has no backup.
   * @throws IOException if waiting for the shutdown of a server fails
   */
  public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
    if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
      return;
    }

    int backupZKServerIndex = activeZKServerIndex + 1;
    // Shutdown the backup that directly follows the active server
    NIOServerCnxnFactory standaloneServerFactory =
      standaloneServerFactoryList.get(backupZKServerIndex);
    int clientPort = clientPortList.get(backupZKServerIndex);

    standaloneServerFactory.shutdown();
    if (!waitForServerDown(clientPort, connectionTimeout)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }

    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();

    // remove this backup zk server
    standaloneServerFactoryList.remove(backupZKServerIndex);
    clientPortList.remove(backupZKServerIndex);
    zooKeeperServers.remove(backupZKServerIndex);
    LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort);
  }

  // XXX: From o.a.zk.t.ClientBase. We just dropped the check for ssl/secure.
  /**
   * Repeatedly sends 'stat' to {@link #HOST} on the given port until the request fails.
   * @param port    the client port to probe
   * @param timeout how long to keep probing; also passed as the per-request timeout
   * @return true as soon as a probe fails, false if the server still answered after the timeout
   * @throws IOException an {@link InterruptedIOException} if the thread is interrupted while
   *                     sleeping between probes
   */
  private static boolean waitForServerDown(int port, long timeout) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (true) {
      try {
        send4LetterWord(HOST, port, "stat", false, (int) timeout);
      } catch (IOException | X509Exception.SSLContextException e) {
        // A failed probe is the success signal here: the server no longer answers.
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(TIMEOUT);
      } catch (InterruptedException e) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
    return false;
  }

  // XXX: From o.a.zk.t.ClientBase. Its in the test jar but we don't depend on zk test jar.
  // We remove the SSL/secure bit. Not used in here.
  /**
   * Repeatedly sends 'stat' to {@link #HOST} on the given port until the server reports itself as
   * up.
   * @param port    the client port to probe
   * @param timeout how long to keep probing; also passed as the per-request timeout
   * @return true once a reply starts with "Zookeeper version:" and does not contain "READ-ONLY",
   *         false if that did not happen before the timeout elapsed
   * @throws IOException an {@link InterruptedIOException} if the thread is interrupted while
   *                     sleeping between probes
   */
  private static boolean waitForServerUp(int port, long timeout) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (true) {
      try {
        String result = send4LetterWord(HOST, port, "stat", false, (int) timeout);
        if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) {
          return true;
        } else {
          LOG.debug("Read {}", result);
        }
      } catch (ConnectException e) {
        // ignore as this is expected, do not log stacktrace
        LOG.info("{}:{} not up: {}", HOST, port, e.toString());
      } catch (IOException | X509Exception.SSLContextException e) {
        // ignore as this is expected
        LOG.info("{}:{} not up", HOST, port, e);
      }

      if (EnvironmentEdgeManager.currentTime() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(TIMEOUT);
      } catch (InterruptedException e) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
    }
    return false;
  }

  /**
   * Returns the client port stored at the active server's index, or -1 if that index does not
   * point into the client port list.
   */
  public int getClientPort() {
    return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size()
      ? -1
      : clientPortList.get(activeZKServerIndex);
  }

  /** Returns Address for this cluster instance. */
  public Address getAddress() {
    return Address.fromParts(HOST, getClientPort());
  }

  /** Returns the internal list of tracked servers (not a copy). */
  List<ZooKeeperServer> getZooKeeperServers() {
    return zooKeeperServers;
  }
}