001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.BufferedReader; 022import java.io.File; 023import java.io.IOException; 024import java.io.InputStreamReader; 025import java.io.InterruptedIOException; 026import java.io.OutputStream; 027import java.io.Reader; 028import java.net.BindException; 029import java.net.InetSocketAddress; 030import java.net.Socket; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.Random; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.apache.zookeeper.server.NIOServerCnxnFactory; 039import org.apache.zookeeper.server.ZooKeeperServer; 040import org.apache.zookeeper.server.persistence.FileTxnLog; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 045 046/** 047 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead 048 * of redoing it, we should contribute updates to their code which let us more 049 * easily access testing helper objects. 050 */ 051@InterfaceAudience.Public 052public class MiniZooKeeperCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class); 054 055 private static final int TICK_TIME = 2000; 056 private static final int DEFAULT_CONNECTION_TIMEOUT = 30000; 057 private final int connectionTimeout; 058 059 private boolean started; 060 061 /** The default port. If zero, we use a random port. */ 062 private int defaultClientPort = 0; 063 064 private final List<NIOServerCnxnFactory> standaloneServerFactoryList; 065 private final List<ZooKeeperServer> zooKeeperServers; 066 private final List<Integer> clientPortList; 067 068 private int activeZKServerIndex; 069 private int tickTime = 0; 070 071 private final Configuration configuration; 072 073 public MiniZooKeeperCluster() { 074 this(new Configuration()); 075 } 076 077 public MiniZooKeeperCluster(Configuration configuration) { 078 this.started = false; 079 this.configuration = configuration; 080 activeZKServerIndex = -1; 081 zooKeeperServers = new ArrayList<>(); 082 clientPortList = new ArrayList<>(); 083 standaloneServerFactoryList = new ArrayList<>(); 084 connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 085 DEFAULT_CONNECTION_TIMEOUT); 086 } 087 088 /** 089 * Add a client port to the list. 090 * 091 * @param clientPort the specified port 092 */ 093 public void addClientPort(int clientPort) { 094 clientPortList.add(clientPort); 095 } 096 097 /** 098 * Get the list of client ports. 099 * 100 * @return clientPortList the client port list 101 */ 102 @VisibleForTesting 103 public List<Integer> getClientPortList() { 104 return clientPortList; 105 } 106 107 /** 108 * Check whether the client port in a specific position of the client port list is valid. 109 * 110 * @param index the specified position 111 */ 112 private boolean hasValidClientPortInList(int index) { 113 return (clientPortList.size() > index && clientPortList.get(index) > 0); 114 } 115 116 public void setDefaultClientPort(int clientPort) { 117 if (clientPort <= 0) { 118 throw new IllegalArgumentException("Invalid default ZK client port: " 119 + clientPort); 120 } 121 this.defaultClientPort = clientPort; 122 } 123 124 /** 125 * Selects a ZK client port. 126 * 127 * @param seedPort the seed port to start with; -1 means first time. 128 * @return a valid and unused client port 129 */ 130 private int selectClientPort(int seedPort) { 131 int i; 132 int returnClientPort = seedPort + 1; 133 if (returnClientPort == 0) { 134 // If the new port is invalid, find one - starting with the default client port. 135 // If the default client port is not specified, starting with a random port. 136 // The random port is selected from the range between 49152 to 65535. These ports cannot be 137 // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports). 138 if (defaultClientPort > 0) { 139 returnClientPort = defaultClientPort; 140 } else { 141 returnClientPort = 0xc000 + new Random().nextInt(0x3f00); 142 } 143 } 144 // Make sure that the port is unused. 145 // break when an unused port is found 146 do { 147 for (i = 0; i < clientPortList.size(); i++) { 148 if (returnClientPort == clientPortList.get(i)) { 149 // Already used. Update the port and retry. 150 returnClientPort++; 151 break; 152 } 153 } 154 } while (i != clientPortList.size()); 155 return returnClientPort; 156 } 157 158 public void setTickTime(int tickTime) { 159 this.tickTime = tickTime; 160 } 161 162 public int getBackupZooKeeperServerNum() { 163 return zooKeeperServers.size() - 1; 164 } 165 166 public int getZooKeeperServerNum() { 167 return zooKeeperServers.size(); 168 } 169 170 // / XXX: From o.a.zk.t.ClientBase 171 private static void setupTestEnv() { 172 // during the tests we run with 100K prealloc in the logs. 173 // on windows systems prealloc of 64M was seen to take ~15seconds 174 // resulting in test failure (client timeout on first session). 175 // set env and directly in order to handle static init/gc issues 176 System.setProperty("zookeeper.preAllocSize", "100"); 177 FileTxnLog.setPreallocSize(100 * 1024); 178 // allow all 4 letter words 179 System.setProperty("zookeeper.4lw.commands.whitelist", "*"); 180 } 181 182 public int startup(File baseDir) throws IOException, InterruptedException { 183 int numZooKeeperServers = clientPortList.size(); 184 if (numZooKeeperServers == 0) { 185 numZooKeeperServers = 1; // need at least 1 ZK server for testing 186 } 187 return startup(baseDir, numZooKeeperServers); 188 } 189 190 /** 191 * @param baseDir the base directory to use 192 * @param numZooKeeperServers the number of ZooKeeper servers 193 * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick 194 * another port. 195 * @throws IOException if an operation fails during the startup 196 * @throws InterruptedException if the startup fails 197 */ 198 public int startup(File baseDir, int numZooKeeperServers) throws IOException, 199 InterruptedException { 200 if (numZooKeeperServers <= 0) { 201 return -1; 202 } 203 204 setupTestEnv(); 205 shutdown(); 206 207 int tentativePort = -1; // the seed port 208 int currentClientPort; 209 210 // running all the ZK servers 211 for (int i = 0; i < numZooKeeperServers; i++) { 212 File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile(); 213 createDir(dir); 214 int tickTimeToUse; 215 if (this.tickTime > 0) { 216 tickTimeToUse = this.tickTime; 217 } else { 218 tickTimeToUse = TICK_TIME; 219 } 220 221 // Set up client port - if we have already had a list of valid ports, use it. 222 if (hasValidClientPortInList(i)) { 223 currentClientPort = clientPortList.get(i); 224 } else { 225 tentativePort = selectClientPort(tentativePort); // update the seed 226 currentClientPort = tentativePort; 227 } 228 229 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); 230 // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper 231 server.setMinSessionTimeout(configuration.getInt( 232 "hbase.zookeeper.property.minSessionTimeout", -1)); 233 server.setMaxSessionTimeout(configuration.getInt( 234 "hbase.zookeeper.property.maxSessionTimeout", -1)); 235 NIOServerCnxnFactory standaloneServerFactory; 236 while (true) { 237 try { 238 standaloneServerFactory = new NIOServerCnxnFactory(); 239 standaloneServerFactory.configure( 240 new InetSocketAddress(currentClientPort), 241 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 242 HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS)); 243 } catch (BindException e) { 244 LOG.debug("Failed binding ZK Server to client port: " + 245 currentClientPort, e); 246 // We're told to use some port but it's occupied, fail 247 if (hasValidClientPortInList(i)) { 248 return -1; 249 } 250 // This port is already in use, try to use another. 251 tentativePort = selectClientPort(tentativePort); 252 currentClientPort = tentativePort; 253 continue; 254 } 255 break; 256 } 257 258 // Start up this ZK server 259 standaloneServerFactory.startup(server); 260 // Runs a 'stat' against the servers. 261 if (!waitForServerUp(currentClientPort, connectionTimeout)) { 262 throw new IOException("Waiting for startup of standalone server"); 263 } 264 265 // We have selected a port as a client port. Update clientPortList if necessary. 266 if (clientPortList.size() <= i) { // it is not in the list, add the port 267 clientPortList.add(currentClientPort); 268 } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port 269 clientPortList.remove(i); 270 clientPortList.add(i, currentClientPort); 271 } 272 273 standaloneServerFactoryList.add(standaloneServerFactory); 274 zooKeeperServers.add(server); 275 } 276 277 // set the first one to be active ZK; Others are backups 278 activeZKServerIndex = 0; 279 started = true; 280 int clientPort = clientPortList.get(activeZKServerIndex); 281 LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " + 282 "on client port=" + clientPort); 283 return clientPort; 284 } 285 286 private void createDir(File dir) throws IOException { 287 try { 288 if (!dir.exists()) { 289 dir.mkdirs(); 290 } 291 } catch (SecurityException e) { 292 throw new IOException("creating dir: " + dir, e); 293 } 294 } 295 296 /** 297 * @throws IOException if waiting for the shutdown of a server fails 298 */ 299 public void shutdown() throws IOException { 300 // shut down all the zk servers 301 for (int i = 0; i < standaloneServerFactoryList.size(); i++) { 302 NIOServerCnxnFactory standaloneServerFactory = 303 standaloneServerFactoryList.get(i); 304 int clientPort = clientPortList.get(i); 305 306 standaloneServerFactory.shutdown(); 307 if (!waitForServerDown(clientPort, connectionTimeout)) { 308 throw new IOException("Waiting for shutdown of standalone server"); 309 } 310 } 311 standaloneServerFactoryList.clear(); 312 313 for (ZooKeeperServer zkServer: zooKeeperServers) { 314 //explicitly close ZKDatabase since ZookeeperServer does not close them 315 zkServer.getZKDatabase().close(); 316 } 317 zooKeeperServers.clear(); 318 319 // clear everything 320 if (started) { 321 started = false; 322 activeZKServerIndex = 0; 323 clientPortList.clear(); 324 LOG.info("Shutdown MiniZK cluster with all ZK servers"); 325 } 326 } 327 328 /** 329 * @return clientPort return clientPort if there is another ZK backup can run 330 * when killing the current active; return -1, if there is no backups. 331 * @throws IOException if waiting for the shutdown of a server fails 332 */ 333 public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException { 334 if (!started || activeZKServerIndex < 0) { 335 return -1; 336 } 337 338 // Shutdown the current active one 339 NIOServerCnxnFactory standaloneServerFactory = 340 standaloneServerFactoryList.get(activeZKServerIndex); 341 int clientPort = clientPortList.get(activeZKServerIndex); 342 343 standaloneServerFactory.shutdown(); 344 if (!waitForServerDown(clientPort, connectionTimeout)) { 345 throw new IOException("Waiting for shutdown of standalone server"); 346 } 347 348 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close(); 349 350 // remove the current active zk server 351 standaloneServerFactoryList.remove(activeZKServerIndex); 352 clientPortList.remove(activeZKServerIndex); 353 zooKeeperServers.remove(activeZKServerIndex); 354 LOG.info("Kill the current active ZK servers in the cluster " + 355 "on client port: " + clientPort); 356 357 if (standaloneServerFactoryList.isEmpty()) { 358 // there is no backup servers; 359 return -1; 360 } 361 clientPort = clientPortList.get(activeZKServerIndex); 362 LOG.info("Activate a backup zk server in the cluster " + 363 "on client port: " + clientPort); 364 // return the next back zk server's port 365 return clientPort; 366 } 367 368 /** 369 * Kill one back up ZK servers. 370 * 371 * @throws IOException if waiting for the shutdown of a server fails 372 */ 373 public void killOneBackupZooKeeperServer() throws IOException, InterruptedException { 374 if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) { 375 return ; 376 } 377 378 int backupZKServerIndex = activeZKServerIndex+1; 379 // Shutdown the current active one 380 NIOServerCnxnFactory standaloneServerFactory = 381 standaloneServerFactoryList.get(backupZKServerIndex); 382 int clientPort = clientPortList.get(backupZKServerIndex); 383 384 standaloneServerFactory.shutdown(); 385 if (!waitForServerDown(clientPort, connectionTimeout)) { 386 throw new IOException("Waiting for shutdown of standalone server"); 387 } 388 389 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close(); 390 391 // remove this backup zk server 392 standaloneServerFactoryList.remove(backupZKServerIndex); 393 clientPortList.remove(backupZKServerIndex); 394 zooKeeperServers.remove(backupZKServerIndex); 395 LOG.info("Kill one backup ZK servers in the cluster " + 396 "on client port: " + clientPort); 397 } 398 399 // XXX: From o.a.zk.t.ClientBase 400 private static boolean waitForServerDown(int port, long timeout) throws IOException { 401 long start = System.currentTimeMillis(); 402 while (true) { 403 try { 404 try (Socket sock = new Socket("localhost", port)) { 405 OutputStream outstream = sock.getOutputStream(); 406 outstream.write("stat".getBytes()); 407 outstream.flush(); 408 } 409 } catch (IOException e) { 410 return true; 411 } 412 413 if (System.currentTimeMillis() > start + timeout) { 414 break; 415 } 416 try { 417 Thread.sleep(250); 418 } catch (InterruptedException e) { 419 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 420 } 421 } 422 return false; 423 } 424 425 // XXX: From o.a.zk.t.ClientBase 426 private static boolean waitForServerUp(int port, long timeout) throws IOException { 427 long start = System.currentTimeMillis(); 428 while (true) { 429 try { 430 Socket sock = new Socket("localhost", port); 431 BufferedReader reader = null; 432 try { 433 OutputStream outstream = sock.getOutputStream(); 434 outstream.write("stat".getBytes()); 435 outstream.flush(); 436 437 Reader isr = new InputStreamReader(sock.getInputStream()); 438 reader = new BufferedReader(isr); 439 String line = reader.readLine(); 440 if (line != null && line.startsWith("Zookeeper version:")) { 441 return true; 442 } 443 } finally { 444 sock.close(); 445 if (reader != null) { 446 reader.close(); 447 } 448 } 449 } catch (IOException e) { 450 // ignore as this is expected 451 LOG.info("server localhost:" + port + " not up " + e); 452 } 453 454 if (System.currentTimeMillis() > start + timeout) { 455 break; 456 } 457 try { 458 Thread.sleep(250); 459 } catch (InterruptedException e) { 460 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 461 } 462 } 463 return false; 464 } 465 466 public int getClientPort() { 467 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1 468 : clientPortList.get(activeZKServerIndex); 469 } 470 471 List<ZooKeeperServer> getZooKeeperServers() { 472 return zooKeeperServers; 473 } 474}