View Javadoc

1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStream;
27  import java.io.Reader;
28  import java.net.BindException;
29  import java.net.InetSocketAddress;
30  import java.net.Socket;
31  import java.util.ArrayList;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileUtil;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.zookeeper.server.NIOServerCnxnFactory;
41  import org.apache.zookeeper.server.ZooKeeperServer;
42  import org.apache.zookeeper.server.persistence.FileTxnLog;
43  
44  /**
45   * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
46   * of redoing it, we should contribute updates to their code which let us more
47   * easily access testing helper objects.
48   */
49  public class MiniZooKeeperCluster {
50    private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
51  
52    private static final int TICK_TIME = 2000;
53    private static final int CONNECTION_TIMEOUT = 30000;
54  
55    private boolean started;
56  
57    /** The default port. If zero, we use a random port. */
58    private int defaultClientPort = 0;
59  
60    private int clientPort;
61  
62    private List<NIOServerCnxnFactory> standaloneServerFactoryList;
63    private List<ZooKeeperServer> zooKeeperServers;
64    private List<Integer> clientPortList;
65  
66    private int activeZKServerIndex;
67    private int tickTime = 0;
68  
69    private Configuration configuration;
70  
71    public MiniZooKeeperCluster() {
72      this(new Configuration());
73    }
74  
75    public MiniZooKeeperCluster(Configuration configuration) {
76      this.started = false;
77      this.configuration = configuration;
78      activeZKServerIndex = -1;
79      zooKeeperServers = new ArrayList<ZooKeeperServer>();
80      clientPortList = new ArrayList<Integer>();
81      standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
82    }
83  
84    public void setDefaultClientPort(int clientPort) {
85      if (clientPort <= 0) {
86        throw new IllegalArgumentException("Invalid default ZK client port: "
87            + clientPort);
88      }
89      this.defaultClientPort = clientPort;
90    }
91  
92    /**
93     * Selects a ZK client port. Returns the default port if specified.
94     * Otherwise, returns a random port. The random port is selected from the
95     * range between 49152 to 65535. These ports cannot be registered with IANA
96     * and are intended for dynamic allocation (see http://bit.ly/dynports).
97     */
98    private int selectClientPort() {
99      if (defaultClientPort > 0) {
100       return defaultClientPort;
101     }
102     return 0xc000 + new Random().nextInt(0x3f00);
103   }
104 
105   public void setTickTime(int tickTime) {
106     this.tickTime = tickTime;
107   }
108 
109   public int getBackupZooKeeperServerNum() {
110     return zooKeeperServers.size()-1;
111   }
112 
113   public int getZooKeeperServerNum() {
114     return zooKeeperServers.size();
115   }
116 
117   // / XXX: From o.a.zk.t.ClientBase
118   private static void setupTestEnv() {
119     // during the tests we run with 100K prealloc in the logs.
120     // on windows systems prealloc of 64M was seen to take ~15seconds
121     // resulting in test failure (client timeout on first session).
122     // set env and directly in order to handle static init/gc issues
123     System.setProperty("zookeeper.preAllocSize", "100");
124     FileTxnLog.setPreallocSize(100 * 1024);
125   }
126 
127   public int startup(File baseDir) throws IOException, InterruptedException {
128     return startup(baseDir,1);
129   }
130 
131   /**
132    * @param baseDir
133    * @param numZooKeeperServers
134    * @return ClientPort server bound to.
135    * @throws IOException
136    * @throws InterruptedException
137    */
138   public int startup(File baseDir, int numZooKeeperServers) throws IOException,
139       InterruptedException {
140     if (numZooKeeperServers <= 0)
141       return -1;
142 
143     setupTestEnv();
144     shutdown();
145 
146     int tentativePort = selectClientPort();
147 
148     // running all the ZK servers
149     for (int i = 0; i < numZooKeeperServers; i++) {
150       File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
151       recreateDir(dir);
152       int tickTimeToUse;
153       if (this.tickTime > 0) {
154         tickTimeToUse = this.tickTime;
155       } else {
156         tickTimeToUse = TICK_TIME;
157       }
158       ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
159       NIOServerCnxnFactory standaloneServerFactory;
160       while (true) {
161         try {
162           standaloneServerFactory = new NIOServerCnxnFactory();
163           standaloneServerFactory.configure(
164             new InetSocketAddress(tentativePort),
165             configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
166               1000));
167         } catch (BindException e) {
168           LOG.debug("Failed binding ZK Server to client port: " +
169               tentativePort);
170           // This port is already in use, try to use another.
171           tentativePort++;
172           continue;
173         }
174         break;
175       }
176 
177       // Start up this ZK server
178       standaloneServerFactory.startup(server);
179       if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
180         throw new IOException("Waiting for startup of standalone server");
181       }
182 
183       // We have selected this port as a client port.
184       clientPortList.add(tentativePort);
185       standaloneServerFactoryList.add(standaloneServerFactory);
186       zooKeeperServers.add(server);
187     }
188 
189     // set the first one to be active ZK; Others are backups
190     activeZKServerIndex = 0;
191     started = true;
192     clientPort = clientPortList.get(activeZKServerIndex);
193     LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
194         "on client port: " + clientPort);
195     return clientPort;
196   }
197 
198   private void recreateDir(File dir) throws IOException {
199     if (dir.exists()) {
200       if(!FileUtil.fullyDelete(dir)) {
201         throw new IOException("Could not delete zk base directory: " + dir);
202       }
203     }
204     try {
205       dir.mkdirs();
206     } catch (SecurityException e) {
207       throw new IOException("creating dir: " + dir, e);
208     }
209   }
210 
211   /**
212    * @throws IOException
213    */
214   public void shutdown() throws IOException {
215     if (!started) {
216       return;
217     }
218 
219     // shut down all the zk servers
220     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
221       NIOServerCnxnFactory standaloneServerFactory =
222         standaloneServerFactoryList.get(i);
223       int clientPort = clientPortList.get(i);
224 
225       standaloneServerFactory.shutdown();
226       if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
227         throw new IOException("Waiting for shutdown of standalone server");
228       }
229     }
230     for (ZooKeeperServer zkServer: zooKeeperServers) {
231       //explicitly close ZKDatabase since ZookeeperServer does not close them
232       zkServer.getZKDatabase().close();
233     }
234 
235     // clear everything
236     started = false;
237     activeZKServerIndex = 0;
238     standaloneServerFactoryList.clear();
239     clientPortList.clear();
240     zooKeeperServers.clear();
241 
242     LOG.info("Shutdown MiniZK cluster with all ZK servers");
243   }
244 
245   /**@return clientPort return clientPort if there is another ZK backup can run
246    *         when killing the current active; return -1, if there is no backups.
247    * @throws IOException
248    * @throws InterruptedException
249    */
250   public int killCurrentActiveZooKeeperServer() throws IOException,
251                                         InterruptedException {
252     if (!started || activeZKServerIndex < 0 ) {
253       return -1;
254     }
255 
256     // Shutdown the current active one
257     NIOServerCnxnFactory standaloneServerFactory =
258       standaloneServerFactoryList.get(activeZKServerIndex);
259     int clientPort = clientPortList.get(activeZKServerIndex);
260 
261     standaloneServerFactory.shutdown();
262     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
263       throw new IOException("Waiting for shutdown of standalone server");
264     }
265     
266     zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
267 
268     // remove the current active zk server
269     standaloneServerFactoryList.remove(activeZKServerIndex);
270     clientPortList.remove(activeZKServerIndex);
271     zooKeeperServers.remove(activeZKServerIndex);
272     LOG.info("Kill the current active ZK servers in the cluster " +
273         "on client port: " + clientPort);
274 
275     if (standaloneServerFactoryList.size() == 0) {
276       // there is no backup servers;
277       return -1;
278     }
279     clientPort = clientPortList.get(activeZKServerIndex);
280     LOG.info("Activate a backup zk server in the cluster " +
281         "on client port: " + clientPort);
282     // return the next back zk server's port
283     return clientPort;
284   }
285 
286   /**
287    * Kill one back up ZK servers
288    * @throws IOException
289    * @throws InterruptedException
290    */
291   public void killOneBackupZooKeeperServer() throws IOException,
292                                         InterruptedException {
293     if (!started || activeZKServerIndex < 0 ||
294         standaloneServerFactoryList.size() <= 1) {
295       return ;
296     }
297 
298     int backupZKServerIndex = activeZKServerIndex+1;
299     // Shutdown the current active one
300     NIOServerCnxnFactory standaloneServerFactory =
301       standaloneServerFactoryList.get(backupZKServerIndex);
302     int clientPort = clientPortList.get(backupZKServerIndex);
303 
304     standaloneServerFactory.shutdown();
305     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
306       throw new IOException("Waiting for shutdown of standalone server");
307     }
308     
309     zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
310 
311     // remove this backup zk server
312     standaloneServerFactoryList.remove(backupZKServerIndex);
313     clientPortList.remove(backupZKServerIndex);
314     zooKeeperServers.remove(backupZKServerIndex);
315     LOG.info("Kill one backup ZK servers in the cluster " +
316         "on client port: " + clientPort);
317   }
318 
319   // XXX: From o.a.zk.t.ClientBase
320   private static boolean waitForServerDown(int port, long timeout) {
321     long start = System.currentTimeMillis();
322     while (true) {
323       try {
324         Socket sock = new Socket("localhost", port);
325         try {
326           OutputStream outstream = sock.getOutputStream();
327           outstream.write("stat".getBytes());
328           outstream.flush();
329         } finally {
330           sock.close();
331         }
332       } catch (IOException e) {
333         return true;
334       }
335 
336       if (System.currentTimeMillis() > start + timeout) {
337         break;
338       }
339       try {
340         Thread.sleep(250);
341       } catch (InterruptedException e) {
342         // ignore
343       }
344     }
345     return false;
346   }
347 
348   // XXX: From o.a.zk.t.ClientBase
349   private static boolean waitForServerUp(int port, long timeout) {
350     long start = System.currentTimeMillis();
351     while (true) {
352       try {
353         Socket sock = new Socket("localhost", port);
354         BufferedReader reader = null;
355         try {
356           OutputStream outstream = sock.getOutputStream();
357           outstream.write("stat".getBytes());
358           outstream.flush();
359 
360           Reader isr = new InputStreamReader(sock.getInputStream());
361           reader = new BufferedReader(isr);
362           String line = reader.readLine();
363           if (line != null && line.startsWith("Zookeeper version:")) {
364             return true;
365           }
366         } finally {
367           sock.close();
368           if (reader != null) {
369             reader.close();
370           }
371         }
372       } catch (IOException e) {
373         // ignore as this is expected
374         LOG.info("server localhost:" + port + " not up " + e);
375       }
376 
377       if (System.currentTimeMillis() > start + timeout) {
378         break;
379       }
380       try {
381         Thread.sleep(250);
382       } catch (InterruptedException e) {
383         // ignore
384       }
385     }
386     return false;
387   }
388 
389   public int getClientPort() {
390     return clientPort;
391   }
392 }