View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.Iterator;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.fs.FSDataOutputStream;
30  import org.apache.hadoop.hbase.io.util.StreamUtils;
31  import org.apache.hadoop.hbase.procedure2.Procedure;
32  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
33  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
34  import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
35  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
37  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  
41  /**
42   * Helper class that contains the WAL serialization utils.
43   */
44  @InterfaceAudience.Private
45  @InterfaceStability.Evolving
46  public final class ProcedureWALFormat {
47    static final byte LOG_TYPE_STREAM = 0;
48    static final byte LOG_TYPE_COMPACTED = 1;
49    static final byte LOG_TYPE_MAX_VALID = 1;
50  
51    static final byte HEADER_VERSION = 1;
52    static final byte TRAILER_VERSION = 1;
53    static final long HEADER_MAGIC = 0x31764c4157637250L;
54    static final long TRAILER_MAGIC = 0x50726357414c7631L;
55  
56    @InterfaceAudience.Private
57    public static class InvalidWALDataException extends IOException {
58      public InvalidWALDataException(String s) {
59        super(s);
60      }
61  
62      public InvalidWALDataException(Throwable t) {
63        super(t);
64      }
65    }
66  
67    interface Loader extends ProcedureLoader {
68      void markCorruptedWAL(ProcedureWALFile log, IOException e);
69    }
70  
71    private ProcedureWALFormat() {}
72  
73    public static void load(final Iterator<ProcedureWALFile> logs,
74        final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
75      ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
76      tracker.setKeepDeletes(true);
77      try {
78        while (logs.hasNext()) {
79          ProcedureWALFile log = logs.next();
80          log.open();
81          try {
82            reader.read(log, loader);
83          } finally {
84            log.close();
85          }
86        }
87        reader.finalize(loader);
88        // The tracker is now updated with all the procedures read from the logs
89        tracker.setPartialFlag(false);
90        tracker.resetUpdates();
91      } finally {
92        tracker.setKeepDeletes(false);
93      }
94    }
95  
96    public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
97        throws IOException {
98      header.writeDelimitedTo(stream);
99    }
100 
101   /*
102    * +-----------------+
103    * | END OF WAL DATA | <---+
104    * +-----------------+     |
105    * |                 |     |
106    * |     Tracker     |     |
107    * |                 |     |
108    * +-----------------+     |
109    * |     version     |     |
110    * +-----------------+     |
111    * |  TRAILER_MAGIC  |     |
112    * +-----------------+     |
113    * |      offset     |-----+
114    * +-----------------+
115    */
116   public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
117       throws IOException {
118     long offset = stream.getPos();
119 
120     // Write EOF Entry
121     ProcedureWALEntry.newBuilder()
122       .setType(ProcedureWALEntry.Type.EOF)
123       .build().writeDelimitedTo(stream);
124 
125     // Write Tracker
126     tracker.writeTo(stream);
127 
128     stream.write(TRAILER_VERSION);
129     StreamUtils.writeLong(stream, TRAILER_MAGIC);
130     StreamUtils.writeLong(stream, offset);
131   }
132 
133   public static ProcedureWALHeader readHeader(InputStream stream)
134       throws IOException {
135     ProcedureWALHeader header;
136     try {
137       header = ProcedureWALHeader.parseDelimitedFrom(stream);
138     } catch (InvalidProtocolBufferException e) {
139       throw new InvalidWALDataException(e);
140     }
141 
142     if (header == null) {
143       throw new InvalidWALDataException("No data available to read the Header");
144     }
145 
146     if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
147       throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
148           " expected " + HEADER_VERSION);
149     }
150 
151     if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
152       throw new InvalidWALDataException("Invalid header type. got " + header.getType());
153     }
154 
155     return header;
156   }
157 
158   public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
159       throws IOException {
160     long trailerPos = size - 17; // Beginning of the Trailer Jump
161 
162     if (trailerPos < startPos) {
163       throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
164     }
165 
166     stream.seek(trailerPos);
167     int version = stream.read();
168     if (version != TRAILER_VERSION) {
169       throw new InvalidWALDataException("Invalid Trailer version. got " + version +
170           " expected " + TRAILER_VERSION);
171     }
172 
173     long magic = StreamUtils.readLong(stream);
174     if (magic != TRAILER_MAGIC) {
175       throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
176           " expected " + TRAILER_MAGIC);
177     }
178 
179     long trailerOffset = StreamUtils.readLong(stream);
180     stream.seek(trailerOffset);
181 
182     ProcedureWALEntry entry = readEntry(stream);
183     if (entry.getType() != ProcedureWALEntry.Type.EOF) {
184       throw new InvalidWALDataException("Invalid Trailer begin");
185     }
186 
187     ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
188       .setVersion(version)
189       .setTrackerPos(stream.getPos())
190       .build();
191     return trailer;
192   }
193 
194   public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
195     return ProcedureWALEntry.parseDelimitedFrom(stream);
196   }
197 
198   public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
199       Procedure proc, Procedure[] subprocs) throws IOException {
200     ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
201     builder.setType(type);
202     builder.addProcedure(Procedure.convert(proc));
203     if (subprocs != null) {
204       for (int i = 0; i < subprocs.length; ++i) {
205         builder.addProcedure(Procedure.convert(subprocs[i]));
206       }
207     }
208     builder.build().writeDelimitedTo(slot);
209   }
210 
211   public static void writeInsert(ByteSlot slot, Procedure proc)
212       throws IOException {
213     writeEntry(slot, ProcedureWALEntry.Type.INIT, proc, null);
214   }
215 
216   public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
217       throws IOException {
218     writeEntry(slot, ProcedureWALEntry.Type.INSERT, proc, subprocs);
219   }
220 
221   public static void writeUpdate(ByteSlot slot, Procedure proc)
222       throws IOException {
223     writeEntry(slot, ProcedureWALEntry.Type.UPDATE, proc, null);
224   }
225 
226   public static void writeDelete(ByteSlot slot, long procId)
227       throws IOException {
228     ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
229     builder.setType(ProcedureWALEntry.Type.DELETE);
230     builder.setProcId(procId);
231     builder.build().writeDelimitedTo(slot);
232   }
233 }