001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2.store.wal;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.util.Iterator;
025import org.apache.hadoop.fs.FSDataInputStream;
026import org.apache.hadoop.fs.FSDataOutputStream;
027import org.apache.hadoop.hbase.io.util.StreamUtils;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
032import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
033import org.apache.yetus.audience.InterfaceAudience;
034
035import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
040
041/**
042 * Helper class that contains the WAL serialization utils.
043 */
044@InterfaceAudience.Private
045public final class ProcedureWALFormat {
046
047  static final byte LOG_TYPE_STREAM = 0;
048  static final byte LOG_TYPE_COMPACTED = 1;
049  static final byte LOG_TYPE_MAX_VALID = 1;
050
051  static final byte HEADER_VERSION = 1;
052  static final byte TRAILER_VERSION = 1;
053  static final long HEADER_MAGIC = 0x31764c4157637250L;
054  static final long TRAILER_MAGIC = 0x50726357414c7631L;
055
056  @InterfaceAudience.Private
057  public static class InvalidWALDataException extends IOException {
058
059    private static final long serialVersionUID = 5471733223070202196L;
060
061    public InvalidWALDataException(String s) {
062      super(s);
063    }
064
065    public InvalidWALDataException(Throwable t) {
066      super(t);
067    }
068  }
069
070  interface Loader extends ProcedureLoader {
071    void markCorruptedWAL(ProcedureWALFile log, IOException e);
072  }
073
074  private ProcedureWALFormat() {}
075
076  /**
077   * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if
078   * needed, i.e, the {@code tracker} is a partial one.
079   * <p/>
080   * The method in the give {@code loader} will be called at the end after we load all the
081   * procedures and construct the hierarchy.
082   * <p/>
083   * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given
084   * {@code tracker} before returning, as it will be used to track the next proc wal file's modified
085   * procedures.
086   */
087  public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
088      Loader loader) throws IOException {
089    ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
090    tracker.setKeepDeletes(true);
091    // Ignore the last log which is current active log.
092    while (logs.hasNext()) {
093      ProcedureWALFile log = logs.next();
094      log.open();
095      try {
096        reader.read(log);
097      } finally {
098        log.close();
099      }
100    }
101    reader.finish();
102
103    // The tracker is now updated with all the procedures read from the logs
104    if (tracker.isPartial()) {
105      tracker.setPartialFlag(false);
106    }
107    tracker.resetModified();
108    tracker.setKeepDeletes(false);
109  }
110
111  public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
112      throws IOException {
113    header.writeDelimitedTo(stream);
114  }
115
116  /*
117   * +-----------------+
118   * | END OF WAL DATA | <---+
119   * +-----------------+     |
120   * |                 |     |
121   * |     Tracker     |     |
122   * |                 |     |
123   * +-----------------+     |
124   * |     version     |     |
125   * +-----------------+     |
126   * |  TRAILER_MAGIC  |     |
127   * +-----------------+     |
128   * |      offset     |-----+
129   * +-----------------+
130   */
131  public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
132      throws IOException {
133    long offset = stream.getPos();
134
135    // Write EOF Entry
136    ProcedureWALEntry.newBuilder()
137      .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
138      .build().writeDelimitedTo(stream);
139
140    // Write Tracker
141    tracker.toProto().writeDelimitedTo(stream);
142
143    stream.write(TRAILER_VERSION);
144    StreamUtils.writeLong(stream, TRAILER_MAGIC);
145    StreamUtils.writeLong(stream, offset);
146    return stream.getPos() - offset;
147  }
148
149  public static ProcedureWALHeader readHeader(InputStream stream)
150      throws IOException {
151    ProcedureWALHeader header;
152    try {
153      header = ProcedureWALHeader.parseDelimitedFrom(stream);
154    } catch (InvalidProtocolBufferException e) {
155      throw new InvalidWALDataException(e);
156    }
157
158    if (header == null) {
159      throw new InvalidWALDataException("No data available to read the Header");
160    }
161
162    if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
163      throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
164          " expected " + HEADER_VERSION);
165    }
166
167    if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
168      throw new InvalidWALDataException("Invalid header type. got " + header.getType());
169    }
170
171    return header;
172  }
173
174  public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
175      throws IOException {
176    // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset
177    long trailerPos = size - 17;
178
179    if (trailerPos < startPos) {
180      throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
181    }
182
183    stream.seek(trailerPos);
184    int version = stream.read();
185    if (version != TRAILER_VERSION) {
186      throw new InvalidWALDataException("Invalid Trailer version. got " + version +
187          " expected " + TRAILER_VERSION);
188    }
189
190    long magic = StreamUtils.readLong(stream);
191    if (magic != TRAILER_MAGIC) {
192      throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
193          " expected " + TRAILER_MAGIC);
194    }
195
196    long trailerOffset = StreamUtils.readLong(stream);
197    stream.seek(trailerOffset);
198
199    ProcedureWALEntry entry = readEntry(stream);
200    if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) {
201      throw new InvalidWALDataException("Invalid Trailer begin");
202    }
203
204    ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
205      .setVersion(version)
206      .setTrackerPos(stream.getPos())
207      .build();
208    return trailer;
209  }
210
211  public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
212    return ProcedureWALEntry.parseDelimitedFrom(stream);
213  }
214
215  public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
216      Procedure<?> proc, Procedure<?>[] subprocs) throws IOException {
217    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
218    builder.setType(type);
219    builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
220    if (subprocs != null) {
221      for (int i = 0; i < subprocs.length; ++i) {
222        builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i]));
223      }
224    }
225    builder.build().writeDelimitedTo(slot);
226  }
227
228  public static void writeInsert(ByteSlot slot, Procedure<?> proc)
229      throws IOException {
230    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
231  }
232
233  public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs)
234      throws IOException {
235    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
236  }
237
238  public static void writeUpdate(ByteSlot slot, Procedure<?> proc)
239      throws IOException {
240    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
241  }
242
243  public static void writeDelete(ByteSlot slot, long procId)
244      throws IOException {
245    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
246    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
247    builder.setProcId(procId);
248    builder.build().writeDelimitedTo(slot);
249  }
250
251  public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs)
252      throws IOException {
253    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
254    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
255    builder.setProcId(proc.getProcId());
256    if (subprocs != null) {
257      builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
258      for (int i = 0; i < subprocs.length; ++i) {
259        builder.addChildId(subprocs[i]);
260      }
261    }
262    builder.build().writeDelimitedTo(slot);
263  }
264}