001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 028 029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; 031 032/** 033 * Helper class that loads the procedures stored in a WAL. 034 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we 035 * use the new region based procedure store. 036 */ 037@Deprecated 038@InterfaceAudience.Private 039class ProcedureWALFormatReader { 040 private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class); 041 042 /** 043 * We will use the localProcedureMap to track the active procedures for the current proc wal file, 044 * and when we finished reading one proc wal file, we will merge he localProcedureMap to the 045 * procedureMap, which tracks the global active procedures. 046 * <p/> 047 * See the comments of {@link WALProcedureMap} for more details. 048 * <p/> 049 * After reading all the proc wal files, we will use the procedures in the procedureMap to build a 050 * {@link ProcedureTree}, and then give the result to the upper layer. See the comments of 051 * {@link ProcedureTree} and the code in {@link #finish()} for more details. 052 */ 053 private final WALProcedureMap localProcedureMap = new WALProcedureMap(); 054 private final WALProcedureMap procedureMap = new WALProcedureMap(); 055 056 private final ProcedureWALFormat.Loader loader; 057 058 /** 059 * Global tracker that will be used by the WALProcedureStore after load. 060 * If the last WAL was closed cleanly we already have a full tracker ready to be used. 061 * If the last WAL was truncated (e.g. master killed) the tracker will be empty 062 * and the 'partial' flag will be set. In this case, on WAL replay we are going 063 * to rebuild the tracker. 064 */ 065 private final ProcedureStoreTracker tracker; 066 067 /** 068 * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build 069 * the list of procedures modified in that WAL because we need it for log cleaning purposes. If 070 * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see 071 * {@link WALProcedureStore#removeInactiveLogs()}). 072 * <p/> 073 * Notice that, the deleted part for this tracker will not be global valid as we can only count 074 * the deletes in the current file, but it is not big problem as finally, the above tracker will 075 * have the global state of deleted, and it will also be used to build the cleanup tracker. 076 */ 077 private ProcedureStoreTracker localTracker; 078 079 private long maxProcId = 0; 080 081 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, 082 ProcedureWALFormat.Loader loader) { 083 this.tracker = tracker; 084 this.loader = loader; 085 } 086 087 public void read(ProcedureWALFile log) throws IOException { 088 localTracker = log.getTracker(); 089 if (localTracker.isPartial()) { 090 LOG.info("Rebuilding tracker for {}", log); 091 } 092 093 long count = 0; 094 FSDataInputStream stream = log.getStream(); 095 try { 096 boolean hasMore = true; 097 while (hasMore) { 098 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); 099 if (entry == null) { 100 LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log); 101 break; 102 } 103 count++; 104 switch (entry.getType()) { 105 case PROCEDURE_WAL_INIT: 106 readInitEntry(entry); 107 break; 108 case PROCEDURE_WAL_INSERT: 109 readInsertEntry(entry); 110 break; 111 case PROCEDURE_WAL_UPDATE: 112 case PROCEDURE_WAL_COMPACT: 113 readUpdateEntry(entry); 114 break; 115 case PROCEDURE_WAL_DELETE: 116 readDeleteEntry(entry); 117 break; 118 case PROCEDURE_WAL_EOF: 119 hasMore = false; 120 break; 121 default: 122 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry); 123 } 124 } 125 LOG.info("Read {} entries in {}", count, log); 126 } catch (InvalidProtocolBufferException e) { 127 LOG.error("While reading entry #{} in {}", count, log, e); 128 loader.markCorruptedWAL(log, e); 129 } 130 131 if (!localProcedureMap.isEmpty()) { 132 log.setProcIds(localProcedureMap.getMinModifiedProcId(), 133 localProcedureMap.getMaxModifiedProcId()); 134 if (localTracker.isPartial()) { 135 localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), 136 localProcedureMap.getMaxModifiedProcId()); 137 } 138 procedureMap.merge(localProcedureMap); 139 } 140 // Do not reset the partial flag for local tracker, as here the local tracker only know the 141 // procedures which are modified in this file. 142 } 143 144 public void finish() throws IOException { 145 // notify the loader about the max proc ID 146 loader.setMaxProcId(maxProcId); 147 148 // build the procedure execution tree. When building we will verify that whether a procedure is 149 // valid. 150 ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures()); 151 loader.load(tree.getValidProcs()); 152 loader.handleCorrupted(tree.getCorruptedProcs()); 153 } 154 155 private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { 156 if (tracker.isPartial()) { 157 tracker.setDeleted(procId, true); 158 } 159 } 160 161 private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) { 162 if (tracker.isPartial()) { 163 tracker.insert(proc.getProcId()); 164 } 165 } 166 167 private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) { 168 maxProcId = Math.max(maxProcId, proc.getProcId()); 169 if (isRequired(proc.getProcId())) { 170 LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId()); 171 localProcedureMap.add(proc); 172 insertIfPartial(tracker, proc); 173 } 174 insertIfPartial(localTracker, proc); 175 } 176 177 private void readInitEntry(ProcedureWALEntry entry) { 178 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 179 loadProcedure(entry, entry.getProcedure(0)); 180 } 181 182 private void readInsertEntry(ProcedureWALEntry entry) { 183 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; 184 loadProcedure(entry, entry.getProcedure(0)); 185 for (int i = 1; i < entry.getProcedureCount(); ++i) { 186 loadProcedure(entry, entry.getProcedure(i)); 187 } 188 } 189 190 private void readUpdateEntry(ProcedureWALEntry entry) { 191 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 192 loadProcedure(entry, entry.getProcedure(0)); 193 } 194 195 private void readDeleteEntry(ProcedureWALEntry entry) { 196 assert entry.hasProcId() : "expected ProcID"; 197 198 if (entry.getChildIdCount() > 0) { 199 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 200 201 // update the parent procedure 202 loadProcedure(entry, entry.getProcedure(0)); 203 204 // remove the child procedures of entry.getProcId() 205 for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) { 206 deleteEntry(entry.getChildId(i)); 207 } 208 } else { 209 assert entry.getProcedureCount() == 0 : "Expected no procedures"; 210 211 // delete the procedure 212 deleteEntry(entry.getProcId()); 213 } 214 } 215 216 private void deleteEntry(final long procId) { 217 LOG.trace("delete entry {}", procId); 218 maxProcId = Math.max(maxProcId, procId); 219 localProcedureMap.remove(procId); 220 assert !procedureMap.contains(procId); 221 setDeletedIfPartial(tracker, procId); 222 setDeletedIfPartial(localTracker, procId); 223 } 224 225 private boolean isDeleted(long procId) { 226 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; 227 } 228 229 private boolean isRequired(long procId) { 230 return !isDeleted(procId) && !procedureMap.contains(procId); 231 } 232}