001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.NoSuchElementException;
030import org.apache.commons.lang3.mutable.MutableInt;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
033import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
039
040/**
041 * Used to build the tree for procedures.
042 * <p/>
043 * We will group the procedures with the root procedure, and then validate each group. For each
044 * group of procedures(with the same root procedure), we will collect all the stack ids, if the max
045 * stack id is n, then all the stack ids should be from 0 to n, non-repetition and non-omission. If
046 * not, we will consider all the procedures in this group as corrupted. Please see the code in
047 * {@link #checkReady(Entry, Map)} method.
048 * <p/>
049 * For the procedures not in any group, i.e, can not find the root procedure for these procedures,
050 * we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
051 */
052@InterfaceAudience.Private
053public final class WALProcedureTree {
054
055  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
056
057  private static final class Entry {
058
059    private final ProcedureProtos.Procedure proc;
060
061    private final List<Entry> subProcs = new ArrayList<>();
062
063    public Entry(ProcedureProtos.Procedure proc) {
064      this.proc = proc;
065    }
066
067    @Override
068    public String toString() {
069      StringBuilder sb = new StringBuilder();
070      sb.append("Procedure(pid=");
071      sb.append(proc.getProcId());
072      sb.append(", ppid=");
073      sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID);
074      sb.append(", class=");
075      sb.append(proc.getClassName());
076      sb.append(")");
077      return sb.toString();
078    }
079  }
080
081  // when loading we will iterator the procedures twice, so use this class to cache the deserialized
082  // result to prevent deserializing multiple times.
083  private static final class ProtoAndProc {
084    private final ProcedureProtos.Procedure proto;
085
086    private Procedure<?> proc;
087
088    public ProtoAndProc(ProcedureProtos.Procedure proto) {
089      this.proto = proto;
090    }
091
092    public Procedure<?> getProc() throws IOException {
093      if (proc == null) {
094        proc = ProcedureUtil.convertToProcedure(proto);
095      }
096      return proc;
097    }
098  }
099
100  private final List<ProtoAndProc> validProcs = new ArrayList<>();
101
102  private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
103
104  private static boolean isFinished(ProcedureProtos.Procedure proc) {
105    if (!proc.hasParentId()) {
106      switch (proc.getState()) {
107        case ROLLEDBACK:
108        case SUCCESS:
109          return true;
110        default:
111          break;
112      }
113    }
114    return false;
115  }
116
117  private WALProcedureTree(Map<Long, Entry> procMap) {
118    List<Entry> rootEntries = buildTree(procMap);
119    for (Entry rootEntry : rootEntries) {
120      checkReady(rootEntry, procMap);
121    }
122    checkOrphan(procMap);
123    Comparator<ProtoAndProc> cmp =
124      (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
125    Collections.sort(validProcs, cmp);
126    Collections.sort(corruptedProcs, cmp);
127  }
128
129  private List<Entry> buildTree(Map<Long, Entry> procMap) {
130    List<Entry> rootEntries = new ArrayList<>();
131    procMap.values().forEach(entry -> {
132      if (!entry.proc.hasParentId()) {
133        rootEntries.add(entry);
134      } else {
135        Entry parentEntry = procMap.get(entry.proc.getParentId());
136        // For a valid procedure this should not be null. We will log the error later if it is null,
137        // as it will not be referenced by any root procedures.
138        if (parentEntry != null) {
139          parentEntry.subProcs.add(entry);
140        }
141      }
142    });
143    return rootEntries;
144  }
145
146  private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
147      MutableInt maxStackId) {
148    if (LOG.isDebugEnabled()) {
149      LOG.debug("Procedure {} stack ids={}", entry, entry.proc.getStackIdList());
150    }
151    for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) {
152      int stackId = entry.proc.getStackId(i);
153      if (stackId > maxStackId.intValue()) {
154        maxStackId.setValue(stackId);
155      }
156      stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
157    }
158    entry.subProcs.forEach(e -> collectStackId(e, stackId2Proc, maxStackId));
159  }
160
161  private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
162      Map<Long, Entry> remainingProcMap) {
163    corruptedProcs.add(new ProtoAndProc(entry.proc));
164    remainingProcMap.remove(entry.proc.getProcId());
165    for (Entry e : entry.subProcs) {
166      addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
167    }
168  }
169
170  private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
171    validProcs.add(new ProtoAndProc(entry.proc));
172    remainingProcMap.remove(entry.proc.getProcId());
173    for (Entry e : entry.subProcs) {
174      addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
175    }
176  }
177
178  // In this method first we will check whether the given root procedure and all its sub procedures
179  // are valid, through the procedure stack. And we will also remove all these procedures from the
180  // remainingProcMap, so at last, if there are still procedures in the map, we know that there are
181  // orphan procedures.
182  private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
183    if (isFinished(rootEntry.proc)) {
184      if (!rootEntry.subProcs.isEmpty()) {
185        LOG.error("unexpected active children for root-procedure: {}", rootEntry);
186        rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
187        addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
188      } else {
189        addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
190      }
191      return;
192    }
193    Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
194    MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
195    collectStackId(rootEntry, stackId2Proc, maxStackId);
196    // the stack ids should start from 0 and increase by one every time
197    boolean valid = true;
198    for (int i = 0; i <= maxStackId.intValue(); i++) {
199      List<Entry> entries = stackId2Proc.get(i);
200      if (entries == null) {
201        LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId,
202          rootEntry);
203        valid = false;
204      } else if (entries.size() > 1) {
205        LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {}," +
206          " root procedure is {}", entries, i, maxStackId, rootEntry);
207        valid = false;
208      }
209    }
210    if (valid) {
211      addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
212    } else {
213      addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
214    }
215  }
216
217  private void checkOrphan(Map<Long, Entry> procMap) {
218    procMap.values().forEach(entry -> {
219      LOG.error("Orphan procedure: {}", entry);
220      corruptedProcs.add(new ProtoAndProc(entry.proc));
221    });
222  }
223
224  private static final class Iter implements ProcedureIterator {
225
226    private final List<ProtoAndProc> procs;
227
228    private Iterator<ProtoAndProc> iter;
229
230    private ProtoAndProc current;
231
232    public Iter(List<ProtoAndProc> procs) {
233      this.procs = procs;
234      reset();
235    }
236
237    @Override
238    public void reset() {
239      iter = procs.iterator();
240      if (iter.hasNext()) {
241        current = iter.next();
242      } else {
243        current = null;
244      }
245    }
246
247    @Override
248    public boolean hasNext() {
249      return current != null;
250    }
251
252    private void checkNext() {
253      if (!hasNext()) {
254        throw new NoSuchElementException();
255      }
256    }
257
258    @Override
259    public boolean isNextFinished() {
260      checkNext();
261      return isFinished(current.proto);
262    }
263
264    private void moveToNext() {
265      if (iter.hasNext()) {
266        current = iter.next();
267      } else {
268        current = null;
269      }
270    }
271
272    @Override
273    public void skipNext() {
274      checkNext();
275      moveToNext();
276    }
277
278    @Override
279    public Procedure<?> next() throws IOException {
280      checkNext();
281      Procedure<?> proc = current.getProc();
282      moveToNext();
283      return proc;
284    }
285  }
286
287  public ProcedureIterator getValidProcs() {
288    return new Iter(validProcs);
289  }
290
291  public ProcedureIterator getCorruptedProcs() {
292    return new Iter(corruptedProcs);
293  }
294
295  public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
296    Map<Long, Entry> procMap = new HashMap<>();
297    for (ProcedureProtos.Procedure proc : procedures) {
298      procMap.put(proc.getProcId(), new Entry(proc));
299    }
300    return new WALProcedureTree(procMap);
301  }
302}