001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.UncheckedIOException;
022import java.util.LinkedHashMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicLong;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
027import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.apache.hadoop.hbase.util.AtomicUtils;
033import org.junit.After;
034import org.junit.Before;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
040
041/**
042 * Testcase for HBASE-28210, where we persist the procedure which has been inserted later to
043 * {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading,
044 * and finally fail the start up of master.
045 */
046@Category({ MasterTests.class, SmallTests.class })
047public class TestStackIdHoles {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051    HBaseClassTestRule.forClass(TestStackIdHoles.class);
052
053  private final class DummyProcedureStore extends ProcedureStoreBase {
054
055    private int numThreads;
056
057    private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap =
058      new LinkedHashMap<Long, ProcedureProtos.Procedure>();
059
060    private final AtomicLong maxProcId = new AtomicLong(0);
061
062    private final AtomicBoolean updated = new AtomicBoolean(false);
063
064    @Override
065    public void start(int numThreads) throws IOException {
066      this.numThreads = numThreads;
067      setRunning(true);
068    }
069
070    @Override
071    public void stop(boolean abort) {
072    }
073
074    @Override
075    public int getNumThreads() {
076      return numThreads;
077    }
078
079    @Override
080    public int setRunningProcedureCount(int count) {
081      return count;
082    }
083
084    @Override
085    public void recoverLease() throws IOException {
086    }
087
088    @Override
089    public void load(ProcedureLoader loader) throws IOException {
090      loader.setMaxProcId(maxProcId.get());
091      ProcedureTree tree = ProcedureTree.build(procMap.values());
092      loader.load(tree.getValidProcs());
093      loader.handleCorrupted(tree.getCorruptedProcs());
094    }
095
096    @Override
097    public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
098      long max = proc.getProcId();
099      synchronized (procMap) {
100        try {
101          procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
102          if (subprocs != null) {
103            for (Procedure<?> p : subprocs) {
104              procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
105              max = Math.max(max, p.getProcId());
106            }
107          }
108        } catch (IOException e) {
109          throw new UncheckedIOException(e);
110        }
111      }
112      AtomicUtils.updateMax(maxProcId, max);
113    }
114
115    @Override
116    public void insert(Procedure<?>[] procs) {
117      long max = -1;
118      synchronized (procMap) {
119        try {
120          for (Procedure<?> p : procs) {
121            procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p));
122            max = Math.max(max, p.getProcId());
123          }
124        } catch (IOException e) {
125          throw new UncheckedIOException(e);
126        }
127      }
128      AtomicUtils.updateMax(maxProcId, max);
129    }
130
131    @Override
132    public void update(Procedure<?> proc) {
133      // inject a sleep to simulate the scenario in HBASE-28210
134      if (proc.hasParent() && proc.getStackIndexes() != null) {
135        int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1];
136        try {
137          // sleep more times if the stack id is smaller
138          Thread.sleep(100L * (10 - lastStackId));
139        } catch (InterruptedException e) {
140          Thread.currentThread().interrupt();
141          return;
142        }
143        // simulate the failure when updating the second sub procedure
144        if (!updated.compareAndSet(false, true)) {
145          procExec.stop();
146          throw new RuntimeException("inject error");
147        }
148      }
149      synchronized (procMap) {
150        try {
151          procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc));
152        } catch (IOException e) {
153          throw new UncheckedIOException(e);
154        }
155      }
156    }
157
158    @Override
159    public void delete(long procId) {
160      synchronized (procMap) {
161        procMap.remove(procId);
162      }
163    }
164
165    @Override
166    public void delete(Procedure<?> parentProc, long[] subProcIds) {
167      synchronized (procMap) {
168        try {
169          procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc));
170          for (long procId : subProcIds) {
171            procMap.remove(procId);
172          }
173        } catch (IOException e) {
174          throw new UncheckedIOException(e);
175        }
176      }
177    }
178
179    @Override
180    public void delete(long[] procIds, int offset, int count) {
181      synchronized (procMap) {
182        for (int i = 0; i < count; i++) {
183          long procId = procIds[offset + i];
184          procMap.remove(procId);
185        }
186      }
187    }
188  }
189
190  private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil();
191
192  private DummyProcedureStore procStore;
193
194  private ProcedureExecutor<Void> procExec;
195
196  @Before
197  public void setUp() throws IOException {
198    procStore = new DummyProcedureStore();
199    procStore.start(4);
200    procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
201    procExec.init(4, true);
202    procExec.startWorkers();
203  }
204
205  @After
206  public void tearDown() {
207    procExec.stop();
208  }
209
210  public static class DummyProcedure extends NoopProcedure<Void> {
211
212    @Override
213    protected Procedure<Void>[] execute(Void env)
214      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
215      return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() };
216    }
217  }
218
219  @Test
220  public void testLoad() throws IOException {
221    procExec.submitProcedure(new DummyProcedure());
222    // wait for the error
223    HBTU.waitFor(30000, () -> !procExec.isRunning());
224    procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore);
225    // make sure there is no error while loading
226    procExec.init(4, true);
227  }
228}