View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.InterruptedIOException;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStream;
27  import java.io.Reader;
28  import java.net.BindException;
29  import java.net.InetSocketAddress;
30  import java.net.Socket;
31  import java.util.ArrayList;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.classification.InterfaceAudience;
38  import org.apache.hadoop.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FileUtil;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.zookeeper.server.NIOServerCnxnFactory;
43  import org.apache.zookeeper.server.ZooKeeperServer;
44  import org.apache.zookeeper.server.persistence.FileTxnLog;
45  
46  /**
47   * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
48   * of redoing it, we should contribute updates to their code which let us more
49   * easily access testing helper objects.
50   */
51  @InterfaceAudience.Public
52  @InterfaceStability.Evolving
53  public class MiniZooKeeperCluster {
54    private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
55  
56    private static final int TICK_TIME = 2000;
57    private static final int CONNECTION_TIMEOUT = 30000;
58  
59    private boolean started;
60  
61    /** The default port. If zero, we use a random port. */
62    private int defaultClientPort = 0;
63  
64    private List<NIOServerCnxnFactory> standaloneServerFactoryList;
65    private List<ZooKeeperServer> zooKeeperServers;
66    private List<Integer> clientPortList;
67  
68    private int activeZKServerIndex;
69    private int tickTime = 0;
70  
71    private Configuration configuration;
72  
73    public MiniZooKeeperCluster() {
74      this(new Configuration());
75    }
76  
77    public MiniZooKeeperCluster(Configuration configuration) {
78      this.started = false;
79      this.configuration = configuration;
80      activeZKServerIndex = -1;
81      zooKeeperServers = new ArrayList<ZooKeeperServer>();
82      clientPortList = new ArrayList<Integer>();
83      standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
84    }
85  
86    public void setDefaultClientPort(int clientPort) {
87      if (clientPort <= 0) {
88        throw new IllegalArgumentException("Invalid default ZK client port: "
89            + clientPort);
90      }
91      this.defaultClientPort = clientPort;
92    }
93  
94    /**
95     * Selects a ZK client port. Returns the default port if specified.
96     * Otherwise, returns a random port. The random port is selected from the
97     * range between 49152 to 65535. These ports cannot be registered with IANA
98     * and are intended for dynamic allocation (see http://bit.ly/dynports).
99     */
100   private int selectClientPort() {
101     if (defaultClientPort > 0) {
102       return defaultClientPort;
103     }
104     return 0xc000 + new Random().nextInt(0x3f00);
105   }
106 
107   public void setTickTime(int tickTime) {
108     this.tickTime = tickTime;
109   }
110 
111   public int getBackupZooKeeperServerNum() {
112     return zooKeeperServers.size()-1;
113   }
114 
115   public int getZooKeeperServerNum() {
116     return zooKeeperServers.size();
117   }
118 
119   // / XXX: From o.a.zk.t.ClientBase
120   private static void setupTestEnv() {
121     // during the tests we run with 100K prealloc in the logs.
122     // on windows systems prealloc of 64M was seen to take ~15seconds
123     // resulting in test failure (client timeout on first session).
124     // set env and directly in order to handle static init/gc issues
125     System.setProperty("zookeeper.preAllocSize", "100");
126     FileTxnLog.setPreallocSize(100 * 1024);
127   }
128 
129   public int startup(File baseDir) throws IOException, InterruptedException {
130     return startup(baseDir,1);
131   }
132 
133   /**
134    * @param baseDir
135    * @param numZooKeeperServers
136    * @return ClientPort server bound to, -1 if there was a
137    *         binding problem and we couldn't pick another port.
138    * @throws IOException
139    * @throws InterruptedException
140    */
141   public int startup(File baseDir, int numZooKeeperServers) throws IOException,
142       InterruptedException {
143     if (numZooKeeperServers <= 0)
144       return -1;
145 
146     setupTestEnv();
147     shutdown();
148 
149     int tentativePort = selectClientPort();
150 
151     // running all the ZK servers
152     for (int i = 0; i < numZooKeeperServers; i++) {
153       File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
154       recreateDir(dir);
155       int tickTimeToUse;
156       if (this.tickTime > 0) {
157         tickTimeToUse = this.tickTime;
158       } else {
159         tickTimeToUse = TICK_TIME;
160       }
161       ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
162       NIOServerCnxnFactory standaloneServerFactory;
163       while (true) {
164         try {
165           standaloneServerFactory = new NIOServerCnxnFactory();
166           standaloneServerFactory.configure(
167             new InetSocketAddress(tentativePort),
168             configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
169               1000));
170         } catch (BindException e) {
171           LOG.debug("Failed binding ZK Server to client port: " +
172               tentativePort, e);
173           // We're told to use some port but it's occupied, fail
174           if (defaultClientPort > 0) return -1;
175           // This port is already in use, try to use another.
176           tentativePort = selectClientPort();
177           continue;
178         }
179         break;
180       }
181 
182       // Start up this ZK server
183       standaloneServerFactory.startup(server);
184       if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
185         throw new IOException("Waiting for startup of standalone server");
186       }
187 
188       // We have selected this port as a client port.
189       clientPortList.add(tentativePort);
190       standaloneServerFactoryList.add(standaloneServerFactory);
191       zooKeeperServers.add(server);
192       tentativePort++; //for the next server
193     }
194 
195     // set the first one to be active ZK; Others are backups
196     activeZKServerIndex = 0;
197     started = true;
198     int clientPort = clientPortList.get(activeZKServerIndex);
199     LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
200         "on client port: " + clientPort);
201     return clientPort;
202   }
203 
204   private void recreateDir(File dir) throws IOException {
205     if (dir.exists()) {
206       if(!FileUtil.fullyDelete(dir)) {
207         throw new IOException("Could not delete zk base directory: " + dir);
208       }
209     }
210     try {
211       dir.mkdirs();
212     } catch (SecurityException e) {
213       throw new IOException("creating dir: " + dir, e);
214     }
215   }
216 
217   /**
218    * @throws IOException
219    */
220   public void shutdown() throws IOException {
221     if (!started) {
222       return;
223     }
224 
225     // shut down all the zk servers
226     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
227       NIOServerCnxnFactory standaloneServerFactory =
228         standaloneServerFactoryList.get(i);
229       int clientPort = clientPortList.get(i);
230 
231       standaloneServerFactory.shutdown();
232       if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
233         throw new IOException("Waiting for shutdown of standalone server");
234       }
235     }
236     for (ZooKeeperServer zkServer: zooKeeperServers) {
237       //explicitly close ZKDatabase since ZookeeperServer does not close them
238       zkServer.getZKDatabase().close();
239     }
240 
241     // clear everything
242     started = false;
243     activeZKServerIndex = 0;
244     standaloneServerFactoryList.clear();
245     clientPortList.clear();
246     zooKeeperServers.clear();
247 
248     LOG.info("Shutdown MiniZK cluster with all ZK servers");
249   }
250 
251   /**@return clientPort return clientPort if there is another ZK backup can run
252    *         when killing the current active; return -1, if there is no backups.
253    * @throws IOException
254    * @throws InterruptedException
255    */
256   public int killCurrentActiveZooKeeperServer() throws IOException,
257                                         InterruptedException {
258     if (!started || activeZKServerIndex < 0 ) {
259       return -1;
260     }
261 
262     // Shutdown the current active one
263     NIOServerCnxnFactory standaloneServerFactory =
264       standaloneServerFactoryList.get(activeZKServerIndex);
265     int clientPort = clientPortList.get(activeZKServerIndex);
266 
267     standaloneServerFactory.shutdown();
268     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
269       throw new IOException("Waiting for shutdown of standalone server");
270     }
271 
272     zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
273 
274     // remove the current active zk server
275     standaloneServerFactoryList.remove(activeZKServerIndex);
276     clientPortList.remove(activeZKServerIndex);
277     zooKeeperServers.remove(activeZKServerIndex);
278     LOG.info("Kill the current active ZK servers in the cluster " +
279         "on client port: " + clientPort);
280 
281     if (standaloneServerFactoryList.size() == 0) {
282       // there is no backup servers;
283       return -1;
284     }
285     clientPort = clientPortList.get(activeZKServerIndex);
286     LOG.info("Activate a backup zk server in the cluster " +
287         "on client port: " + clientPort);
288     // return the next back zk server's port
289     return clientPort;
290   }
291 
292   /**
293    * Kill one back up ZK servers
294    * @throws IOException
295    * @throws InterruptedException
296    */
297   public void killOneBackupZooKeeperServer() throws IOException,
298                                         InterruptedException {
299     if (!started || activeZKServerIndex < 0 ||
300         standaloneServerFactoryList.size() <= 1) {
301       return ;
302     }
303 
304     int backupZKServerIndex = activeZKServerIndex+1;
305     // Shutdown the current active one
306     NIOServerCnxnFactory standaloneServerFactory =
307       standaloneServerFactoryList.get(backupZKServerIndex);
308     int clientPort = clientPortList.get(backupZKServerIndex);
309 
310     standaloneServerFactory.shutdown();
311     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
312       throw new IOException("Waiting for shutdown of standalone server");
313     }
314 
315     zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
316 
317     // remove this backup zk server
318     standaloneServerFactoryList.remove(backupZKServerIndex);
319     clientPortList.remove(backupZKServerIndex);
320     zooKeeperServers.remove(backupZKServerIndex);
321     LOG.info("Kill one backup ZK servers in the cluster " +
322         "on client port: " + clientPort);
323   }
324 
325   // XXX: From o.a.zk.t.ClientBase
326   private static boolean waitForServerDown(int port, long timeout) throws IOException {
327     long start = System.currentTimeMillis();
328     while (true) {
329       try {
330         Socket sock = new Socket("localhost", port);
331         try {
332           OutputStream outstream = sock.getOutputStream();
333           outstream.write("stat".getBytes());
334           outstream.flush();
335         } finally {
336           sock.close();
337         }
338       } catch (IOException e) {
339         return true;
340       }
341 
342       if (System.currentTimeMillis() > start + timeout) {
343         break;
344       }
345       try {
346         Thread.sleep(250);
347       } catch (InterruptedException e) {
348         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
349       }
350     }
351     return false;
352   }
353 
354   // XXX: From o.a.zk.t.ClientBase
355   private static boolean waitForServerUp(int port, long timeout) throws IOException {
356     long start = System.currentTimeMillis();
357     while (true) {
358       try {
359         Socket sock = new Socket("localhost", port);
360         BufferedReader reader = null;
361         try {
362           OutputStream outstream = sock.getOutputStream();
363           outstream.write("stat".getBytes());
364           outstream.flush();
365 
366           Reader isr = new InputStreamReader(sock.getInputStream());
367           reader = new BufferedReader(isr);
368           String line = reader.readLine();
369           if (line != null && line.startsWith("Zookeeper version:")) {
370             return true;
371           }
372         } finally {
373           sock.close();
374           if (reader != null) {
375             reader.close();
376           }
377         }
378       } catch (IOException e) {
379         // ignore as this is expected
380         LOG.info("server localhost:" + port + " not up " + e);
381       }
382 
383       if (System.currentTimeMillis() > start + timeout) {
384         break;
385       }
386       try {
387         Thread.sleep(250);
388       } catch (InterruptedException e) {
389         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
390       }
391     }
392     return false;
393   }
394 
395   public int getClientPort() {
396     return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
397         : clientPortList.get(activeZKServerIndex);
398   }
399 }