1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.InterruptedIOException;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.Reader;
28 import java.net.BindException;
29 import java.net.InetSocketAddress;
30 import java.net.Socket;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Random;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.zookeeper.server.NIOServerCnxnFactory;
42 import org.apache.zookeeper.server.ZooKeeperServer;
43 import org.apache.zookeeper.server.persistence.FileTxnLog;
44
45 import com.google.common.annotations.VisibleForTesting;
46
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Evolving
54 public class MiniZooKeeperCluster {
55 private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
56
57 private static final int TICK_TIME = 2000;
58 private static final int CONNECTION_TIMEOUT = 30000;
59
60 private boolean started;
61
62
63 private int defaultClientPort = 0;
64
65 private List<NIOServerCnxnFactory> standaloneServerFactoryList;
66 private List<ZooKeeperServer> zooKeeperServers;
67 private List<Integer> clientPortList;
68
69 private int activeZKServerIndex;
70 private int tickTime = 0;
71
72 private Configuration configuration;
73
74 public MiniZooKeeperCluster() {
75 this(new Configuration());
76 }
77
78 public MiniZooKeeperCluster(Configuration configuration) {
79 this.started = false;
80 this.configuration = configuration;
81 activeZKServerIndex = -1;
82 zooKeeperServers = new ArrayList<ZooKeeperServer>();
83 clientPortList = new ArrayList<Integer>();
84 standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
85 }
86
87
88
89
90
91
92 public void addClientPort(int clientPort) {
93 clientPortList.add(clientPort);
94 }
95
96
97
98
99
100 @VisibleForTesting
101 public List<Integer> getClientPortList() {
102 return clientPortList;
103 }
104
105
106
107
108
109
110 private boolean hasValidClientPortInList(int index) {
111 return (clientPortList.size() > index && clientPortList.get(index) > 0);
112 }
113
114 public void setDefaultClientPort(int clientPort) {
115 if (clientPort <= 0) {
116 throw new IllegalArgumentException("Invalid default ZK client port: "
117 + clientPort);
118 }
119 this.defaultClientPort = clientPort;
120 }
121
122
123
124
125
126
127
128 private int selectClientPort(int seedPort) {
129 int i;
130 int returnClientPort = seedPort + 1;
131 if (returnClientPort == 0) {
132
133
134
135
136 if (defaultClientPort > 0) {
137 returnClientPort = defaultClientPort;
138 } else {
139 returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
140 }
141 }
142
143 while (true) {
144 for (i = 0; i < clientPortList.size(); i++) {
145 if (returnClientPort == clientPortList.get(i)) {
146
147 returnClientPort++;
148 break;
149 }
150 }
151 if (i == clientPortList.size()) {
152 break;
153 }
154 }
155 return returnClientPort;
156 }
157
158 public void setTickTime(int tickTime) {
159 this.tickTime = tickTime;
160 }
161
162 public int getBackupZooKeeperServerNum() {
163 return zooKeeperServers.size()-1;
164 }
165
166 public int getZooKeeperServerNum() {
167 return zooKeeperServers.size();
168 }
169
170
171 private static void setupTestEnv() {
172
173
174
175
176 System.setProperty("zookeeper.preAllocSize", "100");
177 FileTxnLog.setPreallocSize(100 * 1024);
178
179 System.setProperty("zookeeper.4lw.commands.whitelist","*");
180 }
181
182 public int startup(File baseDir) throws IOException, InterruptedException {
183 int numZooKeeperServers = clientPortList.size();
184 if (numZooKeeperServers == 0) {
185 numZooKeeperServers = 1;
186 }
187 return startup(baseDir, numZooKeeperServers);
188 }
189
190
191
192
193
194
195
196
197
198 public int startup(File baseDir, int numZooKeeperServers) throws IOException,
199 InterruptedException {
200 if (numZooKeeperServers <= 0)
201 return -1;
202
203 setupTestEnv();
204 shutdown();
205
206 int tentativePort = -1;
207 int currentClientPort;
208
209
210 for (int i = 0; i < numZooKeeperServers; i++) {
211 File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
212 createDir(dir);
213 int tickTimeToUse;
214 if (this.tickTime > 0) {
215 tickTimeToUse = this.tickTime;
216 } else {
217 tickTimeToUse = TICK_TIME;
218 }
219
220
221 if (hasValidClientPortInList(i)) {
222 currentClientPort = clientPortList.get(i);
223 } else {
224 tentativePort = selectClientPort(tentativePort);
225 currentClientPort = tentativePort;
226 }
227
228 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
229
230 server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
231 server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
232 NIOServerCnxnFactory standaloneServerFactory;
233 while (true) {
234 try {
235 standaloneServerFactory = new NIOServerCnxnFactory();
236 standaloneServerFactory.configure(
237 new InetSocketAddress(currentClientPort),
238 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
239 } catch (BindException e) {
240 LOG.debug("Failed binding ZK Server to client port: " +
241 currentClientPort, e);
242
243 if (hasValidClientPortInList(i)) {
244 return -1;
245 }
246
247 tentativePort = selectClientPort(tentativePort);
248 currentClientPort = tentativePort;
249 continue;
250 }
251 break;
252 }
253
254
255 standaloneServerFactory.startup(server);
256
257 if (!waitForServerUp(currentClientPort, CONNECTION_TIMEOUT)) {
258 throw new IOException("Waiting for startup of standalone server");
259 }
260
261
262 if (clientPortList.size() <= i) {
263 clientPortList.add(currentClientPort);
264 }
265 else if (clientPortList.get(i) <= 0) {
266 clientPortList.remove(i);
267 clientPortList.add(i, currentClientPort);
268 }
269
270 standaloneServerFactoryList.add(standaloneServerFactory);
271 zooKeeperServers.add(server);
272 }
273
274
275 activeZKServerIndex = 0;
276 started = true;
277 int clientPort = clientPortList.get(activeZKServerIndex);
278 LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
279 "on client port=" + clientPort);
280 return clientPort;
281 }
282
283 private void createDir(File dir) throws IOException {
284 try {
285 if (!dir.exists()) {
286 dir.mkdirs();
287 }
288 } catch (SecurityException e) {
289 throw new IOException("creating dir: " + dir, e);
290 }
291 }
292
293
294
295
296 public void shutdown() throws IOException {
297
298 for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
299 NIOServerCnxnFactory standaloneServerFactory =
300 standaloneServerFactoryList.get(i);
301 int clientPort = clientPortList.get(i);
302
303 standaloneServerFactory.shutdown();
304 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
305 throw new IOException("Waiting for shutdown of standalone server");
306 }
307 }
308 standaloneServerFactoryList.clear();
309
310 for (ZooKeeperServer zkServer: zooKeeperServers) {
311
312 zkServer.getZKDatabase().close();
313 }
314 zooKeeperServers.clear();
315
316
317 if (started) {
318 started = false;
319 activeZKServerIndex = 0;
320 clientPortList.clear();
321 LOG.info("Shutdown MiniZK cluster with all ZK servers");
322 }
323 }
324
325
326
327
328
329
330 public int killCurrentActiveZooKeeperServer() throws IOException,
331 InterruptedException {
332 if (!started || activeZKServerIndex < 0) {
333 return -1;
334 }
335
336
337 NIOServerCnxnFactory standaloneServerFactory =
338 standaloneServerFactoryList.get(activeZKServerIndex);
339 int clientPort = clientPortList.get(activeZKServerIndex);
340
341 standaloneServerFactory.shutdown();
342 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
343 throw new IOException("Waiting for shutdown of standalone server");
344 }
345
346 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
347
348
349 standaloneServerFactoryList.remove(activeZKServerIndex);
350 clientPortList.remove(activeZKServerIndex);
351 zooKeeperServers.remove(activeZKServerIndex);
352 LOG.info("Kill the current active ZK servers in the cluster " +
353 "on client port: " + clientPort);
354
355 if (standaloneServerFactoryList.size() == 0) {
356
357 return -1;
358 }
359 clientPort = clientPortList.get(activeZKServerIndex);
360 LOG.info("Activate a backup zk server in the cluster " +
361 "on client port: " + clientPort);
362
363 return clientPort;
364 }
365
366
367
368
369
370
371 public void killOneBackupZooKeeperServer() throws IOException,
372 InterruptedException {
373 if (!started || activeZKServerIndex < 0 ||
374 standaloneServerFactoryList.size() <= 1) {
375 return ;
376 }
377
378 int backupZKServerIndex = activeZKServerIndex+1;
379
380 NIOServerCnxnFactory standaloneServerFactory =
381 standaloneServerFactoryList.get(backupZKServerIndex);
382 int clientPort = clientPortList.get(backupZKServerIndex);
383
384 standaloneServerFactory.shutdown();
385 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
386 throw new IOException("Waiting for shutdown of standalone server");
387 }
388
389 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
390
391
392 standaloneServerFactoryList.remove(backupZKServerIndex);
393 clientPortList.remove(backupZKServerIndex);
394 zooKeeperServers.remove(backupZKServerIndex);
395 LOG.info("Kill one backup ZK servers in the cluster " +
396 "on client port: " + clientPort);
397 }
398
399
400 private static boolean waitForServerDown(int port, long timeout) throws IOException {
401 long start = System.currentTimeMillis();
402 while (true) {
403 try {
404 Socket sock = new Socket("localhost", port);
405 try {
406 OutputStream outstream = sock.getOutputStream();
407 outstream.write("stat".getBytes());
408 outstream.flush();
409 } finally {
410 sock.close();
411 }
412 } catch (IOException e) {
413 return true;
414 }
415
416 if (System.currentTimeMillis() > start + timeout) {
417 break;
418 }
419 try {
420 Thread.sleep(250);
421 } catch (InterruptedException e) {
422 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
423 }
424 }
425 return false;
426 }
427
428
429 private static boolean waitForServerUp(int port, long timeout) throws IOException {
430 long start = System.currentTimeMillis();
431 while (true) {
432 try {
433 Socket sock = new Socket("localhost", port);
434 BufferedReader reader = null;
435 try {
436 OutputStream outstream = sock.getOutputStream();
437 outstream.write("stat".getBytes());
438 outstream.flush();
439
440 Reader isr = new InputStreamReader(sock.getInputStream());
441 reader = new BufferedReader(isr);
442 String line = reader.readLine();
443 if (line != null && line.startsWith("Zookeeper version:")) {
444 return true;
445 }
446 } finally {
447 sock.close();
448 if (reader != null) {
449 reader.close();
450 }
451 }
452 } catch (IOException e) {
453
454 LOG.info("server localhost:" + port + " not up " + e);
455 }
456
457 if (System.currentTimeMillis() > start + timeout) {
458 break;
459 }
460 try {
461 Thread.sleep(250);
462 } catch (InterruptedException e) {
463 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
464 }
465 }
466 return false;
467 }
468
469 public int getClientPort() {
470 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
471 : clientPortList.get(activeZKServerIndex);
472 }
473 }