/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;

/**
 * Helper class that contains the WAL serialization utils.
 */
@InterfaceAudience.Private
public final class ProcedureWALFormat {

  static final byte LOG_TYPE_STREAM = 0;
  static final byte LOG_TYPE_COMPACTED = 1;
  static final byte LOG_TYPE_MAX_VALID = 1;

  static final byte HEADER_VERSION = 1;
  static final byte TRAILER_VERSION = 1;
  static final long HEADER_MAGIC = 0x31764c4157637250L;
  static final long TRAILER_MAGIC = 0x50726357414c7631L;

  /** Size of the fixed trailer tail: 1 byte version + 8 byte magic + 8 byte offset. */
  private static final int TRAILER_TAIL_SIZE = 17;

  /**
   * Thrown when the bytes read from a procedure WAL do not form a valid header, trailer or entry.
   */
  @InterfaceAudience.Private
  public static class InvalidWALDataException extends IOException {

    private static final long serialVersionUID = 5471733223070202196L;

    public InvalidWALDataException(String s) {
      super(s);
    }

    public InvalidWALDataException(Throwable t) {
      super(t);
    }
  }

  /**
   * A {@link ProcedureLoader} that can additionally be told that a WAL file is corrupted.
   */
  interface Loader extends ProcedureLoader {
    void markCorruptedWAL(ProcedureWALFile log, IOException e);
  }

  private ProcedureWALFormat() {}

