001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2.store.wal; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.util.Iterator; 025import org.apache.hadoop.fs.FSDataInputStream; 026import org.apache.hadoop.fs.FSDataOutputStream; 027import org.apache.hadoop.hbase.io.util.StreamUtils; 028import org.apache.hadoop.hbase.procedure2.Procedure; 029import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; 031import org.apache.hadoop.hbase.procedure2.util.ByteSlot; 032import org.apache.yetus.audience.InterfaceAudience; 033 034import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 035 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; 039 040/** 041 * Helper class that contains the WAL serialization utils. 042 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we 043 * use the new region based procedure store. 044 */ 045@Deprecated 046@InterfaceAudience.Private 047final class ProcedureWALFormat { 048 049 static final byte LOG_TYPE_STREAM = 0; 050 static final byte LOG_TYPE_COMPACTED = 1; 051 static final byte LOG_TYPE_MAX_VALID = 1; 052 053 static final byte HEADER_VERSION = 1; 054 static final byte TRAILER_VERSION = 1; 055 static final long HEADER_MAGIC = 0x31764c4157637250L; 056 static final long TRAILER_MAGIC = 0x50726357414c7631L; 057 058 @InterfaceAudience.Private 059 public static class InvalidWALDataException extends IOException { 060 061 private static final long serialVersionUID = 5471733223070202196L; 062 063 public InvalidWALDataException(String s) { 064 super(s); 065 } 066 067 public InvalidWALDataException(Throwable t) { 068 super(t); 069 } 070 } 071 072 interface Loader extends ProcedureLoader { 073 void markCorruptedWAL(ProcedureWALFile log, IOException e); 074 } 075 076 private ProcedureWALFormat() {} 077 078 /** 079 * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if 080 * needed, i.e, the {@code tracker} is a partial one. 081 * <p/> 082 * The method in the give {@code loader} will be called at the end after we load all the 083 * procedures and construct the hierarchy. 084 * <p/> 085 * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given 086 * {@code tracker} before returning, as it will be used to track the next proc wal file's modified 087 * procedures. 088 */ 089 public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker, 090 Loader loader) throws IOException { 091 ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); 092 tracker.setKeepDeletes(true); 093 // Ignore the last log which is current active log. 094 while (logs.hasNext()) { 095 ProcedureWALFile log = logs.next(); 096 log.open(); 097 try { 098 reader.read(log); 099 } finally { 100 log.close(); 101 } 102 } 103 reader.finish(); 104 105 // The tracker is now updated with all the procedures read from the logs 106 if (tracker.isPartial()) { 107 tracker.setPartialFlag(false); 108 } 109 tracker.resetModified(); 110 tracker.setKeepDeletes(false); 111 } 112 113 public static void writeHeader(OutputStream stream, ProcedureWALHeader header) 114 throws IOException { 115 header.writeDelimitedTo(stream); 116 } 117 118 /* 119 * +-----------------+ 120 * | END OF WAL DATA | <---+ 121 * +-----------------+ | 122 * | | | 123 * | Tracker | | 124 * | | | 125 * +-----------------+ | 126 * | version | | 127 * +-----------------+ | 128 * | TRAILER_MAGIC | | 129 * +-----------------+ | 130 * | offset |-----+ 131 * +-----------------+ 132 */ 133 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) 134 throws IOException { 135 long offset = stream.getPos(); 136 137 // Write EOF Entry 138 ProcedureWALEntry.newBuilder() 139 .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) 140 .build().writeDelimitedTo(stream); 141 142 // Write Tracker 143 tracker.toProto().writeDelimitedTo(stream); 144 145 stream.write(TRAILER_VERSION); 146 StreamUtils.writeLong(stream, TRAILER_MAGIC); 147 StreamUtils.writeLong(stream, offset); 148 return stream.getPos() - offset; 149 } 150 151 public static ProcedureWALHeader readHeader(InputStream stream) 152 throws IOException { 153 ProcedureWALHeader header; 154 try { 155 header = ProcedureWALHeader.parseDelimitedFrom(stream); 156 } catch (InvalidProtocolBufferException e) { 157 throw new InvalidWALDataException(e); 158 } 159 160 if (header == null) { 161 throw new InvalidWALDataException("No data available to read the Header"); 162 } 163 164 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) { 165 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() + 166 " expected " + HEADER_VERSION); 167 } 168 169 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) { 170 throw new InvalidWALDataException("Invalid header type. got " + header.getType()); 171 } 172 173 return header; 174 } 175 176 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) 177 throws IOException { 178 // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset 179 long trailerPos = size - 17; 180 181 if (trailerPos < startPos) { 182 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); 183 } 184 185 stream.seek(trailerPos); 186 int version = stream.read(); 187 if (version != TRAILER_VERSION) { 188 throw new InvalidWALDataException("Invalid Trailer version. got " + version + 189 " expected " + TRAILER_VERSION); 190 } 191 192 long magic = StreamUtils.readLong(stream); 193 if (magic != TRAILER_MAGIC) { 194 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic + 195 " expected " + TRAILER_MAGIC); 196 } 197 198 long trailerOffset = StreamUtils.readLong(stream); 199 stream.seek(trailerOffset); 200 201 ProcedureWALEntry entry = readEntry(stream); 202 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) { 203 throw new InvalidWALDataException("Invalid Trailer begin"); 204 } 205 206 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder() 207 .setVersion(version) 208 .setTrackerPos(stream.getPos()) 209 .build(); 210 return trailer; 211 } 212 213 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException { 214 return ProcedureWALEntry.parseDelimitedFrom(stream); 215 } 216 217 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, 218 Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { 219 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 220 builder.setType(type); 221 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); 222 if (subprocs != null) { 223 for (int i = 0; i < subprocs.length; ++i) { 224 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i])); 225 } 226 } 227 builder.build().writeDelimitedTo(slot); 228 } 229 230 public static void writeInsert(ByteSlot slot, Procedure<?> proc) 231 throws IOException { 232 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); 233 } 234 235 public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) 236 throws IOException { 237 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); 238 } 239 240 public static void writeUpdate(ByteSlot slot, Procedure<?> proc) 241 throws IOException { 242 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); 243 } 244 245 public static void writeDelete(ByteSlot slot, long procId) 246 throws IOException { 247 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 248 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); 249 builder.setProcId(procId); 250 builder.build().writeDelimitedTo(slot); 251 } 252 253 public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) 254 throws IOException { 255 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 256 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); 257 builder.setProcId(proc.getProcId()); 258 if (subprocs != null) { 259 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); 260 for (int i = 0; i < subprocs.length; ++i) { 261 builder.addChildId(subprocs[i]); 262 } 263 } 264 builder.build().writeDelimitedTo(slot); 265 } 266}