1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.lang.reflect.Method;
25 import java.net.InetSocketAddress;
26 import java.net.URI;
27 import java.util.HashSet;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.Collection;
31
32 import com.google.common.collect.Sets;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.classification.InterfaceStability;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hdfs.DistributedFileSystem;
41 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
42
43
44
45
46
47 @InterfaceAudience.Private
48 @InterfaceStability.Evolving
49 public class FSHDFSUtils extends FSUtils {
50 private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
51 private static Class dfsUtilClazz;
52 private static Method getNNAddressesMethod;
53
54
55
56
57
58
59 private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
60 Configuration conf) {
61 Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
62 String serviceName = fs.getCanonicalServiceName();
63
64 if (serviceName.startsWith("ha-hdfs")) {
65 try {
66 if (dfsUtilClazz == null) {
67 dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
68 }
69 if (getNNAddressesMethod == null) {
70 try {
71
72
73 getNNAddressesMethod =
74 dfsUtilClazz.getMethod("getNNServiceRpcAddressesForCluster", Configuration.class);
75 } catch (NoSuchMethodException e) {
76
77 getNNAddressesMethod =
78 dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
79 }
80
81 }
82
83 Map<String, Map<String, InetSocketAddress>> addressMap =
84 (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
85 .invoke(null, conf);
86 for (Map.Entry<String, Map<String, InetSocketAddress>> entry : addressMap.entrySet()) {
87 Map<String, InetSocketAddress> nnMap = entry.getValue();
88 for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
89 InetSocketAddress addr = e2.getValue();
90 addresses.add(addr);
91 }
92 }
93 } catch (Exception e) {
94 LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
95 }
96 } else {
97 URI uri = fs.getUri();
98 int port = uri.getPort();
99 if (port < 0) {
100 int idx = serviceName.indexOf(':');
101 port = Integer.parseInt(serviceName.substring(idx+1));
102 }
103 InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
104 addresses.add(addr);
105 }
106
107 return addresses;
108 }
109
110
111
112
113
114
115
116 public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
117
118
119 String srcServiceName = srcFs.getCanonicalServiceName();
120 String desServiceName = desFs.getCanonicalServiceName();
121
122 if (srcServiceName == null || desServiceName == null) {
123 return false;
124 }
125 if (srcServiceName.equals(desServiceName)) {
126 return true;
127 }
128 if (srcServiceName.startsWith("ha-hdfs") && desServiceName.startsWith("ha-hdfs")) {
129 Collection<String> internalNameServices =
130 conf.getTrimmedStringCollection("dfs.internal.nameservices");
131 if (!internalNameServices.isEmpty()) {
132 if (internalNameServices.contains(srcServiceName.split(":")[1])) {
133 return true;
134 } else {
135 return false;
136 }
137 }
138 }
139 if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
140
141
142
143 Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
144 Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
145 if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
146 return true;
147 }
148 }
149
150 return false;
151 }
152
153
154
155
156 @Override
157 public void recoverFileLease(final FileSystem fs, final Path p,
158 Configuration conf, CancelableProgressable reporter)
159 throws IOException {
160
161 if (!(fs instanceof DistributedFileSystem)) return;
162 recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter);
163 }
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
192 final Configuration conf, final CancelableProgressable reporter)
193 throws IOException {
194 LOG.info("Recover lease on dfs file " + p);
195 long startWaiting = EnvironmentEdgeManager.currentTime();
196
197
198
199 long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
200
201 long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
202
203
204
205
206
207 long subsequentPauseBase = conf.getLong("hbase.lease.recovery.dfs.timeout", 61 * 1000);
208
209 Method isFileClosedMeth = null;
210
211 boolean findIsFileClosedMeth = true;
212 boolean recovered = false;
213
214 for (int nbAttempt = 0; !recovered; nbAttempt++) {
215 recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
216 if (recovered) break;
217 checkIfCancelled(reporter);
218 if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
219 try {
220
221 if (nbAttempt == 0) {
222 Thread.sleep(firstPause);
223 } else {
224
225
226 long localStartWaiting = EnvironmentEdgeManager.currentTime();
227 while ((EnvironmentEdgeManager.currentTime() - localStartWaiting) <
228 subsequentPauseBase * nbAttempt) {
229 Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
230 if (findIsFileClosedMeth) {
231 try {
232 isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
233 new Class[]{ Path.class });
234 } catch (NoSuchMethodException nsme) {
235 LOG.debug("isFileClosed not available");
236 } finally {
237 findIsFileClosedMeth = false;
238 }
239 }
240 if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
241 recovered = true;
242 break;
243 }
244 checkIfCancelled(reporter);
245 }
246 }
247 } catch (InterruptedException ie) {
248 InterruptedIOException iioe = new InterruptedIOException();
249 iioe.initCause(ie);
250 throw iioe;
251 }
252 }
253 return recovered;
254 }
255
256 boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
257 final int nbAttempt, final Path p, final long startWaiting) {
258 if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
259 LOG.warn("Cannot recoverLease after trying for " +
260 conf.getInt("hbase.lease.recovery.timeout", 900000) +
261 "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
262 getLogMessageDetail(nbAttempt, p, startWaiting));
263 return true;
264 }
265 return false;
266 }
267
268
269
270
271
272
273
274
275
276
277 boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
278 final long startWaiting)
279 throws FileNotFoundException {
280 boolean recovered = false;
281 try {
282 recovered = dfs.recoverLease(p);
283 LOG.info((recovered? "Recovered lease, ": "Failed to recover lease, ") +
284 getLogMessageDetail(nbAttempt, p, startWaiting));
285 } catch (IOException e) {
286 if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
287
288 throw new FileNotFoundException("The given WAL wasn't found at " + p);
289 } else if (e instanceof FileNotFoundException) {
290 throw (FileNotFoundException)e;
291 }
292 LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
293 }
294 return recovered;
295 }
296
297
298
299
300
301
302
303 private String getLogMessageDetail(final int nbAttempt, final Path p, final long startWaiting) {
304 return "attempt=" + nbAttempt + " on file=" + p + " after " +
305 (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
306 }
307
308
309
310
311
312
313
314
315 private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
316 try {
317 return (Boolean) m.invoke(dfs, p);
318 } catch (SecurityException e) {
319 LOG.warn("No access", e);
320 } catch (Exception e) {
321 LOG.warn("Failed invocation for " + p.toString(), e);
322 }
323 return false;
324 }
325
326 void checkIfCancelled(final CancelableProgressable reporter)
327 throws InterruptedIOException {
328 if (reporter == null) return;
329 if (!reporter.progress()) throw new InterruptedIOException("Operation cancelled");
330 }
331 }