001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.LinkedHashMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicLong;
025import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
026import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
027import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.apache.hadoop.hbase.util.AtomicUtils;
032import org.junit.jupiter.api.AfterEach;
033import org.junit.jupiter.api.BeforeEach;
034import org.junit.jupiter.api.Tag;
035import org.junit.jupiter.api.Test;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
038
039/**
040 * Testcase for HBASE-28210, where we persist the procedure which has been inserted later to
041 * {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading,
042 * and finally fail the start up of master.
043 */
044@Tag(MasterTests.TAG)
045@Tag(SmallTests.TAG)
046public class TestStackIdHoles {
047
048  private final class DummyProcedureStore extends ProcedureStoreBase {
049
050    private int numThreads;
051
052    private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap =
053      new LinkedHashMap<Long, ProcedureProtos.Procedure>();
054
055    private final AtomicLong maxProcId = new AtomicLong(0);
056
057    private final AtomicBoolean updated = new AtomicBoolean(false);
058
059    @Override
060    public void start(int numThreads) throws IOException {
061      this.numThreads = numThreads;
062      setRunning(true);
063    }
064
065    @Override
066    public void stop(boolean abort) {
067    }
068
069    @Override
070    public int getNumThreads() {
071      return numThreads;
072    }
073
074    @Override
075    public int setRunningProcedureCount(int count) {
076      return count;
077    }
078
079    @Override
080    public void recoverLease() throws IOException {
081    }
082
083    @Override
084    public void load(ProcedureLoader loader) throws IOException {
085      loader.setMaxProcId(maxProcId.get());
086      ProcedureTree tree = ProcedureTree.build(procMap.values());
087      loader.load(tree.getValidProcs());
088      loader.handleCorrupted(tree.getCorruptedProcs());
089    }
090
091    @Override
092    public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
093      long max = proc.getProcId();
094      synchronized (procMap) {
095        try {
096          procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
097          if (subprocs != null) {
098            for (Procedure<?> p : subprocs) {
099              procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
100              max = Math.max(max, p.getProcId());
101            }
102          }
103        } catch (IOException e) {
104          throw new UncheckedIOException(e);
105        }
106      }
107      AtomicUtils.updateMax(maxProcId, max);
108    }
109
110    @Override
111    public void insert(Procedure<?>[] procs) {
112      long max = -1;
113      synchronized (procMap) {
114        try {
115          for (Procedure<?> p : procs) {
116            procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
117            max = Math.max(max, p.getProcId());
118          }
119        } catch (IOException e) {
120          throw new UncheckedIOException(e);
121        }
122      }
123      AtomicUtils.updateMax(maxProcId, max);
124    }
125
126    @Override
127    public void update(Procedure<?> proc) {
128      // inject a sleep to simulate the scenario in HBASE-28210
129      if (proc.hasParent() && proc.getStackIndexes() != null) {
130        int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1];
131        try {
132          // sleep more times if the stack id is smaller
133          Thread.sleep(100L * (10 - lastStackId));
134        } catch (InterruptedException e) {
135          Thread.currentThread().interrupt();
136          return;
137        }
138        // simulate the failure when updating the second sub procedure
139        if (!updated.compareAndSet(false, true)) {
140          procExec.stop();
141          throw new RuntimeException("inject error");
142        }
143      }
144      synchronized (procMap) {
145        try {
146          procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
147        } catch (IOException e) {
148          throw new UncheckedIOException(e);
149        }
150      }
151    }
152
153    @Override
154    public void delete(long procId) {
155      synchronized (procMap) {
156        procMap.remove(procId);
157      }
158    }
159
160    @Override
161    public void delete(Procedure<?> parentProc, long[] subProcIds) {
162      synchronized (procMap) {
163        try {
164          procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc));
165          for (long procId : subProcIds) {
166            procMap.remove(procId);
167          }
168        } catch (IOException e) {
169          throw new UncheckedIOException(e);
170        }
171      }
172    }
173
174    @Override
175    public void delete(long[] procIds, int offset, int count) {
176      synchronized (procMap) {
177        for (int i = 0; i < count; i++) {
178          long procId = procIds[offset + i];
179          procMap.remove(procId);
180        }
181      }
182    }
183  }
184
185  private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil();
186
187  private DummyProcedureStore procStore;
188
189  private ProcedureExecutor<Void> procExec;
190
191  @BeforeEach
192  public void setUp() throws IOException {
193    procStore = new DummyProcedureStore();
194    procStore.start(4);
195    procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
196    procExec.init(4, true);
197    procExec.startWorkers();
198  }
199
200  @AfterEach
201  public void tearDown() {
202    procExec.stop();
203  }
204
205  public static class DummyProcedure extends NoopProcedure<Void> {
206
207    @Override
208    protected Procedure<Void>[] execute(Void env)
209      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
210      return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() };
211    }
212  }
213
214  @Test
215  public void testLoad() throws IOException {
216    procExec.submitProcedure(new DummyProcedure());
217    // wait for the error
218    HBTU.waitFor(30000, () -> !procExec.isRunning());
219    procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
220    // make sure there is no error while loading
221    procExec.init(4, true);
222  }
223}