001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.FSDataInputStream;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.HBaseInterfaceAudience;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.io.compress.Compression;
028import org.apache.hadoop.hbase.io.util.LRUDictionary;
029import org.apache.hadoop.hbase.util.CommonFSUtils;
030import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
031import org.apache.hadoop.hbase.wal.WAL.Entry;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
037public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
038  private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class);
039  protected Configuration conf;
040  protected FileSystem fs;
041  protected Path path;
042  protected long edit = 0;
043  protected long fileLength;
044  /**
045   * Compression context to use reading. Can be null if no compression.
046   */
047  protected CompressionContext compressionContext = null;
048  protected boolean emptyCompressionContext = true;
049
050  /**
051   * Default constructor.
052   */
053  public ReaderBase() {
054  }
055
056  @Override
057  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
058    throws IOException {
059    this.conf = conf;
060    this.path = path;
061    this.fs = fs;
062    this.fileLength = this.fs.getFileStatus(path).getLen();
063    String cellCodecClsName = initReader(stream);
064
065    boolean compression = hasCompression();
066    if (compression) {
067      // If compression is enabled, new dictionaries are created here.
068      try {
069        if (compressionContext == null) {
070          if (LOG.isDebugEnabled()) {
071            LOG.debug(
072              "Initializing compression context for {}: isRecoveredEdits={}"
073                + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
074              path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression(),
075              hasValueCompression(), getValueCompressionAlgorithm());
076          }
077          compressionContext =
078            new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
079              hasTagCompression(), hasValueCompression(), getValueCompressionAlgorithm());
080        } else {
081          compressionContext.clear();
082        }
083      } catch (Exception e) {
084        throw new IOException("Failed to initialize CompressionContext", e);
085      }
086    }
087    initAfterCompression(cellCodecClsName);
088  }
089
090  @Override
091  public Entry next() throws IOException {
092    return next(null);
093  }
094
095  @Override
096  public Entry next(Entry reuse) throws IOException {
097    Entry e = reuse;
098    if (e == null) {
099      e = new Entry();
100    }
101
102    boolean hasEntry = false;
103    try {
104      hasEntry = readNext(e);
105    } catch (IllegalArgumentException iae) {
106      TableName tableName = e.getKey().getTableName();
107      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
108        // It is old ROOT table edit, ignore it
109        LOG.info("Got an old ROOT edit, ignoring ");
110        return next(e);
111      } else throw iae;
112    }
113    edit++;
114    if (compressionContext != null && emptyCompressionContext) {
115      emptyCompressionContext = false;
116    }
117    return hasEntry ? e : null;
118  }
119
120  @Override
121  public void seek(long pos) throws IOException {
122    if (compressionContext != null && emptyCompressionContext) {
123      while (next() != null) {
124        if (getPosition() == pos) {
125          emptyCompressionContext = false;
126          break;
127        }
128      }
129    }
130    seekOnFs(pos);
131  }
132
133  /**
134   * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
135   * the stream if not null and may use it. Called once.
136   * @return the class name of cell Codec, null if such information is not available
137   */
138  protected abstract String initReader(FSDataInputStream stream) throws IOException;
139
140  /**
141   * Initializes the compression after the shared stuff has been initialized. Called once.
142   */
143  protected abstract void initAfterCompression() throws IOException;
144
145  /**
146   * Initializes the compression after the shared stuff has been initialized. Called once.
147   * @param cellCodecClsName class name of cell Codec
148   */
149  protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
150
151  /** Returns Whether compression is enabled for this log. */
152  protected abstract boolean hasCompression();
153
154  /** Returns Whether tag compression is enabled for this log. */
155  protected abstract boolean hasTagCompression();
156
157  /** Returns Whether value compression is enabled for this log. */
158  protected abstract boolean hasValueCompression();
159
160  /** Returns Value compression algorithm for this log. */
161  protected abstract Compression.Algorithm getValueCompressionAlgorithm();
162
163  /**
164   * Read next entry.
165   * @param e The entry to read into.
166   * @return Whether there was anything to read.
167   */
168  protected abstract boolean readNext(Entry e) throws IOException;
169
170  /**
171   * Performs a filesystem-level seek to a certain position in an underlying file.
172   */
173  protected abstract void seekOnFs(long pos) throws IOException;
174}