/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Exercises a procedure that repeatedly suspends itself on a {@link ProcedureEvent}, is woken by
 * its own timeout, and finally aborts once it has timed out often enough. See
 * {@link TestTimeoutEventProcedure} for the procedure under test.
 */
@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestProcedureEvents {

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);

  private TestProcEnv procEnv;
  private ProcedureStore procStore;
  private ProcedureExecutor<TestProcEnv> procExecutor;

  private HBaseCommonTestingUtil htu;
  private FileSystem fs;
  private Path logDir;

  /**
   * Builds a fresh executor for each test: a WAL-backed procedure store rooted at
   * {@code <data test dir>/proc-logs}, and a {@link ProcedureExecutor} running against a new
   * {@link TestProcEnv}.
   * @throws IOException if the file system cannot be resolved or the store/executor fail to start
   */
  @BeforeEach
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtil();
    Path testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    logDir = new Path(testDir, "proc-logs");

    procEnv = new TestProcEnv();
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
    // The store is started before the executor's workers are initialized.
    procStore.start(1);
    // NOTE(review): the arguments are presumably "one worker thread" and "abort on corruption";
    // confirm against ProcedureTestingUtility#initAndStartWorkers.
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
  }

  /**
   * Stops the executor and the store, waits for the executor to finish, then recursively deletes
   * the procedure log directory created in {@link #setUp()}.
   * @throws IOException if the log directory cannot be deleted
   */
  @AfterEach
  public void tearDown() throws IOException {
    procExecutor.stop();
    procStore.stop(false);
    procExecutor.join();
    fs.delete(logDir, true);
  }

  /**
   * Tests being able to suspend a Procedure for N timeouts and then failing. Resets the timeout
   * after each elapses. See {@link TestTimeoutEventProcedure} for an example of how to do this
   * sort of trickery with the ProcedureExecutor; i.e. suspend for a while, check for a condition
   * and if not set, suspend again, etc., ultimately failing or succeeding eventually.
   */
  @Test
  public void testTimeoutEventProcedure() throws Exception {
    final int NTIMEOUTS = 5;

    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS);
    procExecutor.submitProcedure(proc);

    ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
    // execute() only aborts once the counter is strictly greater than maxTimeouts, so the
    // procedure observes exactly one more timeout than the configured maximum.
    assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
  }

  /** Runs the double-execution scenario without killing the executor on suspension. */
  @Test
  public void testTimeoutEventProcedureDoubleExecution() throws Exception {
    testTimeoutEventProcedureDoubleExecution(false);
  }

  /** Runs the double-execution scenario with the kill-if-suspended toggle enabled. */
  @Test
  public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception {
    testTimeoutEventProcedureDoubleExecution(true);
  }

  /**
   * Submits a {@link TestTimeoutEventProcedure} (timeout 1000 msec, 3 max timeouts) with the
   * kill-and-toggle-before-store-update testing hook enabled, drives it through
   * {@link ProcedureTestingUtility#testRecoveryAndDoubleExecution}, and asserts that the final
   * result is an abort exception.
   * @param killIfSuspended value passed to {@link ProcedureTestingUtility#setKillIfSuspended};
   *                        presumably makes the executor also get killed when the procedure
   *                        suspends (confirm against the utility)
   */
  private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended)
    throws Exception {
    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3);
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
    ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended);
    long procId = procExecutor.submitProcedure(proc);
    ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true);
    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
  }

  /**
   * This Event+Procedure exhibits the following behavior:
   * <ul>
   * <li>On procedure execute()
   * <ul>
   * <li>If it has had enough timeouts, abort the procedure. Else....</li>
   * <li>Suspend the event and add self to its suspend queue</li>
   * <li>Go into waiting state</li>
   * </ul>
   * </li>
   * <li>On waiting timeout
   * <ul>
   * <li>Wake the event (which adds this procedure back into the scheduler queue), and set its own
   * state to RUNNABLE (so it can be executed again).</li>
   * </ul>
   * </li>
   * </ul>
   */
  public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
    // In-memory only: the event is not written by serializeStateData(), so it is re-armed in
    // afterReplay() when the procedure is reloaded.
    private final ProcedureEvent event = new ProcedureEvent("timeout-event");

    private final AtomicInteger ntimeouts = new AtomicInteger(0);
    private int maxTimeouts = 1;

    /**
     * No-arg constructor; leaves {@code maxTimeouts} at its default of 1 and sets no timeout.
     * NOTE(review): presumably required so the procedure can be re-instantiated when it is loaded
     * back from the store; confirm.
     */
    public TestTimeoutEventProcedure() {
    }

    /**
     * @param timeoutMsec timeout handed to {@code setTimeout}
     * @param maxTimeouts number of timeouts tolerated; the procedure aborts on the execution
     *                    that sees a count greater than this
     */
    public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
      this.maxTimeouts = maxTimeouts;
      setTimeout(timeoutMsec);
    }

    /** Returns the number of timeouts handled so far. */
    public int getTimeoutsCount() {
      return ntimeouts.get();
    }

    /**
     * Aborts once more than {@code maxTimeouts} timeouts have been counted; otherwise suspends on
     * the event and moves to {@code WAITING_TIMEOUT}.
     * @return always {@code null} (no child procedures)
     * @throws ProcedureSuspendedException when the procedure has been parked on the event
     */
    @Override
    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
      // Placeholder-style logging, matching setTimeoutFailure(); the rendered text is unchanged.
      LOG.info("EXECUTE {} ntimeouts={}", this, ntimeouts.get());
      if (ntimeouts.get() > maxTimeouts) {
        setAbortFailure("test", "give up after " + ntimeouts.get());
        return null;
      }

      // Mark the event as not ready first, so that suspendIfNotReady() parks this procedure.
      event.suspend();
      if (event.suspendIfNotReady(this)) {
        setState(ProcedureState.WAITING_TIMEOUT);
        throw new ProcedureSuspendedException();
      }

      return null;
    }

    /**
     * Timeout callback: counts the timeout, flips the state back to {@code RUNNABLE} and wakes the
     * event using the scheduler obtained from the environment.
     * @return always {@code false}; presumably tells the executor that the timeout must not be
     *         turned into a procedure failure (confirm against Procedure#setTimeoutFailure)
     */
    @Override
    protected synchronized boolean setTimeoutFailure(final TestProcEnv env) {
      int n = ntimeouts.incrementAndGet();
      LOG.info("HANDLE TIMEOUT {} ntimeouts={}", this, n);
      setState(ProcedureState.RUNNABLE);
      event.wake((AbstractProcedureScheduler) env.getProcedureScheduler());
      return false;
    }

    /**
     * Re-arms the event after replay: if the procedure is in {@code WAITING_TIMEOUT}, the event is
     * suspended again and this procedure is re-added to it, mirroring what {@code execute} did.
     */
    @Override
    protected void afterReplay(final TestProcEnv env) {
      if (getState() == ProcedureState.WAITING_TIMEOUT) {
        event.suspend();
        event.suspendIfNotReady(this);
      }
    }

    /**
     * Persists the timeout counter followed by the maximum, each as an {@link Int32Value}. The
     * order must match {@link #deserializeStateData(ProcedureStateSerializer)}.
     */
    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
      Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get());
      serializer.serialize(ntimeoutsBuilder.build());

      Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts);
      serializer.serialize(maxTimeoutsBuilder.build());
    }

    /**
     * Restores the timeout counter and then the maximum, in the same order they were written by
     * {@link #serializeStateData(ProcedureStateSerializer)}.
     */
    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
      Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class);
      ntimeouts.set(ntimeoutsValue.getValue());

      Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class);
      maxTimeouts = maxTimeoutsValue.getValue();
    }
  }

  /**
   * Minimal procedure environment. Deliberately a non-static inner class: it reads
   * {@code procExecutor} from the enclosing test instance to expose the executor's scheduler.
   */
  private class TestProcEnv {
    /** Returns the scheduler of the enclosing test's current executor. */
    public ProcedureScheduler getProcedureScheduler() {
      return procExecutor.getScheduler();
    }
  }
}