1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.InterruptedIOException;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.Reader;
28 import java.net.BindException;
29 import java.net.InetSocketAddress;
30 import java.net.Socket;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Random;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.zookeeper.server.NIOServerCnxnFactory;
42 import org.apache.zookeeper.server.ZooKeeperServer;
43 import org.apache.zookeeper.server.persistence.FileTxnLog;
44
45 import com.google.common.annotations.VisibleForTesting;
46
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Evolving
54 public class MiniZooKeeperCluster {
55 private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
56
57 private static final int TICK_TIME = 2000;
58 private static final int CONNECTION_TIMEOUT = 30000;
59
60 private boolean started;
61
62
63 private int defaultClientPort = 0;
64
65 private List<NIOServerCnxnFactory> standaloneServerFactoryList;
66 private List<ZooKeeperServer> zooKeeperServers;
67 private List<Integer> clientPortList;
68
69 private int activeZKServerIndex;
70 private int tickTime = 0;
71
72 private Configuration configuration;
73
74 public MiniZooKeeperCluster() {
75 this(new Configuration());
76 }
77
78 public MiniZooKeeperCluster(Configuration configuration) {
79 this.started = false;
80 this.configuration = configuration;
81 activeZKServerIndex = -1;
82 zooKeeperServers = new ArrayList<ZooKeeperServer>();
83 clientPortList = new ArrayList<Integer>();
84 standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
85 }
86
87
88
89
90
91
92 public void addClientPort(int clientPort) {
93 clientPortList.add(clientPort);
94 }
95
96
97
98
99
100 @VisibleForTesting
101 public List<Integer> getClientPortList() {
102 return clientPortList;
103 }
104
105
106
107
108
109
110 private boolean hasValidClientPortInList(int index) {
111 return (clientPortList.size() > index && clientPortList.get(index) > 0);
112 }
113
114 public void setDefaultClientPort(int clientPort) {
115 if (clientPort <= 0) {
116 throw new IllegalArgumentException("Invalid default ZK client port: "
117 + clientPort);
118 }
119 this.defaultClientPort = clientPort;
120 }
121
122
123
124
125
126
127
128 private int selectClientPort(int seedPort) {
129 int i;
130 int returnClientPort = seedPort + 1;
131 if (returnClientPort == 0) {
132
133
134
135
136 if (defaultClientPort > 0) {
137 returnClientPort = defaultClientPort;
138 } else {
139 returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
140 }
141 }
142
143 while (true) {
144 for (i = 0; i < clientPortList.size(); i++) {
145 if (returnClientPort == clientPortList.get(i)) {
146
147 returnClientPort++;
148 break;
149 }
150 }
151 if (i == clientPortList.size()) {
152 break;
153 }
154 }
155 return returnClientPort;
156 }
157
158 public void setTickTime(int tickTime) {
159 this.tickTime = tickTime;
160 }
161
162 public int getBackupZooKeeperServerNum() {
163 return zooKeeperServers.size()-1;
164 }
165
166 public int getZooKeeperServerNum() {
167 return zooKeeperServers.size();
168 }
169
170
171 private static void setupTestEnv() {
172
173
174
175
176 System.setProperty("zookeeper.preAllocSize", "100");
177 FileTxnLog.setPreallocSize(100 * 1024);
178 }
179
180 public int startup(File baseDir) throws IOException, InterruptedException {
181 int numZooKeeperServers = clientPortList.size();
182 if (numZooKeeperServers == 0) {
183 numZooKeeperServers = 1;
184 }
185 return startup(baseDir, numZooKeeperServers);
186 }
187
188
189
190
191
192
193
194
195
196 public int startup(File baseDir, int numZooKeeperServers) throws IOException,
197 InterruptedException {
198 if (numZooKeeperServers <= 0)
199 return -1;
200
201 setupTestEnv();
202 shutdown();
203
204 int tentativePort = -1;
205 int currentClientPort;
206
207
208 for (int i = 0; i < numZooKeeperServers; i++) {
209 File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
210 createDir(dir);
211 int tickTimeToUse;
212 if (this.tickTime > 0) {
213 tickTimeToUse = this.tickTime;
214 } else {
215 tickTimeToUse = TICK_TIME;
216 }
217
218
219 if (hasValidClientPortInList(i)) {
220 currentClientPort = clientPortList.get(i);
221 } else {
222 tentativePort = selectClientPort(tentativePort);
223 currentClientPort = tentativePort;
224 }
225
226 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
227 NIOServerCnxnFactory standaloneServerFactory;
228 while (true) {
229 try {
230 standaloneServerFactory = new NIOServerCnxnFactory();
231 standaloneServerFactory.configure(
232 new InetSocketAddress(currentClientPort),
233 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
234 } catch (BindException e) {
235 LOG.debug("Failed binding ZK Server to client port: " +
236 currentClientPort, e);
237
238 if (hasValidClientPortInList(i)) {
239 return -1;
240 }
241
242 tentativePort = selectClientPort(tentativePort);
243 currentClientPort = tentativePort;
244 continue;
245 }
246 break;
247 }
248
249
250 standaloneServerFactory.startup(server);
251
252 if (!waitForServerUp(currentClientPort, CONNECTION_TIMEOUT)) {
253 throw new IOException("Waiting for startup of standalone server");
254 }
255
256
257 if (clientPortList.size() <= i) {
258 clientPortList.add(currentClientPort);
259 }
260 else if (clientPortList.get(i) <= 0) {
261 clientPortList.remove(i);
262 clientPortList.add(i, currentClientPort);
263 }
264
265 standaloneServerFactoryList.add(standaloneServerFactory);
266 zooKeeperServers.add(server);
267 }
268
269
270 activeZKServerIndex = 0;
271 started = true;
272 int clientPort = clientPortList.get(activeZKServerIndex);
273 LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
274 "on client port=" + clientPort);
275 return clientPort;
276 }
277
278 private void createDir(File dir) throws IOException {
279 try {
280 if (!dir.exists()) {
281 dir.mkdirs();
282 }
283 } catch (SecurityException e) {
284 throw new IOException("creating dir: " + dir, e);
285 }
286 }
287
288
289
290
291 public void shutdown() throws IOException {
292
293 for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
294 NIOServerCnxnFactory standaloneServerFactory =
295 standaloneServerFactoryList.get(i);
296 int clientPort = clientPortList.get(i);
297
298 standaloneServerFactory.shutdown();
299 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
300 throw new IOException("Waiting for shutdown of standalone server");
301 }
302 }
303 standaloneServerFactoryList.clear();
304
305 for (ZooKeeperServer zkServer: zooKeeperServers) {
306
307 zkServer.getZKDatabase().close();
308 }
309 zooKeeperServers.clear();
310
311
312 if (started) {
313 started = false;
314 activeZKServerIndex = 0;
315 clientPortList.clear();
316 LOG.info("Shutdown MiniZK cluster with all ZK servers");
317 }
318 }
319
320
321
322
323
324
325 public int killCurrentActiveZooKeeperServer() throws IOException,
326 InterruptedException {
327 if (!started || activeZKServerIndex < 0) {
328 return -1;
329 }
330
331
332 NIOServerCnxnFactory standaloneServerFactory =
333 standaloneServerFactoryList.get(activeZKServerIndex);
334 int clientPort = clientPortList.get(activeZKServerIndex);
335
336 standaloneServerFactory.shutdown();
337 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
338 throw new IOException("Waiting for shutdown of standalone server");
339 }
340
341 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
342
343
344 standaloneServerFactoryList.remove(activeZKServerIndex);
345 clientPortList.remove(activeZKServerIndex);
346 zooKeeperServers.remove(activeZKServerIndex);
347 LOG.info("Kill the current active ZK servers in the cluster " +
348 "on client port: " + clientPort);
349
350 if (standaloneServerFactoryList.size() == 0) {
351
352 return -1;
353 }
354 clientPort = clientPortList.get(activeZKServerIndex);
355 LOG.info("Activate a backup zk server in the cluster " +
356 "on client port: " + clientPort);
357
358 return clientPort;
359 }
360
361
362
363
364
365
366 public void killOneBackupZooKeeperServer() throws IOException,
367 InterruptedException {
368 if (!started || activeZKServerIndex < 0 ||
369 standaloneServerFactoryList.size() <= 1) {
370 return ;
371 }
372
373 int backupZKServerIndex = activeZKServerIndex+1;
374
375 NIOServerCnxnFactory standaloneServerFactory =
376 standaloneServerFactoryList.get(backupZKServerIndex);
377 int clientPort = clientPortList.get(backupZKServerIndex);
378
379 standaloneServerFactory.shutdown();
380 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
381 throw new IOException("Waiting for shutdown of standalone server");
382 }
383
384 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
385
386
387 standaloneServerFactoryList.remove(backupZKServerIndex);
388 clientPortList.remove(backupZKServerIndex);
389 zooKeeperServers.remove(backupZKServerIndex);
390 LOG.info("Kill one backup ZK servers in the cluster " +
391 "on client port: " + clientPort);
392 }
393
394
395 private static boolean waitForServerDown(int port, long timeout) throws IOException {
396 long start = System.currentTimeMillis();
397 while (true) {
398 try {
399 Socket sock = new Socket("localhost", port);
400 try {
401 OutputStream outstream = sock.getOutputStream();
402 outstream.write("stat".getBytes());
403 outstream.flush();
404 } finally {
405 sock.close();
406 }
407 } catch (IOException e) {
408 return true;
409 }
410
411 if (System.currentTimeMillis() > start + timeout) {
412 break;
413 }
414 try {
415 Thread.sleep(250);
416 } catch (InterruptedException e) {
417 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
418 }
419 }
420 return false;
421 }
422
423
424 private static boolean waitForServerUp(int port, long timeout) throws IOException {
425 long start = System.currentTimeMillis();
426 while (true) {
427 try {
428 Socket sock = new Socket("localhost", port);
429 BufferedReader reader = null;
430 try {
431 OutputStream outstream = sock.getOutputStream();
432 outstream.write("stat".getBytes());
433 outstream.flush();
434
435 Reader isr = new InputStreamReader(sock.getInputStream());
436 reader = new BufferedReader(isr);
437 String line = reader.readLine();
438 if (line != null && line.startsWith("Zookeeper version:")) {
439 return true;
440 }
441 } finally {
442 sock.close();
443 if (reader != null) {
444 reader.close();
445 }
446 }
447 } catch (IOException e) {
448
449 LOG.info("server localhost:" + port + " not up " + e);
450 }
451
452 if (System.currentTimeMillis() > start + timeout) {
453 break;
454 }
455 try {
456 Thread.sleep(250);
457 } catch (InterruptedException e) {
458 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
459 }
460 }
461 return false;
462 }
463
464 public int getClientPort() {
465 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
466 : clientPortList.get(activeZKServerIndex);
467 }
468 }