001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
021
022import java.io.File;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.io.PrintWriter;
026import java.io.StringWriter;
027import java.net.BindException;
028import java.net.ConnectException;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.Threads;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.zookeeper.common.X509Exception;
042import org.apache.zookeeper.server.NIOServerCnxnFactory;
043import org.apache.zookeeper.server.ZooKeeperServer;
044import org.apache.zookeeper.server.persistence.FileTxnLog;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead of redoing it, we
050 * should contribute updates to their code which let us more easily access testing helper objects.
051 */
052@InterfaceAudience.Public
053public class MiniZooKeeperCluster {
054  private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);
055  private static final int TICK_TIME = 2000;
056  private static final int TIMEOUT = 1000;
057  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
058  private static final byte[] STATIC_BYTES = Bytes.toBytes("stat");
059  private final int connectionTimeout;
060  public static final String LOOPBACK_HOST = InetAddress.getLoopbackAddress().getHostName();
061  public static final String HOST = LOOPBACK_HOST;
062
063  private boolean started;
064
065  /**
066   * The default port. If zero, we use a random port.
067   */
068  private int defaultClientPort = 0;
069
070  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
071  private final List<ZooKeeperServer> zooKeeperServers;
072  private final List<Integer> clientPortList;
073
074  private int activeZKServerIndex;
075  private int tickTime = 0;
076
077  private final Configuration configuration;
078
079  public MiniZooKeeperCluster() {
080    this(new Configuration());
081  }
082
083  public MiniZooKeeperCluster(Configuration configuration) {
084    this.started = false;
085    this.configuration = configuration;
086    activeZKServerIndex = -1;
087    zooKeeperServers = new ArrayList<>();
088    clientPortList = new ArrayList<>();
089    standaloneServerFactoryList = new ArrayList<>();
090    connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
091      DEFAULT_CONNECTION_TIMEOUT);
092  }
093
094  /**
095   * Add a client port to the list.
096   * @param clientPort the specified port
097   */
098  public void addClientPort(int clientPort) {
099    clientPortList.add(clientPort);
100  }
101
102  /**
103   * Get the list of client ports.
104   * @return clientPortList the client port list
105   */
106  @InterfaceAudience.Private
107  public List<Integer> getClientPortList() {
108    return clientPortList;
109  }
110
111  /**
112   * Check whether the client port in a specific position of the client port list is valid.
113   * @param index the specified position
114   */
115  private boolean hasValidClientPortInList(int index) {
116    return (clientPortList.size() > index && clientPortList.get(index) > 0);
117  }
118
119  public void setDefaultClientPort(int clientPort) {
120    if (clientPort <= 0) {
121      throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort);
122    }
123    this.defaultClientPort = clientPort;
124  }
125
126  /**
127   * Selects a ZK client port.
128   * @param seedPort the seed port to start with; -1 means first time.
129   * @return a valid and unused client port
130   */
131  private int selectClientPort(int seedPort) {
132    int i;
133    int returnClientPort = seedPort + 1;
134    if (returnClientPort == 0) {
135      // If the new port is invalid, find one - starting with the default client port.
136      // If the default client port is not specified, starting with a random port.
137      // The random port is selected from the range between 49152 to 65535. These ports cannot be
138      // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
139      if (defaultClientPort > 0) {
140        returnClientPort = defaultClientPort;
141      } else {
142        returnClientPort = 0xc000 + ThreadLocalRandom.current().nextInt(0x3f00);
143      }
144    }
145    // Make sure that the port is unused.
146    // break when an unused port is found
147    do {
148      for (i = 0; i < clientPortList.size(); i++) {
149        if (returnClientPort == clientPortList.get(i)) {
150          // Already used. Update the port and retry.
151          returnClientPort++;
152          break;
153        }
154      }
155    } while (i != clientPortList.size());
156    return returnClientPort;
157  }
158
159  public void setTickTime(int tickTime) {
160    this.tickTime = tickTime;
161  }
162
163  public int getBackupZooKeeperServerNum() {
164    return zooKeeperServers.size() - 1;
165  }
166
167  public int getZooKeeperServerNum() {
168    return zooKeeperServers.size();
169  }
170
171  // / XXX: From o.a.zk.t.ClientBase
172  private static void setupTestEnv() {
173    // during the tests we run with 100K prealloc in the logs.
174    // on windows systems prealloc of 64M was seen to take ~15seconds
175    // resulting in test failure (client timeout on first session).
176    // set env and directly in order to handle static init/gc issues
177    System.setProperty("zookeeper.preAllocSize", "100");
178    FileTxnLog.setPreallocSize(100 * 1024);
179    // allow all 4 letter words
180    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
181  }
182
183  public int startup(File baseDir) throws IOException, InterruptedException {
184    int numZooKeeperServers = clientPortList.size();
185    if (numZooKeeperServers == 0) {
186      numZooKeeperServers = 1; // need at least 1 ZK server for testing
187    }
188    return startup(baseDir, numZooKeeperServers);
189  }
190
191  /**
192   * @param baseDir             the base directory to use
193   * @param numZooKeeperServers the number of ZooKeeper servers
194   * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
195   *         another port.
196   * @throws IOException          if an operation fails during the startup
197   * @throws InterruptedException if the startup fails
198   */
199  public int startup(File baseDir, int numZooKeeperServers)
200    throws IOException, InterruptedException {
201    if (numZooKeeperServers <= 0) {
202      return -1;
203    }
204
205    setupTestEnv();
206    shutdown();
207
208    int tentativePort = -1; // the seed port
209    int currentClientPort;
210
211    // running all the ZK servers
212    for (int i = 0; i < numZooKeeperServers; i++) {
213      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
214      createDir(dir);
215      int tickTimeToUse;
216      if (this.tickTime > 0) {
217        tickTimeToUse = this.tickTime;
218      } else {
219        tickTimeToUse = TICK_TIME;
220      }
221
222      // Set up client port - if we have already had a list of valid ports, use it.
223      if (hasValidClientPortInList(i)) {
224        currentClientPort = clientPortList.get(i);
225      } else {
226        tentativePort = selectClientPort(tentativePort); // update the seed
227        currentClientPort = tentativePort;
228      }
229
230      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
231      // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
232      server.setMinSessionTimeout(
233        configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
234      server.setMaxSessionTimeout(
235        configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
236      NIOServerCnxnFactory standaloneServerFactory;
237      while (true) {
238        try {
239          standaloneServerFactory = new NIOServerCnxnFactory();
240          String bindAddr =
241            configuration.get("hbase.zookeeper.property.clientPortAddress", LOOPBACK_HOST);
242          standaloneServerFactory.configure(new InetSocketAddress(bindAddr, currentClientPort),
243            configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
244              HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
245        } catch (BindException e) {
246          LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
247          // We're told to use some port but it's occupied, fail
248          if (hasValidClientPortInList(i)) {
249            return -1;
250          }
251          // This port is already in use, try to use another.
252          tentativePort = selectClientPort(tentativePort);
253          currentClientPort = tentativePort;
254          continue;
255        }
256        break;
257      }
258
259      // Start up this ZK server. Dump its stats.
260      standaloneServerFactory.startup(server);
261      LOG.info("Started connectionTimeout={}, dir={}, {}", connectionTimeout, dir,
262        getServerConfigurationOnOneLine(server));
263      // Runs a 'stat' against the servers.
264      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
265        Threads.printThreadInfo(System.out, "Why is zk standalone server not coming up?");
266        throw new IOException(
267          "Waiting for startup of standalone server; " + "server isRunning=" + server.isRunning());
268      }
269
270      // We have selected a port as a client port. Update clientPortList if necessary.
271      if (clientPortList.size() <= i) { // it is not in the list, add the port
272        clientPortList.add(currentClientPort);
273      } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
274        clientPortList.remove(i);
275        clientPortList.add(i, currentClientPort);
276      }
277
278      standaloneServerFactoryList.add(standaloneServerFactory);
279      zooKeeperServers.add(server);
280    }
281
282    // set the first one to be active ZK; Others are backups
283    activeZKServerIndex = 0;
284    started = true;
285    int clientPort = clientPortList.get(activeZKServerIndex);
286    LOG.info("Started MiniZooKeeperCluster and ran 'stat' on client port={}", clientPort);
287    return clientPort;
288  }
289
290  private String getServerConfigurationOnOneLine(ZooKeeperServer server) {
291    StringWriter sw = new StringWriter();
292    try (PrintWriter pw = new PrintWriter(sw) {
293      @Override
294      public void println(int x) {
295        super.print(x);
296        super.print(", ");
297      }
298
299      @Override
300      public void println(String x) {
301        super.print(x);
302        super.print(", ");
303      }
304    }) {
305      server.dumpConf(pw);
306    }
307    return sw.toString();
308  }
309
310  private void createDir(File dir) throws IOException {
311    try {
312      if (!dir.exists()) {
313        dir.mkdirs();
314      }
315    } catch (SecurityException e) {
316      throw new IOException("creating dir: " + dir, e);
317    }
318  }
319
320  /**
321   * @throws IOException if waiting for the shutdown of a server fails
322   */
323  public void shutdown() throws IOException {
324    // shut down all the zk servers
325    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
326      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
327      int clientPort = clientPortList.get(i);
328      standaloneServerFactory.shutdown();
329      if (!waitForServerDown(clientPort, connectionTimeout)) {
330        throw new IOException("Waiting for shutdown of standalone server at port=" + clientPort
331          + ", timeout=" + this.connectionTimeout);
332      }
333    }
334    standaloneServerFactoryList.clear();
335
336    for (ZooKeeperServer zkServer : zooKeeperServers) {
337      // Explicitly close ZKDatabase since ZookeeperServer does not close them
338      zkServer.getZKDatabase().close();
339    }
340    zooKeeperServers.clear();
341
342    // clear everything
343    if (started) {
344      started = false;
345      activeZKServerIndex = 0;
346      clientPortList.clear();
347      LOG.info("Shutdown MiniZK cluster with all ZK servers");
348    }
349  }
350
351  /**
352   * @return clientPort return clientPort if there is another ZK backup can run when killing the
353   *         current active; return -1, if there is no backups.
354   * @throws IOException if waiting for the shutdown of a server fails
355   */
356  public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException {
357    if (!started || activeZKServerIndex < 0) {
358      return -1;
359    }
360
361    // Shutdown the current active one
362    NIOServerCnxnFactory standaloneServerFactory =
363      standaloneServerFactoryList.get(activeZKServerIndex);
364    int clientPort = clientPortList.get(activeZKServerIndex);
365
366    standaloneServerFactory.shutdown();
367    if (!waitForServerDown(clientPort, connectionTimeout)) {
368      throw new IOException("Waiting for shutdown of standalone server");
369    }
370
371    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
372
373    // remove the current active zk server
374    standaloneServerFactoryList.remove(activeZKServerIndex);
375    clientPortList.remove(activeZKServerIndex);
376    zooKeeperServers.remove(activeZKServerIndex);
377    LOG.info("Kill the current active ZK servers in the cluster on client port: {}", clientPort);
378
379    if (standaloneServerFactoryList.isEmpty()) {
380      // there is no backup servers;
381      return -1;
382    }
383    clientPort = clientPortList.get(activeZKServerIndex);
384    LOG.info("Activate a backup zk server in the cluster on client port: {}", clientPort);
385    // return the next back zk server's port
386    return clientPort;
387  }
388
389  /**
390   * Kill one back up ZK servers.
391   * @throws IOException if waiting for the shutdown of a server fails
392   */
393  public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
394    if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
395      return;
396    }
397
398    int backupZKServerIndex = activeZKServerIndex + 1;
399    // Shutdown the current active one
400    NIOServerCnxnFactory standaloneServerFactory =
401      standaloneServerFactoryList.get(backupZKServerIndex);
402    int clientPort = clientPortList.get(backupZKServerIndex);
403
404    standaloneServerFactory.shutdown();
405    if (!waitForServerDown(clientPort, connectionTimeout)) {
406      throw new IOException("Waiting for shutdown of standalone server");
407    }
408
409    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
410
411    // remove this backup zk server
412    standaloneServerFactoryList.remove(backupZKServerIndex);
413    clientPortList.remove(backupZKServerIndex);
414    zooKeeperServers.remove(backupZKServerIndex);
415    LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort);
416  }
417
418  // XXX: From o.a.zk.t.ClientBase. We just dropped the check for ssl/secure.
419  private static boolean waitForServerDown(int port, long timeout) throws IOException {
420    long start = EnvironmentEdgeManager.currentTime();
421    while (true) {
422      try {
423        send4LetterWord(HOST, port, "stat", false, (int) timeout);
424      } catch (IOException | X509Exception.SSLContextException e) {
425        return true;
426      }
427      if (EnvironmentEdgeManager.currentTime() > start + timeout) {
428        break;
429      }
430      try {
431        Thread.sleep(TIMEOUT);
432      } catch (InterruptedException e) {
433        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
434      }
435    }
436    return false;
437  }
438
439  // XXX: From o.a.zk.t.ClientBase. Its in the test jar but we don't depend on zk test jar.
440  // We remove the SSL/secure bit. Not used in here.
441  private static boolean waitForServerUp(int port, long timeout) throws IOException {
442    long start = EnvironmentEdgeManager.currentTime();
443    while (true) {
444      try {
445        String result = send4LetterWord(HOST, port, "stat", false, (int) timeout);
446        if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) {
447          return true;
448        } else {
449          LOG.debug("Read {}", result);
450        }
451      } catch (ConnectException e) {
452        // ignore as this is expected, do not log stacktrace
453        LOG.info("{}:{} not up: {}", HOST, port, e.toString());
454      } catch (IOException | X509Exception.SSLContextException e) {
455        // ignore as this is expected
456        LOG.info("{}:{} not up", HOST, port, e);
457      }
458
459      if (EnvironmentEdgeManager.currentTime() > start + timeout) {
460        break;
461      }
462      try {
463        Thread.sleep(TIMEOUT);
464      } catch (InterruptedException e) {
465        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
466      }
467    }
468    return false;
469  }
470
471  public int getClientPort() {
472    return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size()
473      ? -1
474      : clientPortList.get(activeZKServerIndex);
475  }
476
477  /** Returns Address for this cluster instance. */
478  public Address getAddress() {
479    return Address.fromParts(HOST, getClientPort());
480  }
481
482  List<ZooKeeperServer> getZooKeeperServers() {
483    return zooKeeperServers;
484  }
485}