View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  
21  package org.apache.hadoop.hbase.regionserver.wal;
22  
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.io.InterruptedIOException;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FSDataInputStream;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
36  import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
37  import org.apache.hadoop.hbase.util.CancelableProgressable;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  
40  public class HLogFactory {
41      private static final Log LOG = LogFactory.getLog(HLogFactory.class);
42  
43      public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
44          final Configuration conf) throws IOException {
45        return new FSHLog(fs, root, logName, conf);
46      }
47      
48      public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
49          final String oldLogName, final Configuration conf) throws IOException {
50        return new FSHLog(fs, root, logName, oldLogName, conf, null, true, null, false);
51  }
52      
53      public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
54          final Configuration conf, final List<WALActionsListener> listeners,
55          final String prefix) throws IOException {
56        return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
57          true, prefix, false);
58      }
59  
60      public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
61          final Configuration conf, final List<WALActionsListener> listeners,
62          final String prefix) throws IOException {
63        return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
64          false, prefix, true);
65      }
66  
67      /*
68       * WAL Reader
69       */
70      private static Class<? extends Reader> logReaderClass;
71  
72      static void resetLogReaderClass() {
73        logReaderClass = null;
74      }
75  
76      public static HLog.Reader createReader(final FileSystem fs,
77          final Path path, Configuration conf) throws IOException {
78        return createReader(fs, path, conf, null);
79      }
80  
81      /**
82       * Create a reader for the WAL. If you are reading from a file that's being written to
83       * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
84       * then just seek back to the last known good position.
85       * @return A WAL reader.  Close when done with it.
86       * @throws IOException
87       */
88      public static HLog.Reader createReader(final FileSystem fs, final Path path,
89          Configuration conf, CancelableProgressable reporter) throws IOException {
90        return createReader(fs, path, conf, reporter, true);
91      }
92  
93      public static HLog.Reader createReader(final FileSystem fs, final Path path,
94        Configuration conf, CancelableProgressable reporter, boolean allowCustom)
95          throws IOException {
96        if (allowCustom && (logReaderClass == null)) {
97          logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
98            ProtobufLogReader.class, Reader.class);
99        }
100       Class<? extends Reader> lrClass = allowCustom ? logReaderClass : ProtobufLogReader.class;
101 
102       try {
103         // A hlog file could be under recovery, so it may take several
104         // tries to get it open. Instead of claiming it is corrupted, retry
105         // to open it up to 5 minutes by default.
106         long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
107         long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
108         int nbAttempt = 0;
109         while (true) {
110           try {
111             if (lrClass != ProtobufLogReader.class) {
112               // User is overriding the WAL reader, let them.
113               HLog.Reader reader = lrClass.newInstance();
114               reader.init(fs, path, conf, null);
115               return reader;
116             } else {
117               FSDataInputStream stream = fs.open(path);
118               // Note that zero-length file will fail to read PB magic, and attempt to create
119               // a non-PB reader and fail the same way existing code expects it to. If we get
120               // rid of the old reader entirely, we need to handle 0-size files differently from
121               // merely non-PB files.
122               byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
123               boolean isPbWal = (stream.read(magic) == magic.length)
124                   && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
125               HLog.Reader reader =
126                   isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
127               reader.init(fs, path, conf, stream);
128               return reader;
129             }
130           } catch (IOException e) {
131             String msg = e.getMessage();
132             if (msg != null && (msg.contains("Cannot obtain block length")
133                 || msg.contains("Could not obtain the last block")
134                 || msg.matches("Blocklist for [^ ]* has changed.*"))) {
135               if (++nbAttempt == 1) {
136                 LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
137               }
138               if (reporter != null && !reporter.progress()) {
139                 throw new InterruptedIOException("Operation is cancelled");
140               }
141               if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
142                 LOG.error("Can't open after " + nbAttempt + " attempts and "
143                   + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
144                   + "ms " + " for " + path);
145               } else {
146                 try {
147                   Thread.sleep(nbAttempt < 3 ? 500 : 1000);
148                   continue; // retry
149                 } catch (InterruptedException ie) {
150                   InterruptedIOException iioe = new InterruptedIOException();
151                   iioe.initCause(ie);
152                   throw iioe;
153                 }
154               }
155             }
156             throw e;
157           }
158         }
159       } catch (IOException ie) {
160         throw ie;
161       } catch (Exception e) {
162         throw new IOException("Cannot get log reader", e);
163       }
164     }
165 
166     /*
167      * WAL writer
168      */
169     private static Class<? extends Writer> logWriterClass;
170 
171     static void resetLogWriterClass() {
172       logWriterClass = null;
173     }
174 
175     /**
176      * Create a writer for the WAL.
177      * @return A WAL writer.  Close when done with it.
178      * @throws IOException
179      */
180     public static HLog.Writer createWALWriter(final FileSystem fs,
181         final Path path, Configuration conf) throws IOException {
182       return createWriter(fs, path, conf, false);
183     }
184 
185     public static HLog.Writer createRecoveredEditsWriter(final FileSystem fs,
186         final Path path, Configuration conf) throws IOException {
187       return createWriter(fs, path, conf, true);
188     }
189 
190     private static HLog.Writer createWriter(final FileSystem fs,
191         final Path path, Configuration conf, boolean overwritable)
192     throws IOException {
193       try {
194         if (logWriterClass == null) {
195           logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
196               ProtobufLogWriter.class, Writer.class);
197         }
198         HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
199         writer.init(fs, path, conf, overwritable);
200         return writer;
201       } catch (Exception e) {
202         throw new IOException("cannot get log writer", e);
203       }
204     }
205 }