1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.lang.reflect.Field;
23 import java.util.HashMap;
24 import java.util.Map;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.Stoppable;
33 import org.apache.hadoop.hbase.util.ShutdownHookManager;
34 import org.apache.hadoop.hbase.util.Threads;
35
36
37
38
39
40 @InterfaceAudience.Private
41 public class ShutdownHook {
42 private static final Log LOG = LogFactory.getLog(ShutdownHook.class);
43 private static final String CLIENT_FINALIZER_DATA_METHOD = "clientFinalizer";
44
45
46
47
48 public static final String RUN_SHUTDOWN_HOOK = "hbase.shutdown.hook";
49
50
51
52
53
54 public static final String FS_SHUTDOWN_HOOK_WAIT = "hbase.fs.shutdown.hook.wait";
55
56
57
58
59
60
61 private final static Map<Runnable, Integer> fsShutdownHooks = new HashMap<Runnable, Integer>();
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public static void install(final Configuration conf, final FileSystem fs,
83 final Stoppable stop, final Thread threadToJoin) {
84 Runnable fsShutdownHook = suppressHdfsShutdownHook(fs);
85 Thread t = new ShutdownHookThread(conf, stop, threadToJoin, fsShutdownHook);
86 ShutdownHookManager.affixShutdownHook(t, 0);
87 LOG.debug("Installed shutdown hook thread: " + t.getName());
88 }
89
90
91
92
93 private static class ShutdownHookThread extends Thread {
94 private final Stoppable stop;
95 private final Thread threadToJoin;
96 private final Runnable fsShutdownHook;
97 private final Configuration conf;
98
99 ShutdownHookThread(final Configuration conf, final Stoppable stop,
100 final Thread threadToJoin, final Runnable fsShutdownHook) {
101 super("Shutdownhook:" + threadToJoin.getName());
102 this.stop = stop;
103 this.threadToJoin = threadToJoin;
104 this.conf = conf;
105 this.fsShutdownHook = fsShutdownHook;
106 }
107
108 @Override
109 public void run() {
110 boolean b = this.conf.getBoolean(RUN_SHUTDOWN_HOOK, true);
111 LOG.info("Shutdown hook starting; " + RUN_SHUTDOWN_HOOK + "=" + b +
112 "; fsShutdownHook=" + this.fsShutdownHook);
113 if (b) {
114 this.stop.stop("Shutdown hook");
115 Threads.shutdown(this.threadToJoin);
116 if (this.fsShutdownHook != null) {
117 synchronized (fsShutdownHooks) {
118 int refs = fsShutdownHooks.get(fsShutdownHook);
119 if (refs == 1) {
120 LOG.info("Starting fs shutdown hook thread.");
121 Thread fsShutdownHookThread = (fsShutdownHook instanceof Thread) ?
122 (Thread)fsShutdownHook : new Thread(fsShutdownHook,
123 fsShutdownHook.getClass().getSimpleName() + "-shutdown-hook");
124 fsShutdownHookThread.start();
125 Threads.shutdown(fsShutdownHookThread,
126 this.conf.getLong(FS_SHUTDOWN_HOOK_WAIT, 30000));
127 }
128 if (refs > 0) {
129 fsShutdownHooks.put(fsShutdownHook, refs - 1);
130 }
131 }
132 }
133 }
134 LOG.info("Shutdown hook finished.");
135 }
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 private static Runnable suppressHdfsShutdownHook(final FileSystem fs) {
156 try {
157
158
159
160
161
162
163
164 Runnable hdfsClientFinalizer = null;
165
166 Class<?> [] classes = FileSystem.class.getDeclaredClasses();
167 Class<?> cache = null;
168 for (Class<?> c: classes) {
169 if (c.getSimpleName().equals("Cache")) {
170 cache = c;
171 break;
172 }
173 }
174
175 if (cache == null) {
176 throw new RuntimeException(
177 "This should not happen. Could not find the cache class in FileSystem.");
178 }
179
180 Field field = null;
181 try {
182 field = cache.getDeclaredField(CLIENT_FINALIZER_DATA_METHOD);
183 } catch (NoSuchFieldException e) {
184
185
186 }
187 if (field != null) {
188 field.setAccessible(true);
189 Field cacheField = FileSystem.class.getDeclaredField("CACHE");
190 cacheField.setAccessible(true);
191 Object cacheInstance = cacheField.get(fs);
192 hdfsClientFinalizer = (Runnable)field.get(cacheInstance);
193 } else {
194
195 field = FileSystem.class.getDeclaredField(CLIENT_FINALIZER_DATA_METHOD);
196 field.setAccessible(true);
197 hdfsClientFinalizer = (Runnable)field.get(null);
198 }
199 if (hdfsClientFinalizer == null) {
200 throw new RuntimeException("Client finalizer is null, can't suppress!");
201 }
202 synchronized (fsShutdownHooks) {
203 boolean isFSCacheDisabled = fs.getConf().getBoolean("fs.hdfs.impl.disable.cache", false);
204 if (!isFSCacheDisabled && !fsShutdownHooks.containsKey(hdfsClientFinalizer)
205 && !ShutdownHookManager.deleteShutdownHook(hdfsClientFinalizer)) {
206 throw new RuntimeException(
207 "Failed suppression of fs shutdown hook: " + hdfsClientFinalizer);
208 }
209 Integer refs = fsShutdownHooks.get(hdfsClientFinalizer);
210 fsShutdownHooks.put(hdfsClientFinalizer, refs == null ? 1 : refs + 1);
211 }
212 return hdfsClientFinalizer;
213 } catch (NoSuchFieldException nsfe) {
214 LOG.fatal("Couldn't find field 'clientFinalizer' in FileSystem!", nsfe);
215 throw new RuntimeException("Failed to suppress HDFS shutdown hook");
216 } catch (IllegalAccessException iae) {
217 LOG.fatal("Couldn't access field 'clientFinalizer' in FileSystem!", iae);
218 throw new RuntimeException("Failed to suppress HDFS shutdown hook");
219 }
220 }
221
222
223 static class DoNothingThread extends Thread {
224 DoNothingThread() {
225 super("donothing");
226 }
227 @Override
228 public void run() {
229 super.run();
230 }
231 }
232
233
234 static class DoNothingStoppable implements Stoppable {
235 @Override
236 public boolean isStopped() {
237
238 return false;
239 }
240
241 @Override
242 public void stop(String why) {
243
244 }
245 }
246
247
248
249
250
251
252
253
254
255
256 public static void main(final String [] args) throws IOException {
257 Configuration conf = HBaseConfiguration.create();
258 String prop = System.getProperty(RUN_SHUTDOWN_HOOK);
259 if (prop != null) {
260 conf.setBoolean(RUN_SHUTDOWN_HOOK, Boolean.parseBoolean(prop));
261 }
262
263 FileSystem fs = FileSystem.get(conf);
264 Thread donothing = new DoNothingThread();
265 donothing.start();
266 ShutdownHook.install(conf, fs, new DoNothingStoppable(), donothing);
267 }
268 }