001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.concurrent.Exchanger; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 029import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 030import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 031import org.apache.hadoop.hbase.testclassification.MasterTests; 032import org.apache.hadoop.hbase.testclassification.SmallTests; 033import org.junit.jupiter.api.AfterAll; 034import org.junit.jupiter.api.AfterEach; 035import org.junit.jupiter.api.BeforeAll; 036import org.junit.jupiter.api.BeforeEach; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.junit.jupiter.api.TestInfo; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 042 043@Tag(MasterTests.TAG) 044@Tag(SmallTests.TAG) 045public class TestForceUpdateProcedure { 046 047 private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); 048 049 private static WALProcedureStore STORE; 050 051 private static ProcedureExecutor<Void> EXEC; 052 053 private static final Exchanger<Boolean> EXCHANGER = new Exchanger<>(); 054 055 private static final int WAL_COUNT = 5; 056 057 private String methodName; 058 059 private void createStoreAndExecutor() throws IOException { 060 UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000); 061 Path logDir = UTIL.getDataTestDir(methodName); 062 STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir); 063 STORE.start(1); 064 EXEC = new ProcedureExecutor<>(UTIL.getConfiguration(), null, STORE); 065 ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true); 066 } 067 068 @BeforeAll 069 public static void setUpBeforeClass() throws IOException { 070 UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT); 071 } 072 073 private void stopStoreAndExecutor() { 074 EXEC.stop(); 075 STORE.stop(false); 076 EXEC = null; 077 STORE = null; 078 } 079 080 @AfterAll 081 public static void tearDownAfterClass() throws IOException { 082 UTIL.cleanupTestDir(); 083 } 084 085 @BeforeEach 086 public void setUp(TestInfo testInfo) throws IOException { 087 methodName = testInfo.getTestMethod().get().getName(); 088 createStoreAndExecutor(); 089 } 090 091 @AfterEach 092 public void tearDown() { 093 stopStoreAndExecutor(); 094 } 095 096 public static final class WaitingProcedure extends NoopProcedure<Void> { 097 098 @Override 099 protected Procedure<Void>[] execute(Void env) 100 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 101 EXCHANGER.exchange(Boolean.TRUE); 102 setState(ProcedureState.WAITING_TIMEOUT); 103 setTimeout(Integer.MAX_VALUE); 104 throw new ProcedureSuspendedException(); 105 } 106 } 107 108 public static final class ParentProcedure extends NoopProcedure<Void> { 109 110 @SuppressWarnings("unchecked") 111 @Override 112 protected Procedure<Void>[] execute(Void env) 113 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 114 return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() }; 115 } 116 } 117 118 public static final class ExchangeProcedure extends NoopProcedure<Void> { 119 120 @SuppressWarnings("unchecked") 121 @Override 122 protected Procedure<Void>[] execute(Void env) 123 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 124 if (EXCHANGER.exchange(Boolean.TRUE)) { 125 return new Procedure[] { this }; 126 } else { 127 return null; 128 } 129 } 130 } 131 132 public static final class NoopNoAckProcedure extends NoopProcedure<Void> { 133 134 @Override 135 protected boolean shouldWaitClientAck(Void env) { 136 return false; 137 } 138 } 139 140 @Test 141 public void testProcedureStuck() throws IOException, InterruptedException { 142 EXEC.submitProcedure(new ParentProcedure()); 143 EXCHANGER.exchange(Boolean.TRUE); 144 UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0); 145 // The above operations are used to make sure that we have persist the states of the two 146 // procedures. 147 long procId = EXEC.submitProcedure(new ExchangeProcedure()); 148 assertEquals(1, STORE.getActiveLogs().size()); 149 for (int i = 0; i < WAL_COUNT - 1; i++) { 150 assertTrue(STORE.rollWriterForTesting()); 151 // The WaitinProcedure never gets updated so we can not delete the oldest wal file, so the 152 // number of wal files will increase 153 assertEquals(2 + i, STORE.getActiveLogs().size()); 154 EXCHANGER.exchange(Boolean.TRUE); 155 Thread.sleep(1000); 156 } 157 STORE.rollWriterForTesting(); 158 // Finish the ExchangeProcedure 159 EXCHANGER.exchange(Boolean.FALSE); 160 // Make sure that we can delete several wal files because we force update the state of 161 // WaitingProcedure. Notice that the last closed wal files can not be deleted, as when rolling 162 // the newest wal file does not have anything in it, and in the closed file we still have the 163 // state for the ExchangeProcedure so it can not be deleted 164 UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2); 165 UTIL.waitFor(10000, () -> EXEC.isFinished(procId)); 166 // Make sure that after the force update we could still load the procedures 167 stopStoreAndExecutor(); 168 createStoreAndExecutor(); 169 Map<Class<?>, Procedure<Void>> procMap = new HashMap<>(); 170 EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p)); 171 assertEquals(3, procMap.size()); 172 ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class); 173 assertEquals(ProcedureState.WAITING, parentProc.getState()); 174 WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class); 175 assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState()); 176 NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class); 177 assertEquals(ProcedureState.SUCCESS, noopProc.getState()); 178 } 179 180 @Test 181 public void testCompletedProcedure() throws InterruptedException, IOException { 182 long procId = EXEC.submitProcedure(new ExchangeProcedure()); 183 EXCHANGER.exchange(Boolean.FALSE); 184 UTIL.waitFor(10000, () -> EXEC.isFinished(procId)); 185 for (int i = 0; i < WAL_COUNT - 1; i++) { 186 assertTrue(STORE.rollWriterForTesting()); 187 // The exchange procedure is completed but still not deleted yet so we can not delete the 188 // oldest wal file 189 long pid = EXEC.submitProcedure(new NoopNoAckProcedure()); 190 assertEquals(2 + i, STORE.getActiveLogs().size()); 191 UTIL.waitFor(10000, () -> EXEC.isFinished(pid)); 192 } 193 // Only the exchange procedure can not be deleted 194 UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1); 195 STORE.rollWriterForTesting(); 196 UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1); 197 } 198}