001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import java.util.Iterator;
024import org.apache.hadoop.fs.FSDataInputStream;
025import org.apache.hadoop.fs.FSDataOutputStream;
026import org.apache.hadoop.hbase.io.util.StreamUtils;
027import org.apache.hadoop.hbase.procedure2.Procedure;
028import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
030import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
038
039/**
040 * Helper class that contains the WAL serialization utils.
041 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
042 *             use the new region based procedure store.
043 */
044@Deprecated
045@InterfaceAudience.Private
046final class ProcedureWALFormat {
047
048  static final byte LOG_TYPE_STREAM = 0;
049  static final byte LOG_TYPE_COMPACTED = 1;
050  static final byte LOG_TYPE_MAX_VALID = 1;
051
052  static final byte HEADER_VERSION = 1;
053  static final byte TRAILER_VERSION = 1;
054  static final long HEADER_MAGIC = 0x31764c4157637250L;
055  static final long TRAILER_MAGIC = 0x50726357414c7631L;
056
057  @InterfaceAudience.Private
058  public static class InvalidWALDataException extends IOException {
059
060    private static final long serialVersionUID = 5471733223070202196L;
061
062    public InvalidWALDataException(String s) {
063      super(s);
064    }
065
066    public InvalidWALDataException(Throwable t) {
067      super(t);
068    }
069  }
070
071  interface Loader extends ProcedureLoader {
072    void markCorruptedWAL(ProcedureWALFile log, IOException e);
073  }
074
075  private ProcedureWALFormat() {
076  }
077
078  /**
079   * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if
080   * needed, i.e, the {@code tracker} is a partial one.
081   * <p/>
082   * The method in the give {@code loader} will be called at the end after we load all the
083   * procedures and construct the hierarchy.
084   * <p/>
085   * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given
086   * {@code tracker} before returning, as it will be used to track the next proc wal file's modified
087   * procedures.
088   */
089  public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
090    Loader loader) throws IOException {
091    ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
092    tracker.setKeepDeletes(true);
093    // Ignore the last log which is current active log.
094    while (logs.hasNext()) {
095      ProcedureWALFile log = logs.next();
096      log.open();
097      try {
098        reader.read(log);
099      } finally {
100        log.close();
101      }
102    }
103    reader.finish();
104
105    // The tracker is now updated with all the procedures read from the logs
106    if (tracker.isPartial()) {
107      tracker.setPartialFlag(false);
108    }
109    tracker.resetModified();
110    tracker.setKeepDeletes(false);
111  }
112
113  public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
114    throws IOException {
115    header.writeDelimitedTo(stream);
116  }
117
118  /*
119   * +-----------------+ | END OF WAL DATA | <---+ +-----------------+ | | | | | Tracker | | | | |
120   * +-----------------+ | | version | | +-----------------+ | | TRAILER_MAGIC | |
121   * +-----------------+ | | offset |-----+ +-----------------+
122   */
123  public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
124    throws IOException {
125    long offset = stream.getPos();
126
127    // Write EOF Entry
128    ProcedureWALEntry.newBuilder().setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF).build()
129      .writeDelimitedTo(stream);
130
131    // Write Tracker
132    tracker.toProto().writeDelimitedTo(stream);
133
134    stream.write(TRAILER_VERSION);
135    StreamUtils.writeLong(stream, TRAILER_MAGIC);
136    StreamUtils.writeLong(stream, offset);
137    return stream.getPos() - offset;
138  }
139
140  public static ProcedureWALHeader readHeader(InputStream stream) throws IOException {
141    ProcedureWALHeader header;
142    try {
143      header = ProcedureWALHeader.parseDelimitedFrom(stream);
144    } catch (InvalidProtocolBufferException e) {
145      throw new InvalidWALDataException(e);
146    }
147
148    if (header == null) {
149      throw new InvalidWALDataException("No data available to read the Header");
150    }
151
152    if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
153      throw new InvalidWALDataException(
154        "Invalid Header version. got " + header.getVersion() + " expected " + HEADER_VERSION);
155    }
156
157    if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
158      throw new InvalidWALDataException("Invalid header type. got " + header.getType());
159    }
160
161    return header;
162  }
163
164  public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
165    throws IOException {
166    // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset
167    long trailerPos = size - 17;
168
169    if (trailerPos < startPos) {
170      throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
171    }
172
173    stream.seek(trailerPos);
174    int version = stream.read();
175    if (version != TRAILER_VERSION) {
176      throw new InvalidWALDataException(
177        "Invalid Trailer version. got " + version + " expected " + TRAILER_VERSION);
178    }
179
180    long magic = StreamUtils.readLong(stream);
181    if (magic != TRAILER_MAGIC) {
182      throw new InvalidWALDataException(
183        "Invalid Trailer magic. got " + magic + " expected " + TRAILER_MAGIC);
184    }
185
186    long trailerOffset = StreamUtils.readLong(stream);
187    stream.seek(trailerOffset);
188
189    ProcedureWALEntry entry = readEntry(stream);
190    if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) {
191      throw new InvalidWALDataException("Invalid Trailer begin");
192    }
193
194    ProcedureWALTrailer trailer =
195      ProcedureWALTrailer.newBuilder().setVersion(version).setTrackerPos(stream.getPos()).build();
196    return trailer;
197  }
198
199  public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
200    return ProcedureWALEntry.parseDelimitedFrom(stream);
201  }
202
203  public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, Procedure<?> proc,
204    Procedure<?>[] subprocs) throws IOException {
205    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
206    builder.setType(type);
207    builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
208    if (subprocs != null) {
209      for (int i = 0; i < subprocs.length; ++i) {
210        builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i]));
211      }
212    }
213    builder.build().writeDelimitedTo(slot);
214  }
215
216  public static void writeInsert(ByteSlot slot, Procedure<?> proc) throws IOException {
217    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
218  }
219
220  public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs)
221    throws IOException {
222    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
223  }
224
225  public static void writeUpdate(ByteSlot slot, Procedure<?> proc) throws IOException {
226    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
227  }
228
229  public static void writeDelete(ByteSlot slot, long procId) throws IOException {
230    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
231    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
232    builder.setProcId(procId);
233    builder.build().writeDelimitedTo(slot);
234  }
235
236  public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs)
237    throws IOException {
238    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
239    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
240    builder.setProcId(proc.getProcId());
241    if (subprocs != null) {
242      builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
243      for (int i = 0; i < subprocs.length; ++i) {
244        builder.addChildId(subprocs[i]);
245      }
246    }
247    builder.build().writeDelimitedTo(slot);
248  }
249}