View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.io.util.LRUDictionary;
34  import org.apache.hadoop.hbase.util.FSUtils;
35  
36  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
37  import org.apache.hadoop.hbase.wal.WAL.Entry;
38  
39  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
40  public abstract class ReaderBase implements DefaultWALProvider.Reader {
41    private static final Log LOG = LogFactory.getLog(ReaderBase.class);
42    protected Configuration conf;
43    protected FileSystem fs;
44    protected Path path;
45    protected long edit = 0;
46    protected long fileLength;
47    /**
48     * Compression context to use reading.  Can be null if no compression.
49     */
50    protected CompressionContext compressionContext = null;
51    protected boolean emptyCompressionContext = true;
52  
53    /**
54     * Default constructor.
55     */
56    public ReaderBase() {
57    }
58  
59    @Override
60    public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
61        throws IOException {
62      this.conf = conf;
63      this.path = path;
64      this.fs = fs;
65      this.fileLength = this.fs.getFileStatus(path).getLen();
66      String cellCodecClsName = initReader(stream);
67  
68      boolean compression = hasCompression();
69      if (compression) {
70        // If compression is enabled, new dictionaries are created here.
71        try {
72          if (compressionContext == null) {
73            compressionContext = new CompressionContext(LRUDictionary.class,
74                FSUtils.isRecoveredEdits(path), hasTagCompression());
75          } else {
76            compressionContext.clear();
77          }
78        } catch (Exception e) {
79          throw new IOException("Failed to initialize CompressionContext", e);
80        }
81      }
82      initAfterCompression(cellCodecClsName);
83    }
84  
85    @Override
86    public Entry next() throws IOException {
87      return next(null);
88    }
89  
90    @Override
91    public Entry next(Entry reuse) throws IOException {
92      Entry e = reuse;
93      if (e == null) {
94        // we use HLogKey here instead of WALKey directly to support legacy coprocessors,
95        // seqencefile based readers, and HLogInputFormat.
96        e = new Entry(new HLogKey(), new WALEdit());
97      }
98      if (compressionContext != null) {
99        e.setCompressionContext(compressionContext);
100     }
101 
102     boolean hasEntry = false;
103     try {
104       hasEntry = readNext(e);
105     } catch (IllegalArgumentException iae) {
106       TableName tableName = e.getKey().getTablename();
107       if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
108         // It is old ROOT table edit, ignore it
109         LOG.info("Got an old ROOT edit, ignoring ");
110         return next(e);
111       }
112       else throw iae;
113     }
114     edit++;
115     if (compressionContext != null && emptyCompressionContext) {
116       emptyCompressionContext = false;
117     }
118     return hasEntry ? e : null;
119   }
120 
121   @Override
122   public void seek(long pos) throws IOException {
123     if (compressionContext != null && emptyCompressionContext) {
124       while (next() != null) {
125         if (getPosition() == pos) {
126           emptyCompressionContext = false;
127           break;
128         }
129       }
130     }
131     seekOnFs(pos);
132   }
133 
134   /**
135    * Initializes the log reader with a particular stream (may be null).
136    * Reader assumes ownership of the stream if not null and may use it. Called once.
137    * @return the class name of cell Codec, null if such information is not available
138    */
139   protected abstract String initReader(FSDataInputStream stream) throws IOException;
140 
141   /**
142    * Initializes the compression after the shared stuff has been initialized. Called once.
143    */
144   protected abstract void initAfterCompression() throws IOException;
145   
146   /**
147    * Initializes the compression after the shared stuff has been initialized. Called once.
148    * @param cellCodecClsName class name of cell Codec
149    */
150   protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
151   /**
152    * @return Whether compression is enabled for this log.
153    */
154   protected abstract boolean hasCompression();
155 
156   /**
157    * @return Whether tag compression is enabled for this log.
158    */
159   protected abstract boolean hasTagCompression();
160 
161   /**
162    * Read next entry.
163    * @param e The entry to read into.
164    * @return Whether there was anything to read.
165    */
166   protected abstract boolean readNext(Entry e) throws IOException;
167 
168   /**
169    * Performs a filesystem-level seek to a certain position in an underlying file.
170    */
171   protected abstract void seekOnFs(long pos) throws IOException;
172 }