1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
28 import org.apache.hadoop.hbase.wal.WAL.Reader;
29 import org.apache.hadoop.hbase.wal.WAL.Entry;
30 import org.apache.hadoop.hbase.wal.WALFactory;
31
32 import java.io.IOException;
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class ReplicationWALReaderManager {
40
41 private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
42 private final FileSystem fs;
43 private final Configuration conf;
44 private long position = 0;
45 private Reader reader;
46 private Path lastPath;
47
48
49
50
51
52
53
54 public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
55 this.fs = fs;
56 this.conf = conf;
57 }
58
59
60
61
62
63
64
65 public Reader openReader(Path path) throws IOException {
66
67
68 if (this.reader == null || !this.lastPath.equals(path)) {
69 this.closeReader();
70 this.reader = WALFactory.createReader(this.fs, path, this.conf);
71 this.lastPath = path;
72 } else {
73 try {
74 this.reader.reset();
75 } catch (NullPointerException npe) {
76 throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
77 }
78 }
79 return this.reader;
80 }
81
82
83
84
85
86
87 public Entry readNextAndSetPosition() throws IOException {
88 Entry entry = this.reader.next();
89
90
91
92
93 this.position = this.reader.getPosition();
94
95 if (entry != null) {
96 entry.setCompressionContext(null);
97 }
98 return entry;
99 }
100
101
102
103
104
105 public void seek() throws IOException {
106 if (this.position != 0) {
107 this.reader.seek(this.position);
108 }
109 }
110
111
112
113
114
115 public long getPosition() {
116 return this.position;
117 }
118
119 public void setPosition(long pos) {
120 this.position = pos;
121 }
122
123 public long currentTrailerSize() {
124 long size = -1L;
125 if (reader instanceof ProtobufLogReader) {
126 final ProtobufLogReader pblr = (ProtobufLogReader)reader;
127 size = pblr.trailerSize();
128 }
129 return size;
130 }
131
132
133
134
135
136 public void closeReader() throws IOException {
137 if (this.reader != null) {
138 this.reader.close();
139 this.reader = null;
140 }
141 }
142
143
144
145
146 void finishCurrentFile() {
147 this.position = 0;
148 try {
149 this.closeReader();
150 } catch (IOException e) {
151 LOG.warn("Unable to close reader", e);
152 }
153 }
154
155 }