001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.IOException; 021import org.apache.hadoop.fs.FSDataInputStream; 022import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 028 029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; 031 032/** 033 * Helper class that loads the procedures stored in a WAL. 034 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we 035 * use the new region based procedure store. 036 */ 037@Deprecated 038@InterfaceAudience.Private 039class ProcedureWALFormatReader { 040 private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class); 041 042 /** 043 * We will use the localProcedureMap to track the active procedures for the current proc wal file, 044 * and when we finished reading one proc wal file, we will merge he localProcedureMap to the 045 * procedureMap, which tracks the global active procedures. 046 * <p/> 047 * See the comments of {@link WALProcedureMap} for more details. 048 * <p/> 049 * After reading all the proc wal files, we will use the procedures in the procedureMap to build a 050 * {@link ProcedureTree}, and then give the result to the upper layer. See the comments of 051 * {@link ProcedureTree} and the code in {@link #finish()} for more details. 052 */ 053 private final WALProcedureMap localProcedureMap = new WALProcedureMap(); 054 private final WALProcedureMap procedureMap = new WALProcedureMap(); 055 056 private final ProcedureWALFormat.Loader loader; 057 058 /** 059 * Global tracker that will be used by the WALProcedureStore after load. If the last WAL was 060 * closed cleanly we already have a full tracker ready to be used. If the last WAL was truncated 061 * (e.g. master killed) the tracker will be empty and the 'partial' flag will be set. In this 062 * case, on WAL replay we are going to rebuild the tracker. 063 */ 064 private final ProcedureStoreTracker tracker; 065 066 /** 067 * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build 068 * the list of procedures modified in that WAL because we need it for log cleaning purposes. If 069 * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see 070 * {@link WALProcedureStore#removeInactiveLogs()}). 071 * <p/> 072 * Notice that, the deleted part for this tracker will not be global valid as we can only count 073 * the deletes in the current file, but it is not big problem as finally, the above tracker will 074 * have the global state of deleted, and it will also be used to build the cleanup tracker. 075 */ 076 private ProcedureStoreTracker localTracker; 077 078 private long maxProcId = 0; 079 080 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, 081 ProcedureWALFormat.Loader loader) { 082 this.tracker = tracker; 083 this.loader = loader; 084 } 085 086 public void read(ProcedureWALFile log) throws IOException { 087 localTracker = log.getTracker(); 088 if (localTracker.isPartial()) { 089 LOG.info("Rebuilding tracker for {}", log); 090 } 091 092 long count = 0; 093 FSDataInputStream stream = log.getStream(); 094 try { 095 boolean hasMore = true; 096 while (hasMore) { 097 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); 098 if (entry == null) { 099 LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log); 100 break; 101 } 102 count++; 103 switch (entry.getType()) { 104 case PROCEDURE_WAL_INIT: 105 readInitEntry(entry); 106 break; 107 case PROCEDURE_WAL_INSERT: 108 readInsertEntry(entry); 109 break; 110 case PROCEDURE_WAL_UPDATE: 111 case PROCEDURE_WAL_COMPACT: 112 readUpdateEntry(entry); 113 break; 114 case PROCEDURE_WAL_DELETE: 115 readDeleteEntry(entry); 116 break; 117 case PROCEDURE_WAL_EOF: 118 hasMore = false; 119 break; 120 default: 121 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry); 122 } 123 } 124 LOG.info("Read {} entries in {}", count, log); 125 } catch (InvalidProtocolBufferException e) { 126 LOG.error("While reading entry #{} in {}", count, log, e); 127 loader.markCorruptedWAL(log, e); 128 } 129 130 if (!localProcedureMap.isEmpty()) { 131 log.setProcIds(localProcedureMap.getMinModifiedProcId(), 132 localProcedureMap.getMaxModifiedProcId()); 133 if (localTracker.isPartial()) { 134 localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), 135 localProcedureMap.getMaxModifiedProcId()); 136 } 137 procedureMap.merge(localProcedureMap); 138 } 139 // Do not reset the partial flag for local tracker, as here the local tracker only know the 140 // procedures which are modified in this file. 141 } 142 143 public void finish() throws IOException { 144 // notify the loader about the max proc ID 145 loader.setMaxProcId(maxProcId); 146 147 // build the procedure execution tree. When building we will verify that whether a procedure is 148 // valid. 149 ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures()); 150 loader.load(tree.getValidProcs()); 151 loader.handleCorrupted(tree.getCorruptedProcs()); 152 } 153 154 private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { 155 if (tracker.isPartial()) { 156 tracker.setDeleted(procId, true); 157 } 158 } 159 160 private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) { 161 if (tracker.isPartial()) { 162 tracker.insert(proc.getProcId()); 163 } 164 } 165 166 private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) { 167 maxProcId = Math.max(maxProcId, proc.getProcId()); 168 if (isRequired(proc.getProcId())) { 169 LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId()); 170 localProcedureMap.add(proc); 171 insertIfPartial(tracker, proc); 172 } 173 insertIfPartial(localTracker, proc); 174 } 175 176 private void readInitEntry(ProcedureWALEntry entry) { 177 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 178 loadProcedure(entry, entry.getProcedure(0)); 179 } 180 181 private void readInsertEntry(ProcedureWALEntry entry) { 182 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; 183 loadProcedure(entry, entry.getProcedure(0)); 184 for (int i = 1; i < entry.getProcedureCount(); ++i) { 185 loadProcedure(entry, entry.getProcedure(i)); 186 } 187 } 188 189 private void readUpdateEntry(ProcedureWALEntry entry) { 190 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 191 loadProcedure(entry, entry.getProcedure(0)); 192 } 193 194 private void readDeleteEntry(ProcedureWALEntry entry) { 195 assert entry.hasProcId() : "expected ProcID"; 196 197 if (entry.getChildIdCount() > 0) { 198 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; 199 200 // update the parent procedure 201 loadProcedure(entry, entry.getProcedure(0)); 202 203 // remove the child procedures of entry.getProcId() 204 for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) { 205 deleteEntry(entry.getChildId(i)); 206 } 207 } else { 208 assert entry.getProcedureCount() == 0 : "Expected no procedures"; 209 210 // delete the procedure 211 deleteEntry(entry.getProcId()); 212 } 213 } 214 215 private void deleteEntry(final long procId) { 216 LOG.trace("delete entry {}", procId); 217 maxProcId = Math.max(maxProcId, procId); 218 localProcedureMap.remove(procId); 219 assert !procedureMap.contains(procId); 220 setDeletedIfPartial(tracker, procId); 221 setDeletedIfPartial(localTracker, procId); 222 } 223 224 private boolean isDeleted(long procId) { 225 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; 226 } 227 228 private boolean isRequired(long procId) { 229 return !isDeleted(procId) && !procedureMap.contains(procId); 230 } 231}