View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.InterruptedIOException;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStream;
27  import java.io.Reader;
28  import java.net.BindException;
29  import java.net.InetSocketAddress;
30  import java.net.Socket;
31  import java.util.ArrayList;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.zookeeper.server.NIOServerCnxnFactory;
42  import org.apache.zookeeper.server.ZooKeeperServer;
43  import org.apache.zookeeper.server.persistence.FileTxnLog;
44  
45  /**
46   * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
47   * of redoing it, we should contribute updates to their code which let us more
48   * easily access testing helper objects.
49   */
50  @InterfaceAudience.Public
51  @InterfaceStability.Evolving
52  public class MiniZooKeeperCluster {
53    private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
54  
55    private static final int TICK_TIME = 2000;
56    private static final int CONNECTION_TIMEOUT = 30000;
57  
58    private boolean started;
59  
60    /** The default port. If zero, we use a random port. */
61    private int defaultClientPort = 0;
62  
63    private List<NIOServerCnxnFactory> standaloneServerFactoryList;
64    private List<ZooKeeperServer> zooKeeperServers;
65    private List<Integer> clientPortList;
66  
67    private int activeZKServerIndex;
68    private int tickTime = 0;
69  
70    private Configuration configuration;
71  
72    public MiniZooKeeperCluster() {
73      this(new Configuration());
74    }
75  
76    public MiniZooKeeperCluster(Configuration configuration) {
77      this.started = false;
78      this.configuration = configuration;
79      activeZKServerIndex = -1;
80      zooKeeperServers = new ArrayList<ZooKeeperServer>();
81      clientPortList = new ArrayList<Integer>();
82      standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
83    }
84  
85    public void setDefaultClientPort(int clientPort) {
86      if (clientPort <= 0) {
87        throw new IllegalArgumentException("Invalid default ZK client port: "
88            + clientPort);
89      }
90      this.defaultClientPort = clientPort;
91    }
92  
93    /**
94     * Selects a ZK client port. Returns the default port if specified.
95     * Otherwise, returns a random port. The random port is selected from the
96     * range between 49152 to 65535. These ports cannot be registered with IANA
97     * and are intended for dynamic allocation (see http://bit.ly/dynports).
98     */
99    private int selectClientPort() {
100     if (defaultClientPort > 0) {
101       return defaultClientPort;
102     }
103     return 0xc000 + new Random().nextInt(0x3f00);
104   }
105 
106   public void setTickTime(int tickTime) {
107     this.tickTime = tickTime;
108   }
109 
110   public int getBackupZooKeeperServerNum() {
111     return zooKeeperServers.size()-1;
112   }
113 
114   public int getZooKeeperServerNum() {
115     return zooKeeperServers.size();
116   }
117 
118   // / XXX: From o.a.zk.t.ClientBase
119   private static void setupTestEnv() {
120     // during the tests we run with 100K prealloc in the logs.
121     // on windows systems prealloc of 64M was seen to take ~15seconds
122     // resulting in test failure (client timeout on first session).
123     // set env and directly in order to handle static init/gc issues
124     System.setProperty("zookeeper.preAllocSize", "100");
125     FileTxnLog.setPreallocSize(100 * 1024);
126   }
127 
128   public int startup(File baseDir) throws IOException, InterruptedException {
129     return startup(baseDir,1);
130   }
131 
132   /**
133    * @param baseDir
134    * @param numZooKeeperServers
135    * @return ClientPort server bound to, -1 if there was a
136    *         binding problem and we couldn't pick another port.
137    * @throws IOException
138    * @throws InterruptedException
139    */
140   public int startup(File baseDir, int numZooKeeperServers) throws IOException,
141       InterruptedException {
142     if (numZooKeeperServers <= 0)
143       return -1;
144 
145     setupTestEnv();
146     shutdown();
147 
148     int tentativePort = selectClientPort();
149 
150     // running all the ZK servers
151     for (int i = 0; i < numZooKeeperServers; i++) {
152       File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
153       createDir(dir);
154       int tickTimeToUse;
155       if (this.tickTime > 0) {
156         tickTimeToUse = this.tickTime;
157       } else {
158         tickTimeToUse = TICK_TIME;
159       }
160       ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
161       NIOServerCnxnFactory standaloneServerFactory;
162       while (true) {
163         try {
164           standaloneServerFactory = new NIOServerCnxnFactory();
165           standaloneServerFactory.configure(
166             new InetSocketAddress(tentativePort),
167             configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
168         } catch (BindException e) {
169           LOG.debug("Failed binding ZK Server to client port: " +
170               tentativePort, e);
171           // We're told to use some port but it's occupied, fail
172           if (defaultClientPort > 0) return -1;
173           // This port is already in use, try to use another.
174           tentativePort = selectClientPort();
175           continue;
176         }
177         break;
178       }
179 
180       // Start up this ZK server
181       standaloneServerFactory.startup(server);
182       // Runs a 'stat' against the servers.
183       if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
184         throw new IOException("Waiting for startup of standalone server");
185       }
186 
187       // We have selected this port as a client port.
188       clientPortList.add(tentativePort);
189       standaloneServerFactoryList.add(standaloneServerFactory);
190       zooKeeperServers.add(server);
191       tentativePort++; //for the next server
192     }
193 
194     // set the first one to be active ZK; Others are backups
195     activeZKServerIndex = 0;
196     started = true;
197     int clientPort = clientPortList.get(activeZKServerIndex);
198     LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
199         "on client port=" + clientPort);
200     return clientPort;
201   }
202 
203   private void createDir(File dir) throws IOException {
204     try {
205       if (!dir.exists()) {
206         dir.mkdirs();
207       }
208     } catch (SecurityException e) {
209       throw new IOException("creating dir: " + dir, e);
210     }
211   }
212 
213   /**
214    * @throws IOException
215    */
216   public void shutdown() throws IOException {
217     if (!started) {
218       return;
219     }
220 
221     // shut down all the zk servers
222     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
223       NIOServerCnxnFactory standaloneServerFactory =
224         standaloneServerFactoryList.get(i);
225       int clientPort = clientPortList.get(i);
226 
227       standaloneServerFactory.shutdown();
228       if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
229         throw new IOException("Waiting for shutdown of standalone server");
230       }
231     }
232     for (ZooKeeperServer zkServer: zooKeeperServers) {
233       //explicitly close ZKDatabase since ZookeeperServer does not close them
234       zkServer.getZKDatabase().close();
235     }
236 
237     // clear everything
238     started = false;
239     activeZKServerIndex = 0;
240     standaloneServerFactoryList.clear();
241     clientPortList.clear();
242     zooKeeperServers.clear();
243 
244     LOG.info("Shutdown MiniZK cluster with all ZK servers");
245   }
246 
247   /**@return clientPort return clientPort if there is another ZK backup can run
248    *         when killing the current active; return -1, if there is no backups.
249    * @throws IOException
250    * @throws InterruptedException
251    */
252   public int killCurrentActiveZooKeeperServer() throws IOException,
253                                         InterruptedException {
254     if (!started || activeZKServerIndex < 0 ) {
255       return -1;
256     }
257 
258     // Shutdown the current active one
259     NIOServerCnxnFactory standaloneServerFactory =
260       standaloneServerFactoryList.get(activeZKServerIndex);
261     int clientPort = clientPortList.get(activeZKServerIndex);
262 
263     standaloneServerFactory.shutdown();
264     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
265       throw new IOException("Waiting for shutdown of standalone server");
266     }
267 
268     zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
269 
270     // remove the current active zk server
271     standaloneServerFactoryList.remove(activeZKServerIndex);
272     clientPortList.remove(activeZKServerIndex);
273     zooKeeperServers.remove(activeZKServerIndex);
274     LOG.info("Kill the current active ZK servers in the cluster " +
275         "on client port: " + clientPort);
276 
277     if (standaloneServerFactoryList.size() == 0) {
278       // there is no backup servers;
279       return -1;
280     }
281     clientPort = clientPortList.get(activeZKServerIndex);
282     LOG.info("Activate a backup zk server in the cluster " +
283         "on client port: " + clientPort);
284     // return the next back zk server's port
285     return clientPort;
286   }
287 
288   /**
289    * Kill one back up ZK servers
290    * @throws IOException
291    * @throws InterruptedException
292    */
293   public void killOneBackupZooKeeperServer() throws IOException,
294                                         InterruptedException {
295     if (!started || activeZKServerIndex < 0 ||
296         standaloneServerFactoryList.size() <= 1) {
297       return ;
298     }
299 
300     int backupZKServerIndex = activeZKServerIndex+1;
301     // Shutdown the current active one
302     NIOServerCnxnFactory standaloneServerFactory =
303       standaloneServerFactoryList.get(backupZKServerIndex);
304     int clientPort = clientPortList.get(backupZKServerIndex);
305 
306     standaloneServerFactory.shutdown();
307     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
308       throw new IOException("Waiting for shutdown of standalone server");
309     }
310 
311     zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
312 
313     // remove this backup zk server
314     standaloneServerFactoryList.remove(backupZKServerIndex);
315     clientPortList.remove(backupZKServerIndex);
316     zooKeeperServers.remove(backupZKServerIndex);
317     LOG.info("Kill one backup ZK servers in the cluster " +
318         "on client port: " + clientPort);
319   }
320 
321   // XXX: From o.a.zk.t.ClientBase
322   private static boolean waitForServerDown(int port, long timeout) throws IOException {
323     long start = System.currentTimeMillis();
324     while (true) {
325       try {
326         Socket sock = new Socket("localhost", port);
327         try {
328           OutputStream outstream = sock.getOutputStream();
329           outstream.write("stat".getBytes());
330           outstream.flush();
331         } finally {
332           sock.close();
333         }
334       } catch (IOException e) {
335         return true;
336       }
337 
338       if (System.currentTimeMillis() > start + timeout) {
339         break;
340       }
341       try {
342         Thread.sleep(250);
343       } catch (InterruptedException e) {
344         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
345       }
346     }
347     return false;
348   }
349 
350   // XXX: From o.a.zk.t.ClientBase
351   private static boolean waitForServerUp(int port, long timeout) throws IOException {
352     long start = System.currentTimeMillis();
353     while (true) {
354       try {
355         Socket sock = new Socket("localhost", port);
356         BufferedReader reader = null;
357         try {
358           OutputStream outstream = sock.getOutputStream();
359           outstream.write("stat".getBytes());
360           outstream.flush();
361 
362           Reader isr = new InputStreamReader(sock.getInputStream());
363           reader = new BufferedReader(isr);
364           String line = reader.readLine();
365           if (line != null && line.startsWith("Zookeeper version:")) {
366             return true;
367           }
368         } finally {
369           sock.close();
370           if (reader != null) {
371             reader.close();
372           }
373         }
374       } catch (IOException e) {
375         // ignore as this is expected
376         LOG.info("server localhost:" + port + " not up " + e);
377       }
378 
379       if (System.currentTimeMillis() > start + timeout) {
380         break;
381       }
382       try {
383         Thread.sleep(250);
384       } catch (InterruptedException e) {
385         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
386       }
387     }
388     return false;
389   }
390 
391   public int getClientPort() {
392     return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
393         : clientPortList.get(activeZKServerIndex);
394   }
395 }