1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.InterruptedIOException;
22 import java.io.IOException;
23 import java.lang.reflect.Constructor;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.CoordinatedStateManager;
32 import org.apache.hadoop.hbase.master.HMaster;
33 import org.apache.hadoop.hbase.regionserver.HRegionServer;
34
35
36
37
38 @InterfaceAudience.Private
39 public class JVMClusterUtil {
40 private static final Log LOG = LogFactory.getLog(JVMClusterUtil.class);
41
42
43
44
45 public static class RegionServerThread extends Thread {
46 private final HRegionServer regionServer;
47
48 public RegionServerThread(final HRegionServer r, final int index) {
49 super(r, "RS:" + index + ";" + r.getServerName().toShortString());
50 this.regionServer = r;
51 }
52
53
54 public HRegionServer getRegionServer() {
55 return this.regionServer;
56 }
57
58
59
60
61
62 public void waitForServerOnline() {
63
64
65
66
67 regionServer.waitForServerOnline();
68 }
69 }
70
71
72
73
74
75
76
77
78
79
80
81 public static JVMClusterUtil.RegionServerThread createRegionServerThread(
82 final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc,
83 final int index)
84 throws IOException {
85 HRegionServer server;
86 try {
87
88 Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class,
89 CoordinatedStateManager.class);
90 ctor.setAccessible(true);
91 server = ctor.newInstance(c, cp);
92 } catch (InvocationTargetException ite) {
93 Throwable target = ite.getTargetException();
94 throw new RuntimeException("Failed construction of RegionServer: " +
95 hrsc.toString() + ((target.getCause() != null)?
96 target.getCause().getMessage(): ""), target);
97 } catch (Exception e) {
98 IOException ioe = new IOException();
99 ioe.initCause(e);
100 throw ioe;
101 }
102 return new JVMClusterUtil.RegionServerThread(server, index);
103 }
104
105
106
107
108
109 public static class MasterThread extends Thread {
110 private final HMaster master;
111
112 public MasterThread(final HMaster m, final int index) {
113 super(m, "M:" + index + ";" + m.getServerName().toShortString());
114 this.master = m;
115 }
116
117
118 public HMaster getMaster() {
119 return this.master;
120 }
121 }
122
123
124
125
126
127
128
129
130
131
132
133 public static JVMClusterUtil.MasterThread createMasterThread(
134 final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc,
135 final int index)
136 throws IOException {
137 HMaster server;
138 try {
139 server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class).
140 newInstance(c, cp);
141 } catch (InvocationTargetException ite) {
142 Throwable target = ite.getTargetException();
143 throw new RuntimeException("Failed construction of Master: " +
144 hmc.toString() + ((target.getCause() != null)?
145 target.getCause().getMessage(): ""), target);
146 } catch (Exception e) {
147 IOException ioe = new IOException();
148 ioe.initCause(e);
149 throw ioe;
150 }
151 return new JVMClusterUtil.MasterThread(server, index);
152 }
153
154 private static JVMClusterUtil.MasterThread findActiveMaster(
155 List<JVMClusterUtil.MasterThread> masters) {
156 for (JVMClusterUtil.MasterThread t : masters) {
157 if (t.master.isActiveMaster()) {
158 return t;
159 }
160 }
161
162 return null;
163 }
164
165
166
167
168
169
170
171
172 public static String startup(final List<JVMClusterUtil.MasterThread> masters,
173 final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
174
175 if (masters == null || masters.isEmpty()) {
176 return null;
177 }
178
179 for (JVMClusterUtil.MasterThread t : masters) {
180 t.start();
181 }
182
183
184
185
186 long startTime = System.currentTimeMillis();
187 while (findActiveMaster(masters) == null) {
188 try {
189 Thread.sleep(100);
190 } catch (InterruptedException e) {
191 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
192 }
193 if (System.currentTimeMillis() > startTime + 30000) {
194 throw new RuntimeException("Master not active after 30 seconds");
195 }
196 }
197
198 if (regionservers != null) {
199 for (JVMClusterUtil.RegionServerThread t: regionservers) {
200 t.start();
201 }
202 }
203
204
205
206 startTime = System.currentTimeMillis();
207 final int maxwait = 200000;
208 while (true) {
209 JVMClusterUtil.MasterThread t = findActiveMaster(masters);
210 if (t != null && t.master.isInitialized()) {
211 return t.master.getServerName().toString();
212 }
213
214 if (System.currentTimeMillis() > startTime + 10000) {
215 try {
216 Thread.sleep(1000);
217 } catch (InterruptedException e) {
218 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
219 }
220 }
221 if (System.currentTimeMillis() > startTime + maxwait) {
222 String msg = "Master not initialized after " + maxwait + "ms seconds";
223 Threads.printThreadInfo(System.out,
224 "Thread dump because: " + msg);
225 throw new RuntimeException(msg);
226 }
227 try {
228 Thread.sleep(100);
229 } catch (InterruptedException e) {
230 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
231 }
232 }
233 }
234
235
236
237
238
239 public static void shutdown(final List<MasterThread> masters,
240 final List<RegionServerThread> regionservers) {
241 LOG.debug("Shutting down HBase Cluster");
242 if (masters != null) {
243
244 JVMClusterUtil.MasterThread activeMaster = null;
245 for (JVMClusterUtil.MasterThread t : masters) {
246 if (!t.master.isActiveMaster()) {
247 try {
248 t.master.stopMaster();
249 } catch (IOException e) {
250 LOG.error("Exception occurred while stopping master", e);
251 }
252 } else {
253 activeMaster = t;
254 }
255 }
256
257 if (activeMaster != null) {
258 try {
259 activeMaster.master.shutdown();
260 } catch (IOException e) {
261 LOG.error("Exception occurred in HMaster.shutdown()", e);
262 }
263 }
264
265 }
266 boolean wasInterrupted = false;
267 final long maxTime = System.currentTimeMillis() + 30 * 1000;
268 if (regionservers != null) {
269
270 for (RegionServerThread t : regionservers) {
271 t.getRegionServer().stop("Shutdown requested");
272 }
273 for (RegionServerThread t : regionservers) {
274 long now = System.currentTimeMillis();
275 if (t.isAlive() && !wasInterrupted && now < maxTime) {
276 try {
277 t.join(maxTime - now);
278 } catch (InterruptedException e) {
279 LOG.info("Got InterruptedException on shutdown - " +
280 "not waiting anymore on region server ends", e);
281 wasInterrupted = true;
282 }
283 }
284 }
285
286
287 for (int i = 0; i < 100; ++i) {
288 boolean atLeastOneLiveServer = false;
289 for (RegionServerThread t : regionservers) {
290 if (t.isAlive()) {
291 atLeastOneLiveServer = true;
292 try {
293 LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
294 t.join(1000);
295 } catch (InterruptedException e) {
296 wasInterrupted = true;
297 }
298 }
299 }
300 if (!atLeastOneLiveServer) break;
301 for (RegionServerThread t : regionservers) {
302 if (t.isAlive()) {
303 LOG.warn("RegionServerThreads taking too long to stop, interrupting");
304 t.interrupt();
305 }
306 }
307 }
308 }
309
310 if (masters != null) {
311 for (JVMClusterUtil.MasterThread t : masters) {
312 while (t.master.isAlive() && !wasInterrupted) {
313 try {
314
315
316
317 Threads.threadDumpingIsAlive(t.master.getThread());
318 } catch(InterruptedException e) {
319 LOG.info("Got InterruptedException on shutdown - " +
320 "not waiting anymore on master ends", e);
321 wasInterrupted = true;
322 }
323 }
324 }
325 }
326 LOG.info("Shutdown of " +
327 ((masters != null) ? masters.size() : "0") + " master(s) and " +
328 ((regionservers != null) ? regionservers.size() : "0") +
329 " regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
330
331 if (wasInterrupted){
332 Thread.currentThread().interrupt();
333 }
334 }
335 }