View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.InterruptedIOException;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStream;
27  import java.io.Reader;
28  import java.net.BindException;
29  import java.net.InetSocketAddress;
30  import java.net.Socket;
31  import java.util.ArrayList;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.zookeeper.server.NIOServerCnxnFactory;
42  import org.apache.zookeeper.server.ZooKeeperServer;
43  import org.apache.zookeeper.server.persistence.FileTxnLog;
44  
45  import com.google.common.annotations.VisibleForTesting;
46  
47  /**
48   * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
49   * of redoing it, we should contribute updates to their code which let us more
50   * easily access testing helper objects.
51   */
52  @InterfaceAudience.Public
53  @InterfaceStability.Evolving
54  public class MiniZooKeeperCluster {
55    private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
56  
57    private static final int TICK_TIME = 2000;
58    private static final int CONNECTION_TIMEOUT = 30000;
59  
60    private boolean started;
61  
62    /** The default port. If zero, we use a random port. */
63    private int defaultClientPort = 0;
64  
65    private List<NIOServerCnxnFactory> standaloneServerFactoryList;
66    private List<ZooKeeperServer> zooKeeperServers;
67    private List<Integer> clientPortList;
68  
69    private int activeZKServerIndex;
70    private int tickTime = 0;
71  
72    private Configuration configuration;
73  
74    public MiniZooKeeperCluster() {
75      this(new Configuration());
76    }
77  
78    public MiniZooKeeperCluster(Configuration configuration) {
79      this.started = false;
80      this.configuration = configuration;
81      activeZKServerIndex = -1;
82      zooKeeperServers = new ArrayList<ZooKeeperServer>();
83      clientPortList = new ArrayList<Integer>();
84      standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
85    }
86  
87    /**
88     * Add a client port to the list.
89     *
90     * @param clientPort the specified port
91     */
92    public void addClientPort(int clientPort) {
93      clientPortList.add(clientPort);
94    }
95  
96    /**
97     * Get the list of client ports.
98     * @return clientPortList the client port list
99     */
100   @VisibleForTesting
101   public List<Integer> getClientPortList() {
102     return clientPortList;
103   }
104 
105   /**
106    * Check whether the client port in a specific position of the client port list is valid.
107    *
108    * @param index the specified position
109    */
110   private boolean hasValidClientPortInList(int index) {
111     return (clientPortList.size() > index && clientPortList.get(index) > 0);
112   }
113 
114   public void setDefaultClientPort(int clientPort) {
115     if (clientPort <= 0) {
116       throw new IllegalArgumentException("Invalid default ZK client port: "
117           + clientPort);
118     }
119     this.defaultClientPort = clientPort;
120   }
121 
122   /**
123    * Selects a ZK client port.
124    *
125    * @param seedPort the seed port to start with; -1 means first time.
126    * @Returns a valid and unused client port
127    */
128   private int selectClientPort(int seedPort) {
129     int i;
130     int returnClientPort = seedPort + 1;
131     if (returnClientPort == 0) {
132       // If the new port is invalid, find one - starting with the default client port.
133       // If the default client port is not specified, starting with a random port.
134       // The random port is selected from the range between 49152 to 65535. These ports cannot be
135       // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
136       if (defaultClientPort > 0) {
137         returnClientPort = defaultClientPort;
138       } else {
139         returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
140       }
141     }
142     // Make sure that the port is unused.
143     while (true) {
144       for (i = 0; i < clientPortList.size(); i++) {
145         if (returnClientPort == clientPortList.get(i)) {
146           // Already used. Update the port and retry.
147           returnClientPort++;
148           break;
149         }
150       }
151       if (i == clientPortList.size()) {
152         break; // found a unused port, exit
153       }
154     }
155     return returnClientPort;
156   }
157 
158   public void setTickTime(int tickTime) {
159     this.tickTime = tickTime;
160   }
161 
162   public int getBackupZooKeeperServerNum() {
163     return zooKeeperServers.size()-1;
164   }
165 
166   public int getZooKeeperServerNum() {
167     return zooKeeperServers.size();
168   }
169 
170   // / XXX: From o.a.zk.t.ClientBase
171   private static void setupTestEnv() {
172     // during the tests we run with 100K prealloc in the logs.
173     // on windows systems prealloc of 64M was seen to take ~15seconds
174     // resulting in test failure (client timeout on first session).
175     // set env and directly in order to handle static init/gc issues
176     System.setProperty("zookeeper.preAllocSize", "100");
177     FileTxnLog.setPreallocSize(100 * 1024);
178     // allow all 4 letter words
179     System.setProperty("zookeeper.4lw.commands.whitelist","*");
180   }
181 
182   public int startup(File baseDir) throws IOException, InterruptedException {
183     int numZooKeeperServers = clientPortList.size();
184     if (numZooKeeperServers == 0) {
185       numZooKeeperServers = 1; // need at least 1 ZK server for testing
186     }
187     return startup(baseDir, numZooKeeperServers);
188   }
189 
190   /**
191    * @param baseDir
192    * @param numZooKeeperServers
193    * @return ClientPort server bound to, -1 if there was a
194    *         binding problem and we couldn't pick another port.
195    * @throws IOException
196    * @throws InterruptedException
197    */
198   public int startup(File baseDir, int numZooKeeperServers) throws IOException,
199       InterruptedException {
200     if (numZooKeeperServers <= 0)
201       return -1;
202 
203     setupTestEnv();
204     shutdown();
205 
206     int tentativePort = -1; // the seed port
207     int currentClientPort;
208 
209     // running all the ZK servers
210     for (int i = 0; i < numZooKeeperServers; i++) {
211       File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
212       createDir(dir);
213       int tickTimeToUse;
214       if (this.tickTime > 0) {
215         tickTimeToUse = this.tickTime;
216       } else {
217         tickTimeToUse = TICK_TIME;
218       }
219 
220       // Set up client port - if we have already had a list of valid ports, use it.
221       if (hasValidClientPortInList(i)) {
222         currentClientPort = clientPortList.get(i);
223       } else {
224         tentativePort = selectClientPort(tentativePort); // update the seed
225         currentClientPort = tentativePort;
226       }
227 
228       ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
229       // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
230       server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
231       server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
232       NIOServerCnxnFactory standaloneServerFactory;
233       while (true) {
234         try {
235           standaloneServerFactory = new NIOServerCnxnFactory();
236           standaloneServerFactory.configure(
237             new InetSocketAddress(currentClientPort),
238             configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
239         } catch (BindException e) {
240           LOG.debug("Failed binding ZK Server to client port: " +
241               currentClientPort, e);
242           // We're told to use some port but it's occupied, fail
243           if (hasValidClientPortInList(i)) {
244             return -1;
245           }
246           // This port is already in use, try to use another.
247           tentativePort = selectClientPort(tentativePort);
248           currentClientPort = tentativePort;
249           continue;
250         }
251         break;
252       }
253 
254       // Start up this ZK server
255       standaloneServerFactory.startup(server);
256       // Runs a 'stat' against the servers.
257       if (!waitForServerUp(currentClientPort, CONNECTION_TIMEOUT)) {
258         throw new IOException("Waiting for startup of standalone server");
259       }
260 
261       // We have selected a port as a client port.  Update clientPortList if necessary.
262       if (clientPortList.size() <= i) { // it is not in the list, add the port
263         clientPortList.add(currentClientPort);
264       }
265       else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
266         clientPortList.remove(i);
267         clientPortList.add(i, currentClientPort);
268       }
269 
270       standaloneServerFactoryList.add(standaloneServerFactory);
271       zooKeeperServers.add(server);
272     }
273 
274     // set the first one to be active ZK; Others are backups
275     activeZKServerIndex = 0;
276     started = true;
277     int clientPort = clientPortList.get(activeZKServerIndex);
278     LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
279         "on client port=" + clientPort);
280     return clientPort;
281   }
282 
283   private void createDir(File dir) throws IOException {
284     try {
285       if (!dir.exists()) {
286         dir.mkdirs();
287       }
288     } catch (SecurityException e) {
289       throw new IOException("creating dir: " + dir, e);
290     }
291   }
292 
293   /**
294    * @throws IOException
295    */
296   public void shutdown() throws IOException {
297     // shut down all the zk servers
298     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
299       NIOServerCnxnFactory standaloneServerFactory =
300         standaloneServerFactoryList.get(i);
301       int clientPort = clientPortList.get(i);
302 
303       standaloneServerFactory.shutdown();
304       if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
305         throw new IOException("Waiting for shutdown of standalone server");
306       }
307     }
308     standaloneServerFactoryList.clear();
309 
310     for (ZooKeeperServer zkServer: zooKeeperServers) {
311       //explicitly close ZKDatabase since ZookeeperServer does not close them
312       zkServer.getZKDatabase().close();
313     }
314     zooKeeperServers.clear();
315 
316     // clear everything
317     if (started) {
318       started = false;
319       activeZKServerIndex = 0;
320       clientPortList.clear();
321       LOG.info("Shutdown MiniZK cluster with all ZK servers");
322     }
323   }
324 
325   /**@return clientPort return clientPort if there is another ZK backup can run
326    *         when killing the current active; return -1, if there is no backups.
327    * @throws IOException
328    * @throws InterruptedException
329    */
330   public int killCurrentActiveZooKeeperServer() throws IOException,
331                                         InterruptedException {
332     if (!started || activeZKServerIndex < 0) {
333       return -1;
334     }
335 
336     // Shutdown the current active one
337     NIOServerCnxnFactory standaloneServerFactory =
338       standaloneServerFactoryList.get(activeZKServerIndex);
339     int clientPort = clientPortList.get(activeZKServerIndex);
340 
341     standaloneServerFactory.shutdown();
342     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
343       throw new IOException("Waiting for shutdown of standalone server");
344     }
345 
346     zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
347 
348     // remove the current active zk server
349     standaloneServerFactoryList.remove(activeZKServerIndex);
350     clientPortList.remove(activeZKServerIndex);
351     zooKeeperServers.remove(activeZKServerIndex);
352     LOG.info("Kill the current active ZK servers in the cluster " +
353         "on client port: " + clientPort);
354 
355     if (standaloneServerFactoryList.size() == 0) {
356       // there is no backup servers;
357       return -1;
358     }
359     clientPort = clientPortList.get(activeZKServerIndex);
360     LOG.info("Activate a backup zk server in the cluster " +
361         "on client port: " + clientPort);
362     // return the next back zk server's port
363     return clientPort;
364   }
365 
366   /**
367    * Kill one back up ZK servers
368    * @throws IOException
369    * @throws InterruptedException
370    */
371   public void killOneBackupZooKeeperServer() throws IOException,
372                                         InterruptedException {
373     if (!started || activeZKServerIndex < 0 ||
374         standaloneServerFactoryList.size() <= 1) {
375       return ;
376     }
377 
378     int backupZKServerIndex = activeZKServerIndex+1;
379     // Shutdown the current active one
380     NIOServerCnxnFactory standaloneServerFactory =
381       standaloneServerFactoryList.get(backupZKServerIndex);
382     int clientPort = clientPortList.get(backupZKServerIndex);
383 
384     standaloneServerFactory.shutdown();
385     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
386       throw new IOException("Waiting for shutdown of standalone server");
387     }
388 
389     zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
390 
391     // remove this backup zk server
392     standaloneServerFactoryList.remove(backupZKServerIndex);
393     clientPortList.remove(backupZKServerIndex);
394     zooKeeperServers.remove(backupZKServerIndex);
395     LOG.info("Kill one backup ZK servers in the cluster " +
396         "on client port: " + clientPort);
397   }
398 
399   // XXX: From o.a.zk.t.ClientBase
400   private static boolean waitForServerDown(int port, long timeout) throws IOException {
401     long start = System.currentTimeMillis();
402     while (true) {
403       try {
404         Socket sock = new Socket("localhost", port);
405         try {
406           OutputStream outstream = sock.getOutputStream();
407           outstream.write("stat".getBytes());
408           outstream.flush();
409         } finally {
410           sock.close();
411         }
412       } catch (IOException e) {
413         return true;
414       }
415 
416       if (System.currentTimeMillis() > start + timeout) {
417         break;
418       }
419       try {
420         Thread.sleep(250);
421       } catch (InterruptedException e) {
422         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
423       }
424     }
425     return false;
426   }
427 
428   // XXX: From o.a.zk.t.ClientBase
429   private static boolean waitForServerUp(int port, long timeout) throws IOException {
430     long start = System.currentTimeMillis();
431     while (true) {
432       try {
433         Socket sock = new Socket("localhost", port);
434         BufferedReader reader = null;
435         try {
436           OutputStream outstream = sock.getOutputStream();
437           outstream.write("stat".getBytes());
438           outstream.flush();
439 
440           Reader isr = new InputStreamReader(sock.getInputStream());
441           reader = new BufferedReader(isr);
442           String line = reader.readLine();
443           if (line != null && line.startsWith("Zookeeper version:")) {
444             return true;
445           }
446         } finally {
447           sock.close();
448           if (reader != null) {
449             reader.close();
450           }
451         }
452       } catch (IOException e) {
453         // ignore as this is expected
454         LOG.info("server localhost:" + port + " not up " + e);
455       }
456 
457       if (System.currentTimeMillis() > start + timeout) {
458         break;
459       }
460       try {
461         Thread.sleep(250);
462       } catch (InterruptedException e) {
463         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
464       }
465     }
466     return false;
467   }
468 
469   public int getClientPort() {
470     return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
471         : clientPortList.get(activeZKServerIndex);
472   }
473 }