001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.Reader;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045
046/**
047 * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
048 * of redoing it, we should contribute updates to their code which let us more
049 * easily access testing helper objects.
050 */
051@InterfaceAudience.Public
052public class MiniZooKeeperCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);
054
055  private static final int TICK_TIME = 2000;
056  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
057  private final int connectionTimeout;
058
059  private boolean started;
060
061  /** The default port. If zero, we use a random port. */
062  private int defaultClientPort = 0;
063
064  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
065  private final List<ZooKeeperServer> zooKeeperServers;
066  private final List<Integer> clientPortList;
067
068  private int activeZKServerIndex;
069  private int tickTime = 0;
070
071  private final Configuration configuration;
072
073  public MiniZooKeeperCluster() {
074    this(new Configuration());
075  }
076
077  public MiniZooKeeperCluster(Configuration configuration) {
078    this.started = false;
079    this.configuration = configuration;
080    activeZKServerIndex = -1;
081    zooKeeperServers = new ArrayList<>();
082    clientPortList = new ArrayList<>();
083    standaloneServerFactoryList = new ArrayList<>();
084    connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
085      DEFAULT_CONNECTION_TIMEOUT);
086  }
087
088  /**
089   * Add a client port to the list.
090   *
091   * @param clientPort the specified port
092   */
093  public void addClientPort(int clientPort) {
094    clientPortList.add(clientPort);
095  }
096
097  /**
098   * Get the list of client ports.
099   *
100   * @return clientPortList the client port list
101   */
102  @VisibleForTesting
103  public List<Integer> getClientPortList() {
104    return clientPortList;
105  }
106
107  /**
108   * Check whether the client port in a specific position of the client port list is valid.
109   *
110   * @param index the specified position
111   */
112  private boolean hasValidClientPortInList(int index) {
113    return (clientPortList.size() > index && clientPortList.get(index) > 0);
114  }
115
116  public void setDefaultClientPort(int clientPort) {
117    if (clientPort <= 0) {
118      throw new IllegalArgumentException("Invalid default ZK client port: "
119          + clientPort);
120    }
121    this.defaultClientPort = clientPort;
122  }
123
124  /**
125   * Selects a ZK client port.
126   *
127   * @param seedPort the seed port to start with; -1 means first time.
128   * @return a valid and unused client port
129   */
130  private int selectClientPort(int seedPort) {
131    int i;
132    int returnClientPort = seedPort + 1;
133    if (returnClientPort == 0) {
134      // If the new port is invalid, find one - starting with the default client port.
135      // If the default client port is not specified, starting with a random port.
136      // The random port is selected from the range between 49152 to 65535. These ports cannot be
137      // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
138      if (defaultClientPort > 0) {
139        returnClientPort = defaultClientPort;
140      } else {
141        returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
142      }
143    }
144    // Make sure that the port is unused.
145    // break when an unused port is found
146    do {
147      for (i = 0; i < clientPortList.size(); i++) {
148        if (returnClientPort == clientPortList.get(i)) {
149          // Already used. Update the port and retry.
150          returnClientPort++;
151          break;
152        }
153      }
154    } while (i != clientPortList.size());
155    return returnClientPort;
156  }
157
158  public void setTickTime(int tickTime) {
159    this.tickTime = tickTime;
160  }
161
162  public int getBackupZooKeeperServerNum() {
163    return zooKeeperServers.size() - 1;
164  }
165
166  public int getZooKeeperServerNum() {
167    return zooKeeperServers.size();
168  }
169
170  // / XXX: From o.a.zk.t.ClientBase
171  private static void setupTestEnv() {
172    // during the tests we run with 100K prealloc in the logs.
173    // on windows systems prealloc of 64M was seen to take ~15seconds
174    // resulting in test failure (client timeout on first session).
175    // set env and directly in order to handle static init/gc issues
176    System.setProperty("zookeeper.preAllocSize", "100");
177    FileTxnLog.setPreallocSize(100 * 1024);
178    // allow all 4 letter words
179    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
180  }
181
182  public int startup(File baseDir) throws IOException, InterruptedException {
183    int numZooKeeperServers = clientPortList.size();
184    if (numZooKeeperServers == 0) {
185      numZooKeeperServers = 1; // need at least 1 ZK server for testing
186    }
187    return startup(baseDir, numZooKeeperServers);
188  }
189
190  /**
191   * @param baseDir the base directory to use
192   * @param numZooKeeperServers the number of ZooKeeper servers
193   * @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
194   *         another port.
195   * @throws IOException if an operation fails during the startup
196   * @throws InterruptedException if the startup fails
197   */
198  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
199          InterruptedException {
200    if (numZooKeeperServers <= 0) {
201      return -1;
202    }
203
204    setupTestEnv();
205    shutdown();
206
207    int tentativePort = -1; // the seed port
208    int currentClientPort;
209
210    // running all the ZK servers
211    for (int i = 0; i < numZooKeeperServers; i++) {
212      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
213      createDir(dir);
214      int tickTimeToUse;
215      if (this.tickTime > 0) {
216        tickTimeToUse = this.tickTime;
217      } else {
218        tickTimeToUse = TICK_TIME;
219      }
220
221      // Set up client port - if we have already had a list of valid ports, use it.
222      if (hasValidClientPortInList(i)) {
223        currentClientPort = clientPortList.get(i);
224      } else {
225        tentativePort = selectClientPort(tentativePort); // update the seed
226        currentClientPort = tentativePort;
227      }
228
229      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
230      // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
231      server.setMinSessionTimeout(configuration.getInt(
232              "hbase.zookeeper.property.minSessionTimeout", -1));
233      server.setMaxSessionTimeout(configuration.getInt(
234              "hbase.zookeeper.property.maxSessionTimeout", -1));
235      NIOServerCnxnFactory standaloneServerFactory;
236      while (true) {
237        try {
238          standaloneServerFactory = new NIOServerCnxnFactory();
239          standaloneServerFactory.configure(
240            new InetSocketAddress(currentClientPort),
241            configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
242                    HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
243        } catch (BindException e) {
244          LOG.debug("Failed binding ZK Server to client port: " +
245              currentClientPort, e);
246          // We're told to use some port but it's occupied, fail
247          if (hasValidClientPortInList(i)) {
248            return -1;
249          }
250          // This port is already in use, try to use another.
251          tentativePort = selectClientPort(tentativePort);
252          currentClientPort = tentativePort;
253          continue;
254        }
255        break;
256      }
257
258      // Start up this ZK server
259      standaloneServerFactory.startup(server);
260      // Runs a 'stat' against the servers.
261      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
262        throw new IOException("Waiting for startup of standalone server");
263      }
264
265      // We have selected a port as a client port.  Update clientPortList if necessary.
266      if (clientPortList.size() <= i) { // it is not in the list, add the port
267        clientPortList.add(currentClientPort);
268      } else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
269        clientPortList.remove(i);
270        clientPortList.add(i, currentClientPort);
271      }
272
273      standaloneServerFactoryList.add(standaloneServerFactory);
274      zooKeeperServers.add(server);
275    }
276
277    // set the first one to be active ZK; Others are backups
278    activeZKServerIndex = 0;
279    started = true;
280    int clientPort = clientPortList.get(activeZKServerIndex);
281    LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
282        "on client port=" + clientPort);
283    return clientPort;
284  }
285
286  private void createDir(File dir) throws IOException {
287    try {
288      if (!dir.exists()) {
289        dir.mkdirs();
290      }
291    } catch (SecurityException e) {
292      throw new IOException("creating dir: " + dir, e);
293    }
294  }
295
296  /**
297   * @throws IOException if waiting for the shutdown of a server fails
298   */
299  public void shutdown() throws IOException {
300    // shut down all the zk servers
301    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
302      NIOServerCnxnFactory standaloneServerFactory =
303        standaloneServerFactoryList.get(i);
304      int clientPort = clientPortList.get(i);
305
306      standaloneServerFactory.shutdown();
307      if (!waitForServerDown(clientPort, connectionTimeout)) {
308        throw new IOException("Waiting for shutdown of standalone server");
309      }
310    }
311    standaloneServerFactoryList.clear();
312
313    for (ZooKeeperServer zkServer: zooKeeperServers) {
314      //explicitly close ZKDatabase since ZookeeperServer does not close them
315      zkServer.getZKDatabase().close();
316    }
317    zooKeeperServers.clear();
318
319    // clear everything
320    if (started) {
321      started = false;
322      activeZKServerIndex = 0;
323      clientPortList.clear();
324      LOG.info("Shutdown MiniZK cluster with all ZK servers");
325    }
326  }
327
328  /**
329   * @return clientPort return clientPort if there is another ZK backup can run
330   *         when killing the current active; return -1, if there is no backups.
331   * @throws IOException if waiting for the shutdown of a server fails
332   */
333  public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException {
334    if (!started || activeZKServerIndex < 0) {
335      return -1;
336    }
337
338    // Shutdown the current active one
339    NIOServerCnxnFactory standaloneServerFactory =
340      standaloneServerFactoryList.get(activeZKServerIndex);
341    int clientPort = clientPortList.get(activeZKServerIndex);
342
343    standaloneServerFactory.shutdown();
344    if (!waitForServerDown(clientPort, connectionTimeout)) {
345      throw new IOException("Waiting for shutdown of standalone server");
346    }
347
348    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
349
350    // remove the current active zk server
351    standaloneServerFactoryList.remove(activeZKServerIndex);
352    clientPortList.remove(activeZKServerIndex);
353    zooKeeperServers.remove(activeZKServerIndex);
354    LOG.info("Kill the current active ZK servers in the cluster " +
355        "on client port: " + clientPort);
356
357    if (standaloneServerFactoryList.isEmpty()) {
358      // there is no backup servers;
359      return -1;
360    }
361    clientPort = clientPortList.get(activeZKServerIndex);
362    LOG.info("Activate a backup zk server in the cluster " +
363        "on client port: " + clientPort);
364    // return the next back zk server's port
365    return clientPort;
366  }
367
368  /**
369   * Kill one back up ZK servers.
370   *
371   * @throws IOException if waiting for the shutdown of a server fails
372   */
373  public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
374    if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
375      return ;
376    }
377
378    int backupZKServerIndex = activeZKServerIndex+1;
379    // Shutdown the current active one
380    NIOServerCnxnFactory standaloneServerFactory =
381      standaloneServerFactoryList.get(backupZKServerIndex);
382    int clientPort = clientPortList.get(backupZKServerIndex);
383
384    standaloneServerFactory.shutdown();
385    if (!waitForServerDown(clientPort, connectionTimeout)) {
386      throw new IOException("Waiting for shutdown of standalone server");
387    }
388
389    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
390
391    // remove this backup zk server
392    standaloneServerFactoryList.remove(backupZKServerIndex);
393    clientPortList.remove(backupZKServerIndex);
394    zooKeeperServers.remove(backupZKServerIndex);
395    LOG.info("Kill one backup ZK servers in the cluster " +
396        "on client port: " + clientPort);
397  }
398
399  // XXX: From o.a.zk.t.ClientBase
400  private static boolean waitForServerDown(int port, long timeout) throws IOException {
401    long start = System.currentTimeMillis();
402    while (true) {
403      try {
404        try (Socket sock = new Socket("localhost", port)) {
405          OutputStream outstream = sock.getOutputStream();
406          outstream.write("stat".getBytes());
407          outstream.flush();
408        }
409      } catch (IOException e) {
410        return true;
411      }
412
413      if (System.currentTimeMillis() > start + timeout) {
414        break;
415      }
416      try {
417        Thread.sleep(250);
418      } catch (InterruptedException e) {
419        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
420      }
421    }
422    return false;
423  }
424
425  // XXX: From o.a.zk.t.ClientBase
426  private static boolean waitForServerUp(int port, long timeout) throws IOException {
427    long start = System.currentTimeMillis();
428    while (true) {
429      try {
430        Socket sock = new Socket("localhost", port);
431        BufferedReader reader = null;
432        try {
433          OutputStream outstream = sock.getOutputStream();
434          outstream.write("stat".getBytes());
435          outstream.flush();
436
437          Reader isr = new InputStreamReader(sock.getInputStream());
438          reader = new BufferedReader(isr);
439          String line = reader.readLine();
440          if (line != null && line.startsWith("Zookeeper version:")) {
441            return true;
442          }
443        } finally {
444          sock.close();
445          if (reader != null) {
446            reader.close();
447          }
448        }
449      } catch (IOException e) {
450        // ignore as this is expected
451        LOG.info("server localhost:" + port + " not up " + e);
452      }
453
454      if (System.currentTimeMillis() > start + timeout) {
455        break;
456      }
457      try {
458        Thread.sleep(250);
459      } catch (InterruptedException e) {
460        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
461      }
462    }
463    return false;
464  }
465
466  public int getClientPort() {
467    return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
468        : clientPortList.get(activeZKServerIndex);
469  }
470
471  List<ZooKeeperServer> getZooKeeperServers() {
472    return zooKeeperServers;
473  }
474}