/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Builds the parent/child tree of a set of procedures and splits them into valid and corrupted
 * ones.
 * <p>
 * Procedures are grouped under their root procedure (a procedure without a parent id) and every
 * group is validated as a whole. For an unfinished root, the stack ids of all procedures in the
 * group are collected; if the largest stack id is n, each id from 0 to n must be present exactly
 * once. If an id is missing or owned by more than one procedure, the whole group is treated as
 * corrupted. See {@link #checkReady(Entry, Map)}.
 * <p>
 * Procedures that are not reached from any root procedure, i.e. whose root can not be found, are
 * treated as corrupted too. See {@link #checkOrphan(Map)}.
 */
@InterfaceAudience.Private
public final class ProcedureTree {

  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTree.class);

  /**
   * A node of the tree: one serialized procedure together with the entries that name it as parent.
   */
  private static final class Entry {

    private final ProcedureProtos.Procedure proc;

    private final List<Entry> subProcs = new ArrayList<>();

    public Entry(ProcedureProtos.Procedure proc) {
      this.proc = proc;
    }

    @Override
    public String toString() {
      // Fall back to the NO_PROC_ID marker when the procedure carries no parent id.
      long parentId = proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID;
      return "Procedure(pid=" + proc.getProcId() + ", ppid=" + parentId + ", class="
        + proc.getClassName() + ")";
    }
  }

  private final List<ProtoAndProcedure> validProcs = new ArrayList<>();

  private final List<ProtoAndProcedure> corruptedProcs = new ArrayList<>();

  /**
   * Classifies every procedure in {@code procMap} as valid or corrupted and sorts both result
   * lists by procedure id. Entries are removed from {@code procMap} as their group is classified.
   */
  private ProcedureTree(Map<Long, Entry> procMap) {
    for (Entry root : buildTree(procMap)) {
      checkReady(root, procMap);
    }
    // Whatever is still in the map was not reachable from any root.
    checkOrphan(procMap);
    Comparator<ProtoAndProcedure> byProcId =
      Comparator.comparingLong(p -> p.getProto().getProcId());
    Collections.sort(validProcs, byProcId);
    Collections.sort(corruptedProcs, byProcId);
  }

  /**
   * Links every entry to its parent entry and returns the entries that have no parent id.
   */
  private List<Entry> buildTree(Map<Long, Entry> procMap) {
    List<Entry> roots = new ArrayList<>();
    for (Entry entry : procMap.values()) {
      if (!entry.proc.hasParentId()) {
        roots.add(entry);
        continue;
      }
      Entry parent = procMap.get(entry.proc.getParentId());
      // A missing parent is not reported here: the entry will never be reached from a root, so
      // it stays in the map and is logged later as an orphan.
      if (parent != null) {
        parent.subProcs.add(entry);
      }
    }
    return roots;
  }

  /**
   * Walks the tree below {@code entry} (the entry first, then its children) and records which
   * entries own which stack id, keeping track of the largest stack id seen.
   */
  private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
    MutableInt maxStackId) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Procedure {} stack ids={}", entry, entry.proc.getStackIdList());
    }
    for (int stackId : entry.proc.getStackIdList()) {
      maxStackId.setValue(Math.max(maxStackId.intValue(), stackId));
      stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
    }
    for (Entry child : entry.subProcs) {
      collectStackId(child, stackId2Proc, maxStackId);
    }
  }

  /**
   * Appends {@code entry} and all of its descendants to {@code target} and removes each of them
   * from {@code remainingProcMap}.
   */
  private void moveTree(Entry entry, List<ProtoAndProcedure> target,
    Map<Long, Entry> remainingProcMap) {
    target.add(new ProtoAndProcedure(entry.proc));
    remainingProcMap.remove(entry.proc.getProcId());
    for (Entry child : entry.subProcs) {
      moveTree(child, target, remainingProcMap);
    }
  }

  /**
   * Returns whether the stack ids found in the tree below {@code rootEntry} cover 0 up to the
   * largest stack id with exactly one owner each. Every missing or duplicated id is logged. A
   * tree without any stack id passes, since there is nothing to check.
   */
  private boolean hasConsecutiveStackIds(Entry rootEntry) {
    Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
    MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
    collectStackId(rootEntry, stackId2Proc, maxStackId);
    // the stack ids should start from 0 and increase by one every time
    boolean consecutive = true;
    for (int stackId = 0; stackId <= maxStackId.intValue(); stackId++) {
      List<Entry> owners = stackId2Proc.get(stackId);
      if (owners == null) {
        LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", stackId,
          maxStackId, rootEntry);
        consecutive = false;
      } else if (owners.size() > 1) {
        LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {},"
          + " root procedure is {}", owners, stackId, maxStackId, rootEntry);
        consecutive = false;
      }
    }
    return consecutive;
  }

  // Decides whether the given root procedure and all of its sub procedures are valid. A finished
  // root is valid only when it has no children left; an unfinished root is validated through the
  // procedure stack. Either way the whole tree is removed from the remainingProcMap, so whatever
  // is still in the map at the end must be orphan procedures.
  private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
    boolean valid;
    if (ProcedureUtil.isFinished(rootEntry.proc)) {
      valid = rootEntry.subProcs.isEmpty();
      if (!valid) {
        LOG.error("unexpected active children for root-procedure: {}", rootEntry);
        for (Entry child : rootEntry.subProcs) {
          LOG.error("unexpected active children: {}", child);
        }
      }
    } else {
      valid = hasConsecutiveStackIds(rootEntry);
    }
    moveTree(rootEntry, valid ? validProcs : corruptedProcs, remainingProcMap);
  }

  /**
   * Logs every entry still present in {@code procMap} as an orphan and records it as corrupted.
   */
  private void checkOrphan(Map<Long, Entry> procMap) {
    for (Entry orphan : procMap.values()) {
      LOG.error("Orphan procedure: {}", orphan);
      corruptedProcs.add(new ProtoAndProcedure(orphan.proc));
    }
  }

  /**
   * Returns an iterator over the procedures classified as valid, ordered by procedure id.
   */
  public ProcedureIterator getValidProcs() {
    return new InMemoryProcedureIterator(validProcs);
  }

  /**
   * Returns an iterator over the procedures classified as corrupted, ordered by procedure id.
   */
  public ProcedureIterator getCorruptedProcs() {
    return new InMemoryProcedureIterator(corruptedProcs);
  }

  /**
   * Builds the tree for the given procedures and classifies each of them as valid or corrupted.
   * @param procedures the serialized procedures to organize, indexed internally by procedure id
   * @return the tree holding the classification result
   */
  public static ProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
    Map<Long, Entry> entriesById = new HashMap<>();
    for (ProcedureProtos.Procedure procedure : procedures) {
      entriesById.put(procedure.getProcId(), new Entry(procedure));
    }
    return new ProcedureTree(entriesById);
  }
}