001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.concurrent.Exchanger; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.junit.After; 035import org.junit.AfterClass; 036import org.junit.Before; 037import org.junit.BeforeClass; 038import org.junit.ClassRule; 039import org.junit.Rule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.junit.rules.TestName; 043 044import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 045 046@Category({ MasterTests.class, SmallTests.class }) 047public class TestForceUpdateProcedure { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestForceUpdateProcedure.class); 052 053 private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); 054 055 private static WALProcedureStore STORE; 056 057 private static ProcedureExecutor<Void> EXEC; 058 059 private static Exchanger<Boolean> EXCHANGER = new Exchanger<>(); 060 061 private static int WAL_COUNT = 5; 062 063 @Rule 064 public final TestName name = new TestName(); 065 066 private void createStoreAndExecutor() throws IOException { 067 UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000); 068 Path logDir = UTIL.getDataTestDir(name.getMethodName()); 069 STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir); 070 STORE.start(1); 071 EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE); 072 ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true); 073 } 074 075 @BeforeClass 076 public static void setUpBeforeClass() throws IOException { 077 UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT); 078 } 079 080 private void stopStoreAndExecutor() { 081 EXEC.stop(); 082 STORE.stop(false); 083 EXEC = null; 084 STORE = null; 085 } 086 087 @AfterClass 088 public static void tearDownAfterClass() throws IOException { 089 UTIL.cleanupTestDir(); 090 } 091 092 @Before 093 public void setUp() throws IOException { 094 createStoreAndExecutor(); 095 } 096 097 @After 098 public void tearDown() { 099 stopStoreAndExecutor(); 100 } 101 102 public static final class WaitingProcedure extends NoopProcedure<Void> { 103 104 @Override 105 protected Procedure<Void>[] execute(Void env) 106 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 107 EXCHANGER.exchange(Boolean.TRUE); 108 setState(ProcedureState.WAITING_TIMEOUT); 109 setTimeout(Integer.MAX_VALUE); 110 throw new ProcedureSuspendedException(); 111 } 112 } 113 114 public static final class ParentProcedure extends NoopProcedure<Void> { 115 116 @SuppressWarnings("unchecked") 117 @Override 118 protected Procedure<Void>[] execute(Void env) 119 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 120 return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() }; 121 } 122 } 123 124 public static final class ExchangeProcedure extends NoopProcedure<Void> { 125 126 @SuppressWarnings("unchecked") 127 @Override 128 protected Procedure<Void>[] execute(Void env) 129 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 130 if (EXCHANGER.exchange(Boolean.TRUE)) { 131 return new Procedure[] { this }; 132 } else { 133 return null; 134 } 135 } 136 } 137 138 public static final class NoopNoAckProcedure extends NoopProcedure<Void> { 139 140 @Override 141 protected boolean shouldWaitClientAck(Void env) { 142 return false; 143 } 144 } 145 146 @Test 147 public void testProcedureStuck() throws IOException, InterruptedException { 148 EXEC.submitProcedure(new ParentProcedure()); 149 EXCHANGER.exchange(Boolean.TRUE); 150 UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0); 151 // The above operations are used to make sure that we have persist the states of the two 152 // procedures. 153 long procId = EXEC.submitProcedure(new ExchangeProcedure()); 154 assertEquals(1, STORE.getActiveLogs().size()); 155 for (int i = 0; i < WAL_COUNT - 1; i++) { 156 assertTrue(STORE.rollWriterForTesting()); 157 // The WaitinProcedure never gets updated so we can not delete the oldest wal file, so the 158 // number of wal files will increase 159 assertEquals(2 + i, STORE.getActiveLogs().size()); 160 EXCHANGER.exchange(Boolean.TRUE); 161 Thread.sleep(1000); 162 } 163 STORE.rollWriterForTesting(); 164 // Finish the ExchangeProcedure 165 EXCHANGER.exchange(Boolean.FALSE); 166 // Make sure that we can delete several wal files because we force update the state of 167 // WaitingProcedure. Notice that the last closed wal files can not be deleted, as when rolling 168 // the newest wal file does not have anything in it, and in the closed file we still have the 169 // state for the ExchangeProcedure so it can not be deleted 170 UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2); 171 UTIL.waitFor(10000, () -> EXEC.isFinished(procId)); 172 // Make sure that after the force update we could still load the procedures 173 stopStoreAndExecutor(); 174 createStoreAndExecutor(); 175 Map<Class<?>, Procedure<Void>> procMap = new HashMap<>(); 176 EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p)); 177 assertEquals(3, procMap.size()); 178 ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class); 179 assertEquals(ProcedureState.WAITING, parentProc.getState()); 180 WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class); 181 assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState()); 182 NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class); 183 assertEquals(ProcedureState.SUCCESS, noopProc.getState()); 184 } 185 186 @Test 187 public void testCompletedProcedure() throws InterruptedException, IOException { 188 long procId = EXEC.submitProcedure(new ExchangeProcedure()); 189 EXCHANGER.exchange(Boolean.FALSE); 190 UTIL.waitFor(10000, () -> EXEC.isFinished(procId)); 191 for (int i = 0; i < WAL_COUNT - 1; i++) { 192 assertTrue(STORE.rollWriterForTesting()); 193 // The exchange procedure is completed but still not deleted yet so we can not delete the 194 // oldest wal file 195 long pid = EXEC.submitProcedure(new NoopNoAckProcedure()); 196 assertEquals(2 + i, STORE.getActiveLogs().size()); 197 UTIL.waitFor(10000, () -> EXEC.isFinished(pid)); 198 } 199 // Only the exchange procedure can not be deleted 200 UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1); 201 STORE.rollWriterForTesting(); 202 UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1); 203 } 204}