package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;

import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;

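/**
 * Base class for WAL readers. Handles the initialization shared by all readers (file length,
 * optional dictionary-based compression context) and delegates format-specific work to
 * subclasses via the abstract methods below.
 */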
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements DefaultWALProvider.Reader {
  private static final Log LOG = LogFactory.getLog(ReaderBase.class);
  protected Configuration conf;
  protected FileSystem fs;
  protected Path path;
  protected long edit = 0;
  protected long fileLength;

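  /**
   * Compression context to use while reading. Null if the WAL is not compressed.
   */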
  protected CompressionContext compressionContext = null;
  protected boolean emptyCompressionContext = true;

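  /**
   * Default constructor. {@link #init(FileSystem, Path, Configuration, FSDataInputStream)}
   * must be called before the reader is used.
   */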
  public ReaderBase() {
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
      throws IOException {
    this.conf = conf;
    this.path = path;
    this.fs = fs;
    this.fileLength = this.fs.getFileStatus(path).getLen();
    String cellCodecClsName = initReader(stream);

    boolean compression = hasCompression();
    if (compression) {
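      // Compression is enabled: create the dictionary-based compression context,
      // or reset the existing one so reading starts from empty dictionaries.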
      try {
        if (compressionContext == null) {
          compressionContext = new CompressionContext(LRUDictionary.class,
              FSUtils.isRecoveredEdits(path), hasTagCompression());
        } else {
          compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initialize CompressionContext", e);
      }
    }
    initAfterCompression(cellCodecClsName);
  }

  @Override
  public Entry next() throws IOException {
    return next(null);
  }

  @Override
  public Entry next(Entry reuse) throws IOException {
    Entry e = reuse;
    if (e == null) {
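      // Use HLogKey rather than WALKey directly to stay compatible with legacy
      // coprocessors and SequenceFile-based readers that still expect HLogKey.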
      e = new Entry(new HLogKey(), new WALEdit());
    }
    if (compressionContext != null) {
      e.setCompressionContext(compressionContext);
    }

    boolean hasEntry = false;
    try {
      hasEntry = readNext(e);
    } catch (IllegalArgumentException iae) {
      TableName tableName = e.getKey().getTablename();
      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
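        // An edit for the old -ROOT- catalog table; ignore it and read the next entry.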
        LOG.info("Got an old ROOT edit, ignoring ");
        return next(e);
      } else {
        throw iae;
      }
    }
    edit++;
    if (compressionContext != null && emptyCompressionContext) {
      emptyCompressionContext = false;
    }
    return hasEntry ? e : null;
  }

  @Override
  public void seek(long pos) throws IOException {
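    // If the compression dictionaries have not been populated yet, entries must be read
    // sequentially up to the target position so the dictionaries are rebuilt before the
    // underlying stream is repositioned.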
    if (compressionContext != null && emptyCompressionContext) {
      while (next() != null) {
        if (getPosition() == pos) {
          emptyCompressionContext = false;
          break;
        }
      }
    }
    seekOnFs(pos);
  }
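  /**
   * Initializes the log reader with a particular stream (may be null).
   * The reader assumes ownership of the stream if not null and may use it; implementations
   * must not assume the stream is non-null.
   * @param stream the stream to initialize the reader with
   * @return the class name of the cell Codec, null if such information is not available
   */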
  protected abstract String initReader(FSDataInputStream stream) throws IOException;

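  /**
   * Initializes the compression after the shared stuff has been initialized. Called once.
   */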
  protected abstract void initAfterCompression() throws IOException;

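  /**
   * Initializes the compression after the shared stuff has been initialized. Called once.
   * @param cellCodecClsName class name of the cell Codec
   */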
  protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;

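  /**
   * @return Whether compression is enabled for this log.
   */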
  protected abstract boolean hasCompression();

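  /**
   * @return Whether tag compression is enabled for this log.
   */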
  protected abstract boolean hasTagCompression();

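  /**
   * Read the next entry.
   * @param e The entry to read into.
   * @return Whether there was anything to read.
   */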
  protected abstract boolean readNext(Entry e) throws IOException;

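  /**
   * Performs a filesystem-level seek to a certain position in the underlying file.
   */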
  protected abstract void seekOnFs(long pos) throws IOException;
}