1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.wal;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.io.InterruptedIOException;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import com.google.common.annotations.VisibleForTesting;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FSDataInputStream;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.wal.WAL.Reader;
40 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
41 import org.apache.hadoop.hbase.util.CancelableProgressable;
42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
44
45
46 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
47 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
48 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
49 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 @InterfaceAudience.Private
70 public class WALFactory {
71
72 private static final Log LOG = LogFactory.getLog(WALFactory.class);
73
74
75
76
77 static enum Providers {
78 defaultProvider(DefaultWALProvider.class),
79 filesystem(DefaultWALProvider.class),
80 multiwal(BoundedRegionGroupingProvider.class);
81
82 Class<? extends WALProvider> clazz;
83 Providers(Class<? extends WALProvider> clazz) {
84 this.clazz = clazz;
85 }
86 }
87
88 public static final String WAL_PROVIDER = "hbase.wal.provider";
89 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
90
91 static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
92 static final String DEFAULT_META_WAL_PROVIDER = Providers.defaultProvider.name();
93
94 final String factoryId;
95 final WALProvider provider;
96
97
98
99 final AtomicReference<WALProvider> metaProvider = new AtomicReference<WALProvider>();
100
101
102
103
104 private final Class<? extends DefaultWALProvider.Reader> logReaderClass;
105
106
107
108
109 private final int timeoutMillis;
110
111 private final Configuration conf;
112
113
114 private WALFactory(Configuration conf) {
115
116
117
118 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
119
120 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
121 DefaultWALProvider.Reader.class);
122 this.conf = conf;
123
124
125
126 provider = null;
127 factoryId = SINGLETON_ID;
128 }
129
130
131
132
133
134 WALProvider getProvider(final String key, final String defaultValue,
135 final List<WALActionsListener> listeners, final String providerId) throws IOException {
136 Class<? extends WALProvider> clazz;
137 try {
138 clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
139 } catch (IllegalArgumentException exception) {
140
141
142
143 clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
144 }
145 LOG.info("Instantiating WALProvider of type " + clazz);
146 try {
147 final WALProvider result = clazz.newInstance();
148 result.init(this, conf, listeners, providerId);
149 return result;
150 } catch (InstantiationException exception) {
151 LOG.error("couldn't set up WALProvider, check config key " + key);
152 LOG.debug("Exception details for failure to load WALProvider.", exception);
153 throw new IOException("couldn't set up WALProvider", exception);
154 } catch (IllegalAccessException exception) {
155 LOG.error("couldn't set up WALProvider, check config key " + key);
156 LOG.debug("Exception details for failure to load WALProvider.", exception);
157 throw new IOException("couldn't set up WALProvider", exception);
158 }
159 }
160
161
162
163
164
165
166
167
168 public WALFactory(final Configuration conf, final List<WALActionsListener> listeners,
169 final String factoryId) throws IOException {
170
171
172 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
173
174 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
175 DefaultWALProvider.Reader.class);
176 this.conf = conf;
177 this.factoryId = factoryId;
178
179 if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
180 provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null);
181 } else {
182
183 LOG.warn("Running with WAL disabled.");
184 provider = new DisabledWALProvider();
185 provider.init(this, conf, null, factoryId);
186 }
187 }
188
189
190
191
192
193
194 public void close() throws IOException {
195 final WALProvider metaProvider = this.metaProvider.get();
196 if (null != metaProvider) {
197 metaProvider.close();
198 }
199
200
201 if (null != provider) {
202 provider.close();
203 }
204 }
205
206
207
208
209
210
211 public void shutdown() throws IOException {
212 IOException exception = null;
213 final WALProvider metaProvider = this.metaProvider.get();
214 if (null != metaProvider) {
215 try {
216 metaProvider.shutdown();
217 } catch(IOException ioe) {
218 exception = ioe;
219 }
220 }
221 provider.shutdown();
222 if (null != exception) {
223 throw exception;
224 }
225 }
226
227
228
229
230 public WAL getWAL(final byte[] identifier) throws IOException {
231 return provider.getWAL(identifier);
232 }
233
234
235
236
237 public WAL getMetaWAL(final byte[] identifier) throws IOException {
238 WALProvider metaProvider = this.metaProvider.get();
239 if (null == metaProvider) {
240 final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
241 Collections.<WALActionsListener>singletonList(new MetricsWAL()),
242 DefaultWALProvider.META_WAL_PROVIDER_ID);
243 if (this.metaProvider.compareAndSet(null, temp)) {
244 metaProvider = temp;
245 } else {
246
247 temp.close();
248 metaProvider = this.metaProvider.get();
249 }
250 }
251 return metaProvider.getWAL(identifier);
252 }
253
254 public Reader createReader(final FileSystem fs, final Path path) throws IOException {
255 return createReader(fs, path, (CancelableProgressable)null);
256 }
257
258
259
260
261
262
263
264
265 public Reader createReader(final FileSystem fs, final Path path,
266 CancelableProgressable reporter) throws IOException {
267 return createReader(fs, path, reporter, true);
268 }
269
270 public Reader createReader(final FileSystem fs, final Path path,
271 CancelableProgressable reporter, boolean allowCustom)
272 throws IOException {
273 Class<? extends DefaultWALProvider.Reader> lrClass =
274 allowCustom ? logReaderClass : ProtobufLogReader.class;
275
276 try {
277
278
279
280 long startWaiting = EnvironmentEdgeManager.currentTime();
281 long openTimeout = timeoutMillis + startWaiting;
282 int nbAttempt = 0;
283 FSDataInputStream stream = null;
284 DefaultWALProvider.Reader reader = null;
285 while (true) {
286 try {
287 if (lrClass != ProtobufLogReader.class) {
288
289 reader = lrClass.newInstance();
290 reader.init(fs, path, conf, null);
291 return reader;
292 } else {
293 stream = fs.open(path);
294
295
296
297
298 byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
299 boolean isPbWal = (stream.read(magic) == magic.length)
300 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
301 reader =
302 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
303 reader.init(fs, path, conf, stream);
304 return reader;
305 }
306 } catch (IOException e) {
307 if (stream != null) {
308 try {
309 stream.close();
310 } catch (IOException exception) {
311 LOG.warn("Could not close DefaultWALProvider.Reader" + exception.getMessage());
312 LOG.debug("exception details", exception);
313 }
314 }
315 if (reader != null) {
316 try {
317 reader.close();
318 } catch (IOException exception) {
319 LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
320 LOG.debug("exception details", exception);
321 }
322 }
323 String msg = e.getMessage();
324 if (msg != null && (msg.contains("Cannot obtain block length")
325 || msg.contains("Could not obtain the last block")
326 || msg.matches("Blocklist for [^ ]* has changed.*"))) {
327 if (++nbAttempt == 1) {
328 LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
329 }
330 if (reporter != null && !reporter.progress()) {
331 throw new InterruptedIOException("Operation is cancelled");
332 }
333 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
334 LOG.error("Can't open after " + nbAttempt + " attempts and "
335 + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
336 } else {
337 try {
338 Thread.sleep(nbAttempt < 3 ? 500 : 1000);
339 continue;
340 } catch (InterruptedException ie) {
341 InterruptedIOException iioe = new InterruptedIOException();
342 iioe.initCause(ie);
343 throw iioe;
344 }
345 }
346 throw new LeaseNotRecoveredException(e);
347 } else {
348 throw e;
349 }
350 }
351 }
352 } catch (IOException ie) {
353 throw ie;
354 } catch (Exception e) {
355 throw new IOException("Cannot get log reader", e);
356 }
357 }
358
359
360
361
362
363
364
365
366 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
367 return DefaultWALProvider.createWriter(conf, fs, path, false);
368 }
369
370
371
372
373
374 @VisibleForTesting
375 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
376 throws IOException {
377 return DefaultWALProvider.createWriter(conf, fs, path, true);
378 }
379
380
381
382
383
384 private static final AtomicReference<WALFactory> singleton = new AtomicReference<WALFactory>();
385 private static final String SINGLETON_ID = WALFactory.class.getName();
386
387
388 public static WALFactory getInstance(Configuration configuration) {
389 WALFactory factory = singleton.get();
390 if (null == factory) {
391 WALFactory temp = new WALFactory(configuration);
392 if (singleton.compareAndSet(null, temp)) {
393 factory = temp;
394 } else {
395
396 try {
397 temp.close();
398 } catch (IOException exception) {
399 LOG.debug("failed to close temporary singleton. ignoring.", exception);
400 }
401 factory = singleton.get();
402 }
403 }
404 return factory;
405 }
406
407
408
409
410
411
412 public static Reader createReader(final FileSystem fs, final Path path,
413 final Configuration configuration) throws IOException {
414 return getInstance(configuration).createReader(fs, path);
415 }
416
417
418
419
420
421
422 static Reader createReader(final FileSystem fs, final Path path,
423 final Configuration configuration, final CancelableProgressable reporter) throws IOException {
424 return getInstance(configuration).createReader(fs, path, reporter);
425 }
426
427
428
429
430
431
432
433 public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
434 final Configuration configuration) throws IOException {
435 return getInstance(configuration).createReader(fs, path, null, false);
436 }
437
438
439
440
441
442 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
443 final Configuration configuration)
444 throws IOException {
445 return DefaultWALProvider.createWriter(configuration, fs, path, true);
446 }
447
448
449
450
451
452 @VisibleForTesting
453 public static Writer createWALWriter(final FileSystem fs, final Path path,
454 final Configuration configuration)
455 throws IOException {
456 return DefaultWALProvider.createWriter(configuration, fs, path, false);
457 }
458 }