001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.UncheckedIOException; 022import java.util.LinkedHashMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.concurrent.atomic.AtomicLong; 025import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 026import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 027import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 028import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; 029import org.apache.hadoop.hbase.testclassification.MasterTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.apache.hadoop.hbase.util.AtomicUtils; 032import org.junit.jupiter.api.AfterEach; 033import org.junit.jupiter.api.BeforeEach; 034import org.junit.jupiter.api.Tag; 035import org.junit.jupiter.api.Test; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 038 039/** 040 * Testcase for HBASE-28210, where we persist the procedure which has been inserted later to 041 * {@link RootProcedureState} first and then crash, and then cause holes in stack ids when loading, 042 * and finally fail the start up of master. 043 */ 044@Tag(MasterTests.TAG) 045@Tag(SmallTests.TAG) 046public class TestStackIdHoles { 047 048 private final class DummyProcedureStore extends ProcedureStoreBase { 049 050 private int numThreads; 051 052 private final LinkedHashMap<Long, ProcedureProtos.Procedure> procMap = 053 new LinkedHashMap<Long, ProcedureProtos.Procedure>(); 054 055 private final AtomicLong maxProcId = new AtomicLong(0); 056 057 private final AtomicBoolean updated = new AtomicBoolean(false); 058 059 @Override 060 public void start(int numThreads) throws IOException { 061 this.numThreads = numThreads; 062 setRunning(true); 063 } 064 065 @Override 066 public void stop(boolean abort) { 067 } 068 069 @Override 070 public int getNumThreads() { 071 return numThreads; 072 } 073 074 @Override 075 public int setRunningProcedureCount(int count) { 076 return count; 077 } 078 079 @Override 080 public void recoverLease() throws IOException { 081 } 082 083 @Override 084 public void load(ProcedureLoader loader) throws IOException { 085 loader.setMaxProcId(maxProcId.get()); 086 ProcedureTree tree = ProcedureTree.build(procMap.values()); 087 loader.load(tree.getValidProcs()); 088 loader.handleCorrupted(tree.getCorruptedProcs()); 089 } 090 091 @Override 092 public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { 093 long max = proc.getProcId(); 094 synchronized (procMap) { 095 try { 096 procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); 097 if (subprocs != null) { 098 for (Procedure<?> p : subprocs) { 099 procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); 100 max = Math.max(max, p.getProcId()); 101 } 102 } 103 } catch (IOException e) { 104 throw new UncheckedIOException(e); 105 } 106 } 107 AtomicUtils.updateMax(maxProcId, max); 108 } 109 110 @Override 111 public void insert(Procedure<?>[] procs) { 112 long max = -1; 113 synchronized (procMap) { 114 try { 115 for (Procedure<?> p : procs) { 116 procMap.put(p.getProcId(), ProcedureUtil.convertToProtoProcedure(p)); 117 max = Math.max(max, p.getProcId()); 118 } 119 } catch (IOException e) { 120 throw new UncheckedIOException(e); 121 } 122 } 123 AtomicUtils.updateMax(maxProcId, max); 124 } 125 126 @Override 127 public void update(Procedure<?> proc) { 128 // inject a sleep to simulate the scenario in HBASE-28210 129 if (proc.hasParent() && proc.getStackIndexes() != null) { 130 int lastStackId = proc.getStackIndexes()[proc.getStackIndexes().length - 1]; 131 try { 132 // sleep more times if the stack id is smaller 133 Thread.sleep(100L * (10 - lastStackId)); 134 } catch (InterruptedException e) { 135 Thread.currentThread().interrupt(); 136 return; 137 } 138 // simulate the failure when updating the second sub procedure 139 if (!updated.compareAndSet(false, true)) { 140 procExec.stop(); 141 throw new RuntimeException("inject error"); 142 } 143 } 144 synchronized (procMap) { 145 try { 146 procMap.put(proc.getProcId(), ProcedureUtil.convertToProtoProcedure(proc)); 147 } catch (IOException e) { 148 throw new UncheckedIOException(e); 149 } 150 } 151 } 152 153 @Override 154 public void delete(long procId) { 155 synchronized (procMap) { 156 procMap.remove(procId); 157 } 158 } 159 160 @Override 161 public void delete(Procedure<?> parentProc, long[] subProcIds) { 162 synchronized (procMap) { 163 try { 164 procMap.put(parentProc.getProcId(), ProcedureUtil.convertToProtoProcedure(parentProc)); 165 for (long procId : subProcIds) { 166 procMap.remove(procId); 167 } 168 } catch (IOException e) { 169 throw new UncheckedIOException(e); 170 } 171 } 172 } 173 174 @Override 175 public void delete(long[] procIds, int offset, int count) { 176 synchronized (procMap) { 177 for (int i = 0; i < count; i++) { 178 long procId = procIds[offset + i]; 179 procMap.remove(procId); 180 } 181 } 182 } 183 } 184 185 private final HBaseCommonTestingUtil HBTU = new HBaseCommonTestingUtil(); 186 187 private DummyProcedureStore procStore; 188 189 private ProcedureExecutor<Void> procExec; 190 191 @BeforeEach 192 public void setUp() throws IOException { 193 procStore = new DummyProcedureStore(); 194 procStore.start(4); 195 procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore); 196 procExec.init(4, true); 197 procExec.startWorkers(); 198 } 199 200 @AfterEach 201 public void tearDown() { 202 procExec.stop(); 203 } 204 205 public static class DummyProcedure extends NoopProcedure<Void> { 206 207 @Override 208 protected Procedure<Void>[] execute(Void env) 209 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 210 return new Procedure[] { new NoopProcedure<Void>(), new NoopProcedure<Void>() }; 211 } 212 } 213 214 @Test 215 public void testLoad() throws IOException { 216 procExec.submitProcedure(new DummyProcedure()); 217 // wait for the error 218 HBTU.waitFor(30000, () -> !procExec.isRunning()); 219 procExec = new ProcedureExecutor<Void>(HBTU.getConfiguration(), null, procStore); 220 // make sure there is no error while loading 221 procExec.init(4, true); 222 } 223}