001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.OutputStream; 023import java.util.Iterator; 024import org.apache.hadoop.fs.FSDataInputStream; 025import org.apache.hadoop.fs.FSDataOutputStream; 026import org.apache.hadoop.hbase.io.util.StreamUtils; 027import org.apache.hadoop.hbase.procedure2.Procedure; 028import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; 030import org.apache.hadoop.hbase.procedure2.util.ByteSlot; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; 038 039/** 040 * Helper class that contains the WAL serialization utils. 041 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we 042 * use the new region based procedure store. 043 */ 044@Deprecated 045@InterfaceAudience.Private 046final class ProcedureWALFormat { 047 048 static final byte LOG_TYPE_STREAM = 0; 049 static final byte LOG_TYPE_COMPACTED = 1; 050 static final byte LOG_TYPE_MAX_VALID = 1; 051 052 static final byte HEADER_VERSION = 1; 053 static final byte TRAILER_VERSION = 1; 054 static final long HEADER_MAGIC = 0x31764c4157637250L; 055 static final long TRAILER_MAGIC = 0x50726357414c7631L; 056 057 @InterfaceAudience.Private 058 public static class InvalidWALDataException extends IOException { 059 060 private static final long serialVersionUID = 5471733223070202196L; 061 062 public InvalidWALDataException(String s) { 063 super(s); 064 } 065 066 public InvalidWALDataException(Throwable t) { 067 super(t); 068 } 069 } 070 071 interface Loader extends ProcedureLoader { 072 void markCorruptedWAL(ProcedureWALFile log, IOException e); 073 } 074 075 private ProcedureWALFormat() { 076 } 077 078 /** 079 * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if 080 * needed, i.e, the {@code tracker} is a partial one. 081 * <p/> 082 * The method in the give {@code loader} will be called at the end after we load all the 083 * procedures and construct the hierarchy. 084 * <p/> 085 * And we will call the {@link ProcedureStoreTracker#resetModified()} method for the given 086 * {@code tracker} before returning, as it will be used to track the next proc wal file's modified 087 * procedures. 088 */ 089 public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker, 090 Loader loader) throws IOException { 091 ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); 092 tracker.setKeepDeletes(true); 093 // Ignore the last log which is current active log. 094 while (logs.hasNext()) { 095 ProcedureWALFile log = logs.next(); 096 log.open(); 097 try { 098 reader.read(log); 099 } finally { 100 log.close(); 101 } 102 } 103 reader.finish(); 104 105 // The tracker is now updated with all the procedures read from the logs 106 if (tracker.isPartial()) { 107 tracker.setPartialFlag(false); 108 } 109 tracker.resetModified(); 110 tracker.setKeepDeletes(false); 111 } 112 113 public static void writeHeader(OutputStream stream, ProcedureWALHeader header) 114 throws IOException { 115 header.writeDelimitedTo(stream); 116 } 117 118 /* 119 * +-----------------+ | END OF WAL DATA | <---+ +-----------------+ | | | | | Tracker | | | | | 120 * +-----------------+ | | version | | +-----------------+ | | TRAILER_MAGIC | | 121 * +-----------------+ | | offset |-----+ +-----------------+ 122 */ 123 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) 124 throws IOException { 125 long offset = stream.getPos(); 126 127 // Write EOF Entry 128 ProcedureWALEntry.newBuilder().setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF).build() 129 .writeDelimitedTo(stream); 130 131 // Write Tracker 132 tracker.toProto().writeDelimitedTo(stream); 133 134 stream.write(TRAILER_VERSION); 135 StreamUtils.writeLong(stream, TRAILER_MAGIC); 136 StreamUtils.writeLong(stream, offset); 137 return stream.getPos() - offset; 138 } 139 140 public static ProcedureWALHeader readHeader(InputStream stream) throws IOException { 141 ProcedureWALHeader header; 142 try { 143 header = ProcedureWALHeader.parseDelimitedFrom(stream); 144 } catch (InvalidProtocolBufferException e) { 145 throw new InvalidWALDataException(e); 146 } 147 148 if (header == null) { 149 throw new InvalidWALDataException("No data available to read the Header"); 150 } 151 152 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) { 153 throw new InvalidWALDataException( 154 "Invalid Header version. got " + header.getVersion() + " expected " + HEADER_VERSION); 155 } 156 157 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) { 158 throw new InvalidWALDataException("Invalid header type. got " + header.getType()); 159 } 160 161 return header; 162 } 163 164 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) 165 throws IOException { 166 // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset 167 long trailerPos = size - 17; 168 169 if (trailerPos < startPos) { 170 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); 171 } 172 173 stream.seek(trailerPos); 174 int version = stream.read(); 175 if (version != TRAILER_VERSION) { 176 throw new InvalidWALDataException( 177 "Invalid Trailer version. got " + version + " expected " + TRAILER_VERSION); 178 } 179 180 long magic = StreamUtils.readLong(stream); 181 if (magic != TRAILER_MAGIC) { 182 throw new InvalidWALDataException( 183 "Invalid Trailer magic. got " + magic + " expected " + TRAILER_MAGIC); 184 } 185 186 long trailerOffset = StreamUtils.readLong(stream); 187 stream.seek(trailerOffset); 188 189 ProcedureWALEntry entry = readEntry(stream); 190 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) { 191 throw new InvalidWALDataException("Invalid Trailer begin"); 192 } 193 194 ProcedureWALTrailer trailer = 195 ProcedureWALTrailer.newBuilder().setVersion(version).setTrackerPos(stream.getPos()).build(); 196 return trailer; 197 } 198 199 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException { 200 return ProcedureWALEntry.parseDelimitedFrom(stream); 201 } 202 203 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, Procedure<?> proc, 204 Procedure<?>[] subprocs) throws IOException { 205 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 206 builder.setType(type); 207 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); 208 if (subprocs != null) { 209 for (int i = 0; i < subprocs.length; ++i) { 210 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i])); 211 } 212 } 213 builder.build().writeDelimitedTo(slot); 214 } 215 216 public static void writeInsert(ByteSlot slot, Procedure<?> proc) throws IOException { 217 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); 218 } 219 220 public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) 221 throws IOException { 222 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); 223 } 224 225 public static void writeUpdate(ByteSlot slot, Procedure<?> proc) throws IOException { 226 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); 227 } 228 229 public static void writeDelete(ByteSlot slot, long procId) throws IOException { 230 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 231 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); 232 builder.setProcId(procId); 233 builder.build().writeDelimitedTo(slot); 234 } 235 236 public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) 237 throws IOException { 238 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 239 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); 240 builder.setProcId(proc.getProcId()); 241 if (subprocs != null) { 242 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); 243 for (int i = 0; i < subprocs.length; ++i) { 244 builder.addChildId(subprocs[i]); 245 } 246 } 247 builder.build().writeDelimitedTo(slot); 248 } 249}