001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 028 029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; 031 032/** 033 * Helper class that loads the procedures stored in a WAL. 034 */ 035@InterfaceAudience.Private 036public class ProcedureWALFormatReader { 037 private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class); 038 039 /** 040 * We will use the localProcedureMap to track the active procedures for the current proc wal file, 041 * and when we finished reading one proc wal file, we will merge he localProcedureMap to the 042 * procedureMap, which tracks the global active procedures. 043 * <p/> 044 * See the comments of {@link WALProcedureMap} for more details. 045 * <p/> 046 * After reading all the proc wal files, we will use the procedures in the procedureMap to build a 047 * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of 048 * {@link WALProcedureTree} and the code in {@link #finish()} for more details. 049 */ 050 private final WALProcedureMap localProcedureMap = new WALProcedureMap(); 051 private final WALProcedureMap procedureMap = new WALProcedureMap(); 052 053 private final ProcedureWALFormat.Loader loader; 054 055 /** 056 * Global tracker that will be used by the WALProcedureStore after load. 057 * If the last WAL was closed cleanly we already have a full tracker ready to be used. 058 * If the last WAL was truncated (e.g. master killed) the tracker will be empty 059 * and the 'partial' flag will be set. In this case, on WAL replay we are going 060 * to rebuild the tracker. 061 */ 062 private final ProcedureStoreTracker tracker; 063 064 /** 065 * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build 066 * the list of procedures modified in that WAL because we need it for log cleaning purposes. If 067 * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see 068 * {@link WALProcedureStore#removeInactiveLogs()}). 069 * <p/> 070 * Notice that, the deleted part for this tracker will not be global valid as we can only count 071 * the deletes in the current file, but it is not big problem as finally, the above tracker will 072 * have the global state of deleted, and it will also be used to build the cleanup tracker. 073 */ 074 private ProcedureStoreTracker localTracker; 075 076 private long maxProcId = 0; 077 078 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, 079 ProcedureWALFormat.Loader loader) { 080 this.tracker = tracker; 081 this.loader = loader; 082 } 083 084 public void read(ProcedureWALFile log) throws IOException { 085 localTracker = log.getTracker(); 086 if (localTracker.isPartial()) { 087 LOG.info("Rebuilding tracker for {}", log); 088 } 089 090 long count = 0; 091 FSDataInputStream stream = log.getStream(); 092 try { 093 boolean hasMore = true; 094 while (hasMore) { 095 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); 096 if (entry == null) { 097 LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log); 098 break; 099 } 100 count++; 101 switch (entry.getType()) { 102 case PROCEDURE_WAL_INIT: 103 readInitEntry(entry); 104 break; 105 case PROCEDURE_WAL_INSERT: 106 readInsertEntry(entry); 107 break; 108 case PROCEDURE_WAL_UPDATE: 109 case PROCEDURE_WAL_COMPACT: 110 readUpdateEntry(entry); 111 break; 112 case PROCEDURE_WAL_DELETE: 113 readDeleteEntry(entry); 114 break; 115 case PROCEDURE_WAL_EOF: 116 hasMore = false; 117 break; 118 default: 119 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry); 120 } 121 } 122 LOG.info("Read {} entries in {}", count, log); 123 } catch (InvalidProtocolBufferException e) { 124 LOG.error("While reading entry #{} in {}", count, log, e); 125 loader.markCorruptedWAL(log, e); 126 } 127 128 if (!localProcedureMap.isEmpty()) { 129 log.setProcIds(localProcedureMap.getMinModifiedProcId(), 130 localProcedureMap.getMaxModifiedProcId()); 131 if (localTracker.isPartial()) { 132 localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), 133 localProcedureMap.getMaxModifiedProcId()); 134 } 135 procedureMap.merge(localProcedureMap); 136 } 137 // Do not reset the partial flag for local tracker, as here the local tracker only know the 138 // procedures which are modified in this file. 139 } 140 141 public void finish() throws IOException { 142 // notify the loader about the max proc ID 143 loader.setMaxProcId(maxProcId); 144 145 // build the procedure execution tree. When building we will verify that whether a procedure is 146 // valid. 147 WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures()); 148 loader.load(tree.getValidProcs()); 149 loader.handleCorrupted(tree.getCorruptedProcs()); 150 } 151 152 private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { 153 if (tracker.isPartial()) { 154 tracker.setDeleted(procId, true); 155 } 156 } 157 158 private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) { 159 if (tracker.isPartial()) { 160 tracker.insert(proc.getProcId()); 161 } 162 } 163 164 private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) { 165 maxProcId = Math.max(maxProcId, proc.getProcId()); 166 if (isRequired(proc.getProcId())) { 167 LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId()); 168 localProcedureMap.add(proc); 169 insertIfPartial(tracker, proc); 170 } 171 insertIfPartial(localTracker, proc); 172 } 173 174 private void readInitEntry(ProcedureWALEntry entry) { 175 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 176 loadProcedure(entry, entry.getProcedure(0)); 177 } 178 179 private void readInsertEntry(ProcedureWALEntry entry) { 180 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; 181 loadProcedure(entry, entry.getProcedure(0)); 182 for (int i = 1; i < entry.getProcedureCount(); ++i) { 183 loadProcedure(entry, entry.getProcedure(i)); 184 } 185 } 186 187 private void readUpdateEntry(ProcedureWALEntry entry) { 188 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 189 loadProcedure(entry, entry.getProcedure(0)); 190 } 191 192 private void readDeleteEntry(ProcedureWALEntry entry) { 193 assert entry.hasProcId() : "expected ProcID"; 194 195 if (entry.getChildIdCount() > 0) { 196 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 197 198 // update the parent procedure 199 loadProcedure(entry, entry.getProcedure(0)); 200 201 // remove the child procedures of entry.getProcId() 202 for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) { 203 deleteEntry(entry.getChildId(i)); 204 } 205 } else { 206 assert entry.getProcedureCount() == 0 : "Expected no procedures"; 207 208 // delete the procedure 209 deleteEntry(entry.getProcId()); 210 } 211 } 212 213 private void deleteEntry(final long procId) { 214 LOG.trace("delete entry {}", procId); 215 maxProcId = Math.max(maxProcId, procId); 216 localProcedureMap.remove(procId); 217 assert !procedureMap.contains(procId); 218 setDeletedIfPartial(tracker, procId); 219 setDeletedIfPartial(localTracker, procId); 220 } 221 222 private boolean isDeleted(long procId) { 223 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; 224 } 225 226 private boolean isRequired(long procId) { 227 return !isDeleted(procId) && !procedureMap.contains(procId); 228 } 229}