001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver.wal; 021 022import java.io.IOException; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FSDataInputStream; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.io.util.LRUDictionary; 030import org.apache.hadoop.hbase.util.CommonFSUtils; 031import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 032import org.apache.hadoop.hbase.wal.WAL.Entry; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 038public abstract class ReaderBase implements AbstractFSWALProvider.Reader { 039 private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class); 040 protected Configuration conf; 041 protected FileSystem fs; 042 protected Path path; 043 protected long edit = 0; 044 protected long fileLength; 045 /** 046 * Compression context to use reading. Can be null if no compression. 047 */ 048 protected CompressionContext compressionContext = null; 049 protected boolean emptyCompressionContext = true; 050 051 /** 052 * Default constructor. 053 */ 054 public ReaderBase() { 055 } 056 057 @Override 058 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream) 059 throws IOException { 060 this.conf = conf; 061 this.path = path; 062 this.fs = fs; 063 this.fileLength = this.fs.getFileStatus(path).getLen(); 064 String cellCodecClsName = initReader(stream); 065 066 boolean compression = hasCompression(); 067 if (compression) { 068 // If compression is enabled, new dictionaries are created here. 069 try { 070 if (compressionContext == null) { 071 compressionContext = new CompressionContext(LRUDictionary.class, 072 CommonFSUtils.isRecoveredEdits(path), hasTagCompression()); 073 } else { 074 compressionContext.clear(); 075 } 076 } catch (Exception e) { 077 throw new IOException("Failed to initialize CompressionContext", e); 078 } 079 } 080 initAfterCompression(cellCodecClsName); 081 } 082 083 @Override 084 public Entry next() throws IOException { 085 return next(null); 086 } 087 088 @Override 089 public Entry next(Entry reuse) throws IOException { 090 Entry e = reuse; 091 if (e == null) { 092 e = new Entry(); 093 } 094 095 boolean hasEntry = false; 096 try { 097 hasEntry = readNext(e); 098 } catch (IllegalArgumentException iae) { 099 TableName tableName = e.getKey().getTableName(); 100 if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) { 101 // It is old ROOT table edit, ignore it 102 LOG.info("Got an old ROOT edit, ignoring "); 103 return next(e); 104 } 105 else throw iae; 106 } 107 edit++; 108 if (compressionContext != null && emptyCompressionContext) { 109 emptyCompressionContext = false; 110 } 111 return hasEntry ? e : null; 112 } 113 114 @Override 115 public void seek(long pos) throws IOException { 116 if (compressionContext != null && emptyCompressionContext) { 117 while (next() != null) { 118 if (getPosition() == pos) { 119 emptyCompressionContext = false; 120 break; 121 } 122 } 123 } 124 seekOnFs(pos); 125 } 126 127 /** 128 * Initializes the log reader with a particular stream (may be null). 129 * Reader assumes ownership of the stream if not null and may use it. Called once. 130 * @return the class name of cell Codec, null if such information is not available 131 */ 132 protected abstract String initReader(FSDataInputStream stream) throws IOException; 133 134 /** 135 * Initializes the compression after the shared stuff has been initialized. Called once. 136 */ 137 protected abstract void initAfterCompression() throws IOException; 138 139 /** 140 * Initializes the compression after the shared stuff has been initialized. Called once. 141 * @param cellCodecClsName class name of cell Codec 142 */ 143 protected abstract void initAfterCompression(String cellCodecClsName) throws IOException; 144 /** 145 * @return Whether compression is enabled for this log. 146 */ 147 protected abstract boolean hasCompression(); 148 149 /** 150 * @return Whether tag compression is enabled for this log. 151 */ 152 protected abstract boolean hasTagCompression(); 153 154 /** 155 * Read next entry. 156 * @param e The entry to read into. 157 * @return Whether there was anything to read. 158 */ 159 protected abstract boolean readNext(Entry e) throws IOException; 160 161 /** 162 * Performs a filesystem-level seek to a certain position in an underlying file. 163 */ 164 protected abstract void seekOnFs(long pos) throws IOException; 165}