001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
021
022import java.io.File;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.io.PrintWriter;
026import java.io.StringWriter;
027import java.net.BindException;
028import java.net.ConnectException;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.zookeeper.common.X509Exception;
041import org.apache.zookeeper.server.NIOServerCnxnFactory;
042import org.apache.zookeeper.server.ZooKeeperServer;
043import org.apache.zookeeper.server.persistence.FileTxnLog;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead of redoing it, we
049 * should contribute updates to their code which let us more easily access testing helper objects.
050 */
051@InterfaceAudience.Public
052public class MiniZooKeeperCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);
054  private static final int TICK_TIME = 2000;
055  private static final int TIMEOUT = 1000;
056  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
057  private int connectionTimeout;
058  public static final String LOOPBACK_HOST = InetAddress.getLoopbackAddress().getHostName();
059  public static final String HOST = LOOPBACK_HOST;
060
061  private boolean started;
062
063  /**
064   * The default port. If zero, we use a random port.
065   */
066  private int defaultClientPort = 0;
067
068  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
069  private final List<ZooKeeperServer> zooKeeperServers;
070  private final List<Integer> clientPortList;
071
072  private int activeZKServerIndex;
073  private int tickTime = 0;
074
075  private final Configuration configuration;
076
077  public MiniZooKeeperCluster() {
078    this(new Configuration());
079  }
080
081  public MiniZooKeeperCluster(Configuration configuration) {
082    this.started = false;
083    this.configuration = configuration;
084    activeZKServerIndex = -1;
085    zooKeeperServers = new ArrayList<>();
086    clientPortList = new ArrayList<>();
087    standaloneServerFactoryList = new ArrayList<>();
088    connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
089      DEFAULT_CONNECTION_TIMEOUT);
090  }
091
092  /**
093   * Add a client port to the list.
094   * @param clientPort the specified port
095   */
096  public void addClientPort(int clientPort) {
097    clientPortList.add(clientPort);
098  }
099
100  /**
101   * Get the list of client ports.
102   * @return clientPortList the client port list
103   */
104  @InterfaceAudience.Private
105  public List<Integer> getClientPortList() {
106    return clientPortList;
107  }
108
109  /**
110   * Check whether the client port in a specific position of the client port list is valid.
111   * @param index the specified position
112   */
113  private boolean hasValidClientPortInList(int index) {
114    return (clientPortList.size() > index && clientPortList.get(index) > 0);
115  }
116
117  public void setDefaultClientPort(int clientPort) {
118    if (clientPort <= 0) {
119      throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort);
120    }
121    this.defaultClientPort = clientPort;
122  }
123
124  /**
125   * Selects a ZK client port.
126   * @param seedPort the seed port to start with; -1 means first time.
127   * @return a valid and unused client port
128   */
129  private int selectClientPort(int seedPort) {
130    int i;
131    int returnClientPort = seedPort + 1;
132    if (returnClientPort == 0) {
133      // If the new port is invalid, find one - starting with the default client port.
134      // If the default client port is not specified, starting with a random port.
135      // The random port is selected from the range between 49152 to 65535. These ports cannot be
136      // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
137      if (defaultClientPort > 0) {
138        returnClientPort = defaultClientPort;
139      } else {
140        returnClientPort = 0xc000 + ThreadLocalRandom.current().nextInt(0x3f00);
141      }
142    }
143    // Make sure that the port is unused.
144    // break when an unused port is found
145    do {
146      for (i = 0; i < clientPortList.size(); i++) {
147        if (returnClientPort == clientPortList.get(i)) {
148          // Already used. Update the port and retry.
149          returnClientPort++;
150          break;
151        }
152      }
153    } while (i != clientPortList.size());
154    return returnClientPort;
155  }
156
157  public void setTickTime(int tickTime) {
158    this.tickTime = tickTime;
159  }
160
161  public int getBackupZooKeeperServerNum() {
162    return zooKeeperServers.size() - 1;
163  }
164
165  public int getZooKeeperServerNum() {
166    return zooKeeperServers.size();
167  }
168
169  // / XXX: From o.a.zk.t.ClientBase
170  private static void setupTestEnv() {
171    // during the tests we run with 100K prealloc in the logs.
172    // on windows systems prealloc of 64M was seen to take ~15seconds
173    // resulting in test failure (client timeout on first session).
174    // set env and directly in order to handle static init/gc issues
175    System.setProperty("zookeeper.preAllocSize", "100");
176    FileTxnLog.setPreallocSize(100 * 1024);
177    // allow all 4 letter words
178    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
179  }
180
181  public int startup(File baseDir) throws IOException, InterruptedException {
182    int numZooKeeperServers = clientPortList.size();
183    if (numZooKeeperServers == 0) {
184      numZooKeeperServers = 1; // need at least 1 ZK server for testing
185    }
186    return startup(baseDir, numZooKeeperServers);
187  }
188
189  /**
190   * @param baseDir             the base directory to use
191   * @param numZooKeeperServers the number of ZooKeeper servers
192   * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
193   *         another port.
194   * @throws IOException          if an operation fails during the startup
195   * @throws InterruptedException if the startup fails
196   */
197  public int startup(File baseDir, int numZooKeeperServers)
198    throws IOException, InterruptedException {
199    if (numZooKeeperServers <= 0) {
200      return -1;
201    }
202
203    setupTestEnv();
204    shutdown();
205
206    int tentativePort = -1; // the seed port
207    int currentClientPort;
208
209    // running all the ZK servers
210    for (int i = 0; i < numZooKeeperServers; i++) {
211      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
212      createDir(dir);
213      int tickTimeToUse;
214      if (this.tickTime > 0) {
215        tickTimeToUse = this.tickTime;
216      } else {
217        tickTimeToUse = TICK_TIME;
218      }
219
220      // Set up client port - if we have already had a list of valid ports, use it.
221      if (hasValidClientPortInList(i)) {
222        currentClientPort = clientPortList.get(i);
223      } else {
224        tentativePort = selectClientPort(tentativePort); // update the seed
225        currentClientPort = tentativePort;
226      }
227
228      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
229      // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
230      server.setMinSessionTimeout(
231        configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
232      server.setMaxSessionTimeout(
233        configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
234      NIOServerCnxnFactory standaloneServerFactory;
235      while (true) {
236        try {
237          standaloneServerFactory = new NIOServerCnxnFactory();
238          standaloneServerFactory.configure(new InetSocketAddress(LOOPBACK_HOST, currentClientPort),
239            configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
240              HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
241        } catch (BindException e) {
242          LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
243          // We're told to use some port but it's occupied, fail
244          if (hasValidClientPortInList(i)) {
245            return -1;
246          }
247          // This port is already in use, try to use another.
248          tentativePort = selectClientPort(tentativePort);
249          currentClientPort = tentativePort;
250          continue;
251        }
252        break;
253      }
254
255      // Start up this ZK server. Dump its stats.
256      standaloneServerFactory.startup(server);
257      LOG.info("Started connectionTimeout={}, dir={}, {}", connectionTimeout, dir,
258        getServerConfigurationOnOneLine(server));
259      // Runs a 'stat' against the servers.
260      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
261        Threads.printThreadInfo(System.out, "Why is zk standalone server not coming up?");
262        throw new IOException(
263          "Waiting for startup of standalone server; " + "server isRunning=" + server.isRunning());
264      }
265
266      // We have selected a port as a client port. Update clientPortList if necessary.
267      if (clientPortList.size() <= i) { // it is not in the list, add the port
268        clientPortList.add(currentClientPort);
269      } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
270        clientPortList.remove(i);
271        clientPortList.add(i, currentClientPort);
272      }
273
274      standaloneServerFactoryList.add(standaloneServerFactory);
275      zooKeeperServers.add(server);
276    }
277
278    // set the first one to be active ZK; Others are backups
279    activeZKServerIndex = 0;
280    started = true;
281    int clientPort = clientPortList.get(activeZKServerIndex);
282    LOG.info("Started MiniZooKeeperCluster and ran 'stat' on client port={}", clientPort);
283    return clientPort;
284  }
285
286  private String getServerConfigurationOnOneLine(ZooKeeperServer server) {
287    StringWriter sw = new StringWriter();
288    try (PrintWriter pw = new PrintWriter(sw) {
289      @Override
290      public void println(int x) {
291        super.print(x);
292        super.print(", ");
293      }
294
295      @Override
296      public void println(String x) {
297        super.print(x);
298        super.print(", ");
299      }
300    }) {
301      server.dumpConf(pw);
302    }
303    return sw.toString();
304  }
305
306  private void createDir(File dir) throws IOException {
307    try {
308      if (!dir.exists()) {
309        dir.mkdirs();
310      }
311    } catch (SecurityException e) {
312      throw new IOException("creating dir: " + dir, e);
313    }
314  }
315
316  /**
317   * @throws IOException if waiting for the shutdown of a server fails
318   */
319  public void shutdown() throws IOException {
320    // shut down all the zk servers
321    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
322      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
323      int clientPort = clientPortList.get(i);
324      standaloneServerFactory.shutdown();
325      if (!waitForServerDown(clientPort, connectionTimeout)) {
326        throw new IOException("Waiting for shutdown of standalone server at port=" + clientPort
327          + ", timeout=" + this.connectionTimeout);
328      }
329    }
330    standaloneServerFactoryList.clear();
331
332    for (ZooKeeperServer zkServer : zooKeeperServers) {
333      // Explicitly close ZKDatabase since ZookeeperServer does not close them
334      zkServer.getZKDatabase().close();
335    }
336    zooKeeperServers.clear();
337
338    // clear everything
339    if (started) {
340      started = false;
341      activeZKServerIndex = 0;
342      clientPortList.clear();
343      LOG.info("Shutdown MiniZK cluster with all ZK servers");
344    }
345  }
346
347  /**
348   * @return clientPort return clientPort if there is another ZK backup can run when killing the
349   *         current active; return -1, if there is no backups.
350   * @throws IOException if waiting for the shutdown of a server fails
351   */
352  public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException {
353    if (!started || activeZKServerIndex < 0) {
354      return -1;
355    }
356
357    // Shutdown the current active one
358    NIOServerCnxnFactory standaloneServerFactory =
359      standaloneServerFactoryList.get(activeZKServerIndex);
360    int clientPort = clientPortList.get(activeZKServerIndex);
361
362    standaloneServerFactory.shutdown();
363    if (!waitForServerDown(clientPort, connectionTimeout)) {
364      throw new IOException("Waiting for shutdown of standalone server");
365    }
366
367    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
368
369    // remove the current active zk server
370    standaloneServerFactoryList.remove(activeZKServerIndex);
371    clientPortList.remove(activeZKServerIndex);
372    zooKeeperServers.remove(activeZKServerIndex);
373    LOG.info("Kill the current active ZK servers in the cluster on client port: {}", clientPort);
374
375    if (standaloneServerFactoryList.isEmpty()) {
376      // there is no backup servers;
377      return -1;
378    }
379    clientPort = clientPortList.get(activeZKServerIndex);
380    LOG.info("Activate a backup zk server in the cluster on client port: {}", clientPort);
381    // return the next back zk server's port
382    return clientPort;
383  }
384
385  /**
386   * Kill one back up ZK servers.
387   * @throws IOException if waiting for the shutdown of a server fails
388   */
389  public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
390    if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
391      return;
392    }
393
394    int backupZKServerIndex = activeZKServerIndex + 1;
395    // Shutdown the current active one
396    NIOServerCnxnFactory standaloneServerFactory =
397      standaloneServerFactoryList.get(backupZKServerIndex);
398    int clientPort = clientPortList.get(backupZKServerIndex);
399
400    standaloneServerFactory.shutdown();
401    if (!waitForServerDown(clientPort, connectionTimeout)) {
402      throw new IOException("Waiting for shutdown of standalone server");
403    }
404
405    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
406
407    // remove this backup zk server
408    standaloneServerFactoryList.remove(backupZKServerIndex);
409    clientPortList.remove(backupZKServerIndex);
410    zooKeeperServers.remove(backupZKServerIndex);
411    LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort);
412  }
413
414  // XXX: From o.a.zk.t.ClientBase. We just dropped the check for ssl/secure.
415  private static boolean waitForServerDown(int port, long timeout) throws IOException {
416    long start = EnvironmentEdgeManager.currentTime();
417    while (true) {
418      try {
419        send4LetterWord(HOST, port, "stat", false, (int) timeout);
420      } catch (IOException | X509Exception.SSLContextException e) {
421        return true;
422      }
423      if (EnvironmentEdgeManager.currentTime() > start + timeout) {
424        break;
425      }
426      try {
427        Thread.sleep(TIMEOUT);
428      } catch (InterruptedException e) {
429        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
430      }
431    }
432    return false;
433  }
434
435  // XXX: From o.a.zk.t.ClientBase. Its in the test jar but we don't depend on zk test jar.
436  // We remove the SSL/secure bit. Not used in here.
437  private static boolean waitForServerUp(int port, long timeout) throws IOException {
438    long start = EnvironmentEdgeManager.currentTime();
439    while (true) {
440      try {
441        String result = send4LetterWord(HOST, port, "stat", false, (int) timeout);
442        if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) {
443          return true;
444        } else {
445          LOG.debug("Read {}", result);
446        }
447      } catch (ConnectException e) {
448        // ignore as this is expected, do not log stacktrace
449        LOG.info("{}:{} not up: {}", HOST, port, e.toString());
450      } catch (IOException | X509Exception.SSLContextException e) {
451        // ignore as this is expected
452        LOG.info("{}:{} not up", HOST, port, e);
453      }
454
455      if (EnvironmentEdgeManager.currentTime() > start + timeout) {
456        break;
457      }
458      try {
459        Thread.sleep(TIMEOUT);
460      } catch (InterruptedException e) {
461        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
462      }
463    }
464    return false;
465  }
466
467  public int getClientPort() {
468    return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size()
469      ? -1
470      : clientPortList.get(activeZKServerIndex);
471  }
472
473  /** Returns Address for this cluster instance. */
474  public Address getAddress() {
475    return Address.fromParts(HOST, getClientPort());
476  }
477
478  List<ZooKeeperServer> getZooKeeperServers() {
479    return zooKeeperServers;
480  }
481}