001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2.store.wal;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.util.Iterator;
025import org.apache.hadoop.fs.FSDataInputStream;
026import org.apache.hadoop.fs.FSDataOutputStream;
027import org.apache.hadoop.hbase.io.util.StreamUtils;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
031import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
032import org.apache.yetus.audience.InterfaceAudience;
033
034import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
039
040/**
041 * Helper class that contains the WAL serialization utils.
042 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
043 *             use the new region based procedure store.
044 */
045@Deprecated
046@InterfaceAudience.Private
047final class ProcedureWALFormat {
048
049  static final byte LOG_TYPE_STREAM = 0;
050  static final byte LOG_TYPE_COMPACTED = 1;
051  static final byte LOG_TYPE_MAX_VALID = 1;
052
053  static final byte HEADER_VERSION = 1;
054  static final byte TRAILER_VERSION = 1;
055  static final long HEADER_MAGIC = 0x31764c4157637250L;
056  static final long TRAILER_MAGIC = 0x50726357414c7631L;
057
058  @InterfaceAudience.Private
059  public static class InvalidWALDataException extends IOException {
060
061    private static final long serialVersionUID = 5471733223070202196L;
062
063    public InvalidWALDataException(String s) {
064      super(s);
065    }
066
067    public InvalidWALDataException(Throwable t) {
068      super(t);
069    }
070  }
071
072  interface Loader extends ProcedureLoader {
073    void markCorruptedWAL(ProcedureWALFile log, IOException e);
074  }
075
076  private ProcedureWALFormat() {}
077
078  /**
079   * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if
080   * needed, i.e, the {@code tracker} is a partial one.
081   * <p/>
082   * The method in the give {@code loader} will be called at the end after we load all the
083   * procedures and construct the hierarchy.
084   * <p/>
085   * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given
086   * {@code tracker} before returning, as it will be used to track the next proc wal file's modified
087   * procedures.
088   */
089  public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
090      Loader loader) throws IOException {
091    ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
092    tracker.setKeepDeletes(true);
093    // Ignore the last log which is current active log.
094    while (logs.hasNext()) {
095      ProcedureWALFile log = logs.next();
096      log.open();
097      try {
098        reader.read(log);
099      } finally {
100        log.close();
101      }
102    }
103    reader.finish();
104
105    // The tracker is now updated with all the procedures read from the logs
106    if (tracker.isPartial()) {
107      tracker.setPartialFlag(false);
108    }
109    tracker.resetModified();
110    tracker.setKeepDeletes(false);
111  }
112
113  public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
114      throws IOException {
115    header.writeDelimitedTo(stream);
116  }
117
118  /*
119   * +-----------------+
120   * | END OF WAL DATA | <---+
121   * +-----------------+     |
122   * |                 |     |
123   * |     Tracker     |     |
124   * |                 |     |
125   * +-----------------+     |
126   * |     version     |     |
127   * +-----------------+     |
128   * |  TRAILER_MAGIC  |     |
129   * +-----------------+     |
130   * |      offset     |-----+
131   * +-----------------+
132   */
133  public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
134      throws IOException {
135    long offset = stream.getPos();
136
137    // Write EOF Entry
138    ProcedureWALEntry.newBuilder()
139      .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
140      .build().writeDelimitedTo(stream);
141
142    // Write Tracker
143    tracker.toProto().writeDelimitedTo(stream);
144
145    stream.write(TRAILER_VERSION);
146    StreamUtils.writeLong(stream, TRAILER_MAGIC);
147    StreamUtils.writeLong(stream, offset);
148    return stream.getPos() - offset;
149  }
150
151  public static ProcedureWALHeader readHeader(InputStream stream)
152      throws IOException {
153    ProcedureWALHeader header;
154    try {
155      header = ProcedureWALHeader.parseDelimitedFrom(stream);
156    } catch (InvalidProtocolBufferException e) {
157      throw new InvalidWALDataException(e);
158    }
159
160    if (header == null) {
161      throw new InvalidWALDataException("No data available to read the Header");
162    }
163
164    if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
165      throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
166          " expected " + HEADER_VERSION);
167    }
168
169    if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
170      throw new InvalidWALDataException("Invalid header type. got " + header.getType());
171    }
172
173    return header;
174  }
175
176  public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
177      throws IOException {
178    // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset
179    long trailerPos = size - 17;
180
181    if (trailerPos < startPos) {
182      throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
183    }
184
185    stream.seek(trailerPos);
186    int version = stream.read();
187    if (version != TRAILER_VERSION) {
188      throw new InvalidWALDataException("Invalid Trailer version. got " + version +
189          " expected " + TRAILER_VERSION);
190    }
191
192    long magic = StreamUtils.readLong(stream);
193    if (magic != TRAILER_MAGIC) {
194      throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
195          " expected " + TRAILER_MAGIC);
196    }
197
198    long trailerOffset = StreamUtils.readLong(stream);
199    stream.seek(trailerOffset);
200
201    ProcedureWALEntry entry = readEntry(stream);
202    if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) {
203      throw new InvalidWALDataException("Invalid Trailer begin");
204    }
205
206    ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
207      .setVersion(version)
208      .setTrackerPos(stream.getPos())
209      .build();
210    return trailer;
211  }
212
213  public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
214    return ProcedureWALEntry.parseDelimitedFrom(stream);
215  }
216
217  public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
218      Procedure<?> proc, Procedure<?>[] subprocs) throws IOException {
219    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
220    builder.setType(type);
221    builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
222    if (subprocs != null) {
223      for (int i = 0; i < subprocs.length; ++i) {
224        builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i]));
225      }
226    }
227    builder.build().writeDelimitedTo(slot);
228  }
229
230  public static void writeInsert(ByteSlot slot, Procedure<?> proc)
231      throws IOException {
232    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
233  }
234
235  public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs)
236      throws IOException {
237    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
238  }
239
240  public static void writeUpdate(ByteSlot slot, Procedure<?> proc)
241      throws IOException {
242    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
243  }
244
245  public static void writeDelete(ByteSlot slot, long procId)
246      throws IOException {
247    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
248    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
249    builder.setProcId(procId);
250    builder.build().writeDelimitedTo(slot);
251  }
252
253  public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs)
254      throws IOException {
255    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
256    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
257    builder.setProcId(proc.getProcId());
258    if (subprocs != null) {
259      builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
260      for (int i = 0; i < subprocs.length; ++i) {
261        builder.addChildId(subprocs[i]);
262      }
263    }
264    builder.build().writeDelimitedTo(slot);
265  }
266}