  /**
   * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if
   * needed, i.e, the {@code tracker} is a partial one.
   * <p/>
   * The method in the give {@code loader} will be called at the end after we load all the
   * procedures and construct the hierarchy.
   * <p/>
   * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given
   * {@code tracker} before returning, as it will be used to track the next proc wal file's modified
   * procedures.
   * @param logs the WAL files to read; each one is opened, read and closed in iteration order
   * @param tracker the tracker to update while reading
   * @param loader the loader handed to the reader
   * @throws IOException if opening or reading a log fails
   */
  public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
      Loader loader) throws IOException {
    ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
    tracker.setKeepDeletes(true);
    // NOTE(review): the previous comment here said the current active log is ignored, but this
    // loop reads every log the iterator yields; presumably the caller skips the active log
    // before handing over the iterator -- confirm.
    while (logs.hasNext()) {
      ProcedureWALFile log = logs.next();
      log.open();
      try {
        reader.read(log);
      } finally {
        // Always release the file, even when reading it failed.
        log.close();
      }
    }
    reader.finish();

    // The tracker is now updated with all the procedures read from the logs
    if (tracker.isPartial()) {
      tracker.setPartialFlag(false);
    }
    tracker.resetModified();
    tracker.setKeepDeletes(false);
  }

  /**
   * Write the given header to the stream in length-delimited form.
   * @param stream the destination stream
   * @param header the header to serialize
   * @throws IOException if the write fails
   */
  public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
      throws IOException {
    header.writeDelimitedTo(stream);
  }

  /*
   * +-----------------+
   * | END OF WAL DATA | <---+
   * +-----------------+     |
   * |                 |     |
   * |     Tracker     |     |
   * |                 |     |
   * +-----------------+     |
   * |     version     |     |
   * +-----------------+     |
   * |  TRAILER_MAGIC  |     |
   * +-----------------+     |
   * |      offset     |-----+
   * +-----------------+
   */
  /**
   * Write the trailer (EOF entry, tracker, version, magic and the offset of the EOF entry) at the
   * current position of the stream.
   * @param stream the destination stream
   * @param tracker the tracker to serialize into the trailer
   * @return the number of bytes the stream position advanced while writing the trailer
   * @throws IOException if the write fails
   */
  public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
      throws IOException {
    // Remember where the trailer begins; it is stored as the last field so readers can jump back.
    long offset = stream.getPos();

    // Write EOF Entry
    ProcedureWALEntry.newBuilder()
      .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
      .build().writeDelimitedTo(stream);

    // Write Tracker
    tracker.toProto().writeDelimitedTo(stream);

    stream.write(TRAILER_VERSION);
    StreamUtils.writeLong(stream, TRAILER_MAGIC);
    StreamUtils.writeLong(stream, offset);
    return stream.getPos() - offset;
  }

  /**
   * Read and validate the length-delimited header from the stream.
   * @param stream the stream positioned at the header
   * @return the parsed header, never null
   * @throws InvalidWALDataException if the header cannot be parsed, is missing, has a version
   *           other than {@link #HEADER_VERSION}, or has a type outside
   *           {@code [0, LOG_TYPE_MAX_VALID]}
   * @throws IOException if reading from the stream fails
   */
  public static ProcedureWALHeader readHeader(InputStream stream)
      throws IOException {
    ProcedureWALHeader header;
    try {
      header = ProcedureWALHeader.parseDelimitedFrom(stream);
    } catch (InvalidProtocolBufferException e) {
      throw new InvalidWALDataException(e);
    }

    if (header == null) {
      throw new InvalidWALDataException("No data available to read the Header");
    }

    // A negative version can never equal HEADER_VERSION, so a single inequality is sufficient.
    if (header.getVersion() != HEADER_VERSION) {
      throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
          " expected " + HEADER_VERSION);
    }

    if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
      throw new InvalidWALDataException("Invalid header type. got " + header.getType());
    }

    return header;
  }

  /**
   * Read and validate the trailer located at the end of the file.
   * <p/>
   * On success the stream is left positioned right after the EOF entry, which is the position
   * reported as the tracker position in the returned trailer.
   * @param stream the stream to read from
   * @param startPos the first position at which trailer data may legally start
   * @param size the total size of the file
   * @return a trailer carrying the version read from the file and the tracker position
   * @throws InvalidWALDataException if the trailer is missing, its version, magic or offset is
   *           invalid, or the data at the recorded offset is not an EOF entry
   * @throws IOException if reading from or seeking in the stream fails
   */
  public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
      throws IOException {
    // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset
    long trailerPos = size - TRAILER_TAIL_SIZE;

    if (trailerPos < startPos) {
      throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
    }

    stream.seek(trailerPos);
    int version = stream.read();
    if (version != TRAILER_VERSION) {
      throw new InvalidWALDataException("Invalid Trailer version. got " + version +
          " expected " + TRAILER_VERSION);
    }

    long magic = StreamUtils.readLong(stream);
    if (magic != TRAILER_MAGIC) {
      throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
          " expected " + TRAILER_MAGIC);
    }

    long trailerOffset = StreamUtils.readLong(stream);
    // writeTrailer records the offset before emitting the EOF entry and the fixed tail, so a
    // sane offset lies at or after startPos and strictly before the fixed tail. Reject anything
    // else up front instead of seeking to a position taken from corrupted data.
    if (trailerOffset < startPos || trailerOffset >= trailerPos) {
      throw new InvalidWALDataException("Invalid Trailer offset. got " + trailerOffset +
          " expected a value in [" + startPos + ", " + trailerPos + ")");
    }
    stream.seek(trailerOffset);

    ProcedureWALEntry entry = readEntry(stream);
    // The delimited parse may yield null when no data is available (readHeader guards the same
    // case), so treat a missing entry as corruption rather than dereferencing it.
    if (entry == null || entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) {
      throw new InvalidWALDataException("Invalid Trailer begin");
    }

    return ProcedureWALTrailer.newBuilder()
      .setVersion(version)
      .setTrackerPos(stream.getPos())
      .build();
  }

  /**
   * Read one length-delimited WAL entry from the stream.
   * @param stream the stream positioned at an entry
   * @return the result of the delimited parse; callers should be prepared for null when no data
   *         is available
   * @throws IOException if reading or parsing fails
   */
  public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
    return ProcedureWALEntry.parseDelimitedFrom(stream);
  }

  /**
   * Serialize an entry of the given type containing {@code proc} followed by each of
   * {@code subprocs} (when non-null) into the slot.
   * @param slot the destination buffer
   * @param type the entry type to record
   * @param proc the main procedure, always written first
   * @param subprocs additional procedures to append, may be null
   * @throws IOException if converting or writing a procedure fails
   */
  public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
      Procedure<?> proc, Procedure<?>[] subprocs) throws IOException {
    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
    builder.setType(type);
    builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
    if (subprocs != null) {
      for (Procedure<?> subproc : subprocs) {
        builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subproc));
      }
    }
    builder.build().writeDelimitedTo(slot);
  }

  /**
   * Write a {@code PROCEDURE_WAL_INIT} entry containing only {@code proc}.
   * @param slot the destination buffer
   * @param proc the procedure to write
   * @throws IOException if the write fails
   */
  public static void writeInsert(ByteSlot slot, Procedure<?> proc)
      throws IOException {
    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
  }

  /**
   * Write a {@code PROCEDURE_WAL_INSERT} entry containing {@code proc} and its {@code subprocs}.
   * @param slot the destination buffer
   * @param proc the main procedure
   * @param subprocs the procedures to append after {@code proc}, may be null
   * @throws IOException if the write fails
   */
  public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs)
      throws IOException {
    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
  }

  /**
   * Write a {@code PROCEDURE_WAL_UPDATE} entry containing only {@code proc}.
   * @param slot the destination buffer
   * @param proc the procedure to write
   * @throws IOException if the write fails
   */
  public static void writeUpdate(ByteSlot slot, Procedure<?> proc)
      throws IOException {
    writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
  }

  /**
   * Write a {@code PROCEDURE_WAL_DELETE} entry that carries only the procedure id.
   * @param slot the destination buffer
   * @param procId the id to record in the entry
   * @throws IOException if the write fails
   */
  public static void writeDelete(ByteSlot slot, long procId)
      throws IOException {
    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
    builder.setProcId(procId);
    builder.build().writeDelimitedTo(slot);
  }

  /**
   * Write a {@code PROCEDURE_WAL_DELETE} entry for {@code proc}. When {@code subprocs} is
   * non-null the entry also carries the serialized {@code proc} and every id in {@code subprocs}
   * as a child id; otherwise only the procedure id is recorded.
   * @param slot the destination buffer
   * @param proc the procedure whose id is recorded
   * @param subprocs the child ids to record, may be null
   * @throws IOException if converting or writing fails
   */
  public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs)
      throws IOException {
    final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
    builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
    builder.setProcId(proc.getProcId());
    if (subprocs != null) {
      builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
      for (long childId : subprocs) {
        builder.addChildId(childId);
      }
    }
    builder.build().writeDelimitedTo(slot);
  }
}