/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Builds the procedure trees out of a flat collection of procedures and splits the procedures into
 * a valid list and a corrupted list.
 * <p/>
 * Procedures are grouped under their root procedure (a procedure without a parent id) and every
 * group is validated as a whole. For an unfinished root, all the stack ids recorded by the
 * procedures of the group are collected; if the max stack id is n, then each id from 0 to n must
 * be present exactly once. If an id is missing or is claimed by more than one procedure, every
 * procedure of the group is considered corrupted. See {@link #checkReady(Entry, Map)}.
 * <p/>
 * Procedures which can not be reached from any root procedure are considered corrupted too. See
 * {@link #checkOrphan(Map)}.
 */
@InterfaceAudience.Private
public final class WALProcedureTree {

  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);

  /**
   * A node of the tree: the procedure proto plus the entries whose parent id points to it.
   */
  private static final class Entry {

    private final ProcedureProtos.Procedure proc;

    private final List<Entry> subProcs = new ArrayList<>();

    public Entry(ProcedureProtos.Procedure proc) {
      this.proc = proc;
    }

    @Override
    public String toString() {
      // Procedure.NO_PROC_ID is printed for a procedure which does not have a parent
      long parentId = proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID;
      return "Procedure(pid=" + proc.getProcId() + ", ppid=" + parentId + ", class=" +
        proc.getClassName() + ")";
    }
  }

  // When loading we will iterate the procedures twice, so use this class to cache the deserialized
  // result to prevent deserializing multiple times.
  private static final class ProtoAndProc {
    private final ProcedureProtos.Procedure proto;

    private Procedure<?> proc;

    public ProtoAndProc(ProcedureProtos.Procedure proto) {
      this.proto = proto;
    }

    /**
     * Returns the procedure converted from the proto. The conversion happens on the first call
     * only, later calls return the cached instance.
     */
    public Procedure<?> getProc() throws IOException {
      if (proc != null) {
        return proc;
      }
      proc = ProcedureUtil.convertToProcedure(proto);
      return proc;
    }
  }

  private final List<ProtoAndProc> validProcs = new ArrayList<>();

  private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();

  /**
   * Returns true only for a procedure without a parent id whose state is ROLLEDBACK or SUCCESS.
   */
  private static boolean isFinished(ProcedureProtos.Procedure proc) {
    if (proc.hasParentId()) {
      return false;
    }
    switch (proc.getState()) {
      case ROLLEDBACK:
      case SUCCESS:
        return true;
      default:
        return false;
    }
  }

  /**
   * Links the entries into trees, validates every tree, marks what is left in the map as orphan,
   * and finally sorts both result lists by procedure id.
   * @param procMap all the entries keyed by procedure id. Entries are removed from it while they
   *          are being classified.
   */
  private WALProcedureTree(Map<Long, Entry> procMap) {
    for (Entry root : buildTree(procMap)) {
      checkReady(root, procMap);
    }
    checkOrphan(procMap);
    Comparator<ProtoAndProc> byProcId = Comparator.comparingLong(p -> p.proto.getProcId());
    Collections.sort(validProcs, byProcId);
    Collections.sort(corruptedProcs, byProcId);
  }

  /**
   * Attaches every entry which has a parent id to the entry of its parent.
   * @return the entries which do not have a parent id, i.e, the root entries
   */
  private List<Entry> buildTree(Map<Long, Entry> procMap) {
    List<Entry> roots = new ArrayList<>();
    for (Entry entry : procMap.values()) {
      if (!entry.proc.hasParentId()) {
        roots.add(entry);
        continue;
      }
      Entry parent = procMap.get(entry.proc.getParentId());
      // For a valid procedure the parent should not be null. Nothing is logged here if it is
      // null; the entry will not be referenced by any root procedure, so it stays in the map and
      // gets reported when checking the orphan procedures.
      if (parent != null) {
        parent.subProcs.add(entry);
      }
    }
    return roots;
  }

  /**
   * Records the stack ids of the given entry and, recursively, of all its sub procedures.
   * @param stackId2Proc receives, for each stack id, the entries which declare it
   * @param maxStackId raised to the largest stack id seen
   */
  private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
    MutableInt maxStackId) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Procedure {} stack ids={}", entry, entry.proc.getStackIdList());
    }
    for (int stackId : entry.proc.getStackIdList()) {
      if (maxStackId.intValue() < stackId) {
        maxStackId.setValue(stackId);
      }
      stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
    }
    for (Entry child : entry.subProcs) {
      collectStackId(child, stackId2Proc, maxStackId);
    }
  }

  // Appends the entry and then all its descendants to the target list, removing each of them
  // from the remainingProcMap on the way.
  private void addAllAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap,
    List<ProtoAndProc> target) {
    target.add(new ProtoAndProc(entry.proc));
    remainingProcMap.remove(entry.proc.getProcId());
    for (Entry child : entry.subProcs) {
      addAllAndRemoveFromProcMap(child, remainingProcMap, target);
    }
  }

  private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
    Map<Long, Entry> remainingProcMap) {
    addAllAndRemoveFromProcMap(entry, remainingProcMap, corruptedProcs);
  }

  private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
    addAllAndRemoveFromProcMap(entry, remainingProcMap, validProcs);
  }

  // Goes through every stack id from 0 to the max stack id and reports each one which is missing
  // or is declared by more than one procedure. The scan does not stop at the first problem, so
  // that all of them get logged.
  private boolean checkStackIds(Entry rootEntry, Map<Integer, List<Entry>> stackId2Proc,
    MutableInt maxStackId) {
    boolean valid = true;
    int max = maxStackId.intValue();
    for (int i = 0; i <= max; i++) {
      List<Entry> owners = stackId2Proc.get(i);
      if (owners == null) {
        LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId,
          rootEntry);
        valid = false;
      } else if (owners.size() > 1) {
        LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {}," +
          " root procedure is {}", owners, i, maxStackId, rootEntry);
        valid = false;
      }
    }
    return valid;
  }

  // Decides whether the given root procedure and all its sub procedures are valid, and moves all
  // of them to the valid list or the corrupted list. A finished root is valid only when it does
  // not have any sub procedures. For an unfinished root the decision is made through the stack
  // ids. In every case all these procedures are removed from the remainingProcMap, so at last, if
  // there are still procedures in the map, we know that there are orphan procedures.
  private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
    if (isFinished(rootEntry.proc)) {
      if (rootEntry.subProcs.isEmpty()) {
        addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
        return;
      }
      LOG.error("unexpected active children for root-procedure: {}", rootEntry);
      for (Entry child : rootEntry.subProcs) {
        LOG.error("unexpected active children: {}", child);
      }
      addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
      return;
    }
    Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
    MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
    collectStackId(rootEntry, stackId2Proc, maxStackId);
    // the stack ids should start from 0 and increase by one every time
    if (checkStackIds(rootEntry, stackId2Proc, maxStackId)) {
      addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
    } else {
      addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
    }
  }

  /**
   * Logs every entry which is still in the map and adds it to the corrupted list. The map itself
   * is not modified here.
   */
  private void checkOrphan(Map<Long, Entry> procMap) {
    for (Entry orphan : procMap.values()) {
      LOG.error("Orphan procedure: {}", orphan);
      corruptedProcs.add(new ProtoAndProc(orphan.proc));
    }
  }

  /**
   * A {@link ProcedureIterator} over a list of procedures. It always holds the element which will
   * be returned next in {@code current}; a null {@code current} means there are no more elements.
   */
  private static final class Iter implements ProcedureIterator {

    private final List<ProtoAndProc> procs;

    private Iterator<ProtoAndProc> iter;

    private ProtoAndProc current;

    public Iter(List<ProtoAndProc> procs) {
      this.procs = procs;
      reset();
    }

    @Override
    public void reset() {
      // start over from the head of the list and load the first element, if any
      iter = procs.iterator();
      moveToNext();
    }

    @Override
    public boolean hasNext() {
      return current != null;
    }

    private void checkNext() {
      if (current == null) {
        throw new NoSuchElementException();
      }
    }

    @Override
    public boolean isNextFinished() {
      checkNext();
      return isFinished(current.proto);
    }

    private void moveToNext() {
      current = iter.hasNext() ? iter.next() : null;
    }

    @Override
    public void skipNext() {
      checkNext();
      moveToNext();
    }

    @Override
    public Procedure<?> next() throws IOException {
      checkNext();
      // convert before moving forward, so the position is kept if the conversion fails
      Procedure<?> result = current.getProc();
      moveToNext();
      return result;
    }
  }

  /**
   * Returns a new iterator over the procedures considered valid, ordered by procedure id.
   */
  public ProcedureIterator getValidProcs() {
    return new Iter(validProcs);
  }

  /**
   * Returns a new iterator over the procedures considered corrupted, ordered by procedure id.
   */
  public ProcedureIterator getCorruptedProcs() {
    return new Iter(corruptedProcs);
  }

  /**
   * Creates the tree for the given procedures. If several procedures share the same procedure id,
   * only the last one in iteration order is kept.
   * @param procedures the procedures to classify
   * @return a tree which holds the classification result
   */
  public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
    Map<Long, Entry> procMap = new HashMap<>();
    procedures.forEach(proc -> procMap.put(proc.getProcId(), new Entry(proc)));
    return new WALProcedureTree(procMap);
  }
}