001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store;
019
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import org.apache.commons.lang3.mutable.MutableInt;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
036
037/**
038 * Used to build the tree for procedures.
039 * <p/>
040 * We will group the procedures with the root procedure, and then validate each group. For each
041 * group of procedures(with the same root procedure), we will collect all the stack ids, if the max
042 * stack id is n, then all the stack ids should be from 0 to n, non-repetition and non-omission. If
043 * not, we will consider all the procedures in this group as corrupted. Please see the code in
044 * {@link #checkReady(Entry, Map)} method.
045 * <p/>
046 * For the procedures not in any group, i.e, can not find the root procedure for these procedures,
047 * we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
048 */
049@InterfaceAudience.Private
050public final class ProcedureTree {
051
052  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTree.class);
053
054  private static final class Entry {
055
056    private final ProcedureProtos.Procedure proc;
057
058    private final List<Entry> subProcs = new ArrayList<>();
059
060    public Entry(ProcedureProtos.Procedure proc) {
061      this.proc = proc;
062    }
063
064    @Override
065    public String toString() {
066      StringBuilder sb = new StringBuilder();
067      sb.append("Procedure(pid=");
068      sb.append(proc.getProcId());
069      sb.append(", ppid=");
070      sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID);
071      sb.append(", class=");
072      sb.append(proc.getClassName());
073      sb.append(")");
074      return sb.toString();
075    }
076  }
077
078  private final List<ProtoAndProcedure> validProcs = new ArrayList<>();
079
080  private final List<ProtoAndProcedure> corruptedProcs = new ArrayList<>();
081
082  private ProcedureTree(Map<Long, Entry> procMap) {
083    List<Entry> rootEntries = buildTree(procMap);
084    for (Entry rootEntry : rootEntries) {
085      checkReady(rootEntry, procMap);
086    }
087    checkOrphan(procMap);
088    Comparator<ProtoAndProcedure> cmp =
089      (p1, p2) -> Long.compare(p1.getProto().getProcId(), p2.getProto().getProcId());
090    Collections.sort(validProcs, cmp);
091    Collections.sort(corruptedProcs, cmp);
092  }
093
094  private List<Entry> buildTree(Map<Long, Entry> procMap) {
095    List<Entry> rootEntries = new ArrayList<>();
096    procMap.values().forEach(entry -> {
097      if (!entry.proc.hasParentId()) {
098        rootEntries.add(entry);
099      } else {
100        Entry parentEntry = procMap.get(entry.proc.getParentId());
101        // For a valid procedure this should not be null. We will log the error later if it is null,
102        // as it will not be referenced by any root procedures.
103        if (parentEntry != null) {
104          parentEntry.subProcs.add(entry);
105        }
106      }
107    });
108    return rootEntries;
109  }
110
111  private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
112    MutableInt maxStackId) {
113    if (LOG.isDebugEnabled()) {
114      LOG.debug("Procedure {} stack ids={}", entry, entry.proc.getStackIdList());
115    }
116    for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) {
117      int stackId = entry.proc.getStackId(i);
118      if (stackId > maxStackId.intValue()) {
119        maxStackId.setValue(stackId);
120      }
121      stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
122    }
123    entry.subProcs.forEach(e -> collectStackId(e, stackId2Proc, maxStackId));
124  }
125
126  private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
127    Map<Long, Entry> remainingProcMap) {
128    corruptedProcs.add(new ProtoAndProcedure(entry.proc));
129    remainingProcMap.remove(entry.proc.getProcId());
130    for (Entry e : entry.subProcs) {
131      addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
132    }
133  }
134
135  private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
136    validProcs.add(new ProtoAndProcedure(entry.proc));
137    remainingProcMap.remove(entry.proc.getProcId());
138    for (Entry e : entry.subProcs) {
139      addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
140    }
141  }
142
143  // In this method first we will check whether the given root procedure and all its sub procedures
144  // are valid, through the procedure stack. And we will also remove all these procedures from the
145  // remainingProcMap, so at last, if there are still procedures in the map, we know that there are
146  // orphan procedures.
147  private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
148    if (ProcedureUtil.isFinished(rootEntry.proc)) {
149      if (!rootEntry.subProcs.isEmpty()) {
150        LOG.error("unexpected active children for root-procedure: {}", rootEntry);
151        rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
152        addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
153      } else {
154        addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
155      }
156      return;
157    }
158    Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
159    MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
160    collectStackId(rootEntry, stackId2Proc, maxStackId);
161    // the stack ids should start from 0 and increase by one every time
162    boolean valid = true;
163    for (int i = 0; i <= maxStackId.intValue(); i++) {
164      List<Entry> entries = stackId2Proc.get(i);
165      if (entries == null) {
166        LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId,
167          rootEntry);
168        valid = false;
169      } else if (entries.size() > 1) {
170        LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {},"
171          + " root procedure is {}", entries, i, maxStackId, rootEntry);
172        valid = false;
173      }
174    }
175    if (valid) {
176      addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
177    } else {
178      addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
179    }
180  }
181
182  private void checkOrphan(Map<Long, Entry> procMap) {
183    procMap.values().forEach(entry -> {
184      LOG.error("Orphan procedure: {}", entry);
185      corruptedProcs.add(new ProtoAndProcedure(entry.proc));
186    });
187  }
188
189  public ProcedureIterator getValidProcs() {
190    return new InMemoryProcedureIterator(validProcs);
191  }
192
193  public ProcedureIterator getCorruptedProcs() {
194    return new InMemoryProcedureIterator(corruptedProcs);
195  }
196
197  public static ProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
198    Map<Long, Entry> procMap = new HashMap<>();
199    for (ProcedureProtos.Procedure proc : procedures) {
200      procMap.put(proc.getProcId(), new Entry(proc));
201    }
202    return new ProcedureTree(procMap);
203  }
204}