1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Iterator;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.fs.FSDataOutputStream;
30 import org.apache.hadoop.hbase.io.util.StreamUtils;
31 import org.apache.hadoop.hbase.procedure2.Procedure;
32 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
33 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
34 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
35 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
37 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
38
39 import com.google.protobuf.InvalidProtocolBufferException;
40
41
42
43
44 @InterfaceAudience.Private
45 @InterfaceStability.Evolving
46 public final class ProcedureWALFormat {
47 static final byte LOG_TYPE_STREAM = 0;
48 static final byte LOG_TYPE_COMPACTED = 1;
49 static final byte LOG_TYPE_MAX_VALID = 1;
50
51 static final byte HEADER_VERSION = 1;
52 static final byte TRAILER_VERSION = 1;
53 static final long HEADER_MAGIC = 0x31764c4157637250L;
54 static final long TRAILER_MAGIC = 0x50726357414c7631L;
55
56 @InterfaceAudience.Private
57 public static class InvalidWALDataException extends IOException {
58 public InvalidWALDataException(String s) {
59 super(s);
60 }
61
62 public InvalidWALDataException(Throwable t) {
63 super(t);
64 }
65 }
66
67 interface Loader extends ProcedureLoader {
68 void markCorruptedWAL(ProcedureWALFile log, IOException e);
69 }
70
71 private ProcedureWALFormat() {}
72
73 public static void load(final Iterator<ProcedureWALFile> logs,
74 final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
75 ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
76 tracker.setKeepDeletes(true);
77 try {
78 while (logs.hasNext()) {
79 ProcedureWALFile log = logs.next();
80 log.open();
81 try {
82 reader.read(log, loader);
83 } finally {
84 log.close();
85 }
86 }
87 reader.finalize(loader);
88
89 tracker.setPartialFlag(false);
90 tracker.resetUpdates();
91 } finally {
92 tracker.setKeepDeletes(false);
93 }
94 }
95
96 public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
97 throws IOException {
98 header.writeDelimitedTo(stream);
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
117 throws IOException {
118 long offset = stream.getPos();
119
120
121 ProcedureWALEntry.newBuilder()
122 .setType(ProcedureWALEntry.Type.EOF)
123 .build().writeDelimitedTo(stream);
124
125
126 tracker.writeTo(stream);
127
128 stream.write(TRAILER_VERSION);
129 StreamUtils.writeLong(stream, TRAILER_MAGIC);
130 StreamUtils.writeLong(stream, offset);
131 }
132
133 public static ProcedureWALHeader readHeader(InputStream stream)
134 throws IOException {
135 ProcedureWALHeader header;
136 try {
137 header = ProcedureWALHeader.parseDelimitedFrom(stream);
138 } catch (InvalidProtocolBufferException e) {
139 throw new InvalidWALDataException(e);
140 }
141
142 if (header == null) {
143 throw new InvalidWALDataException("No data available to read the Header");
144 }
145
146 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
147 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
148 " expected " + HEADER_VERSION);
149 }
150
151 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
152 throw new InvalidWALDataException("Invalid header type. got " + header.getType());
153 }
154
155 return header;
156 }
157
158 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
159 throws IOException {
160 long trailerPos = size - 17;
161
162 if (trailerPos < startPos) {
163 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
164 }
165
166 stream.seek(trailerPos);
167 int version = stream.read();
168 if (version != TRAILER_VERSION) {
169 throw new InvalidWALDataException("Invalid Trailer version. got " + version +
170 " expected " + TRAILER_VERSION);
171 }
172
173 long magic = StreamUtils.readLong(stream);
174 if (magic != TRAILER_MAGIC) {
175 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
176 " expected " + TRAILER_MAGIC);
177 }
178
179 long trailerOffset = StreamUtils.readLong(stream);
180 stream.seek(trailerOffset);
181
182 ProcedureWALEntry entry = readEntry(stream);
183 if (entry.getType() != ProcedureWALEntry.Type.EOF) {
184 throw new InvalidWALDataException("Invalid Trailer begin");
185 }
186
187 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
188 .setVersion(version)
189 .setTrackerPos(stream.getPos())
190 .build();
191 return trailer;
192 }
193
194 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
195 return ProcedureWALEntry.parseDelimitedFrom(stream);
196 }
197
198 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
199 Procedure proc, Procedure[] subprocs) throws IOException {
200 ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
201 builder.setType(type);
202 builder.addProcedure(Procedure.convert(proc));
203 if (subprocs != null) {
204 for (int i = 0; i < subprocs.length; ++i) {
205 builder.addProcedure(Procedure.convert(subprocs[i]));
206 }
207 }
208 builder.build().writeDelimitedTo(slot);
209 }
210
211 public static void writeInsert(ByteSlot slot, Procedure proc)
212 throws IOException {
213 writeEntry(slot, ProcedureWALEntry.Type.INIT, proc, null);
214 }
215
216 public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
217 throws IOException {
218 writeEntry(slot, ProcedureWALEntry.Type.INSERT, proc, subprocs);
219 }
220
221 public static void writeUpdate(ByteSlot slot, Procedure proc)
222 throws IOException {
223 writeEntry(slot, ProcedureWALEntry.Type.UPDATE, proc, null);
224 }
225
226 public static void writeDelete(ByteSlot slot, long procId)
227 throws IOException {
228 ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
229 builder.setType(ProcedureWALEntry.Type.DELETE);
230 builder.setProcId(procId);
231 builder.build().writeDelimitedTo(slot);
232 }
233 }