001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.junit.After; 033import org.junit.Before; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 043 044@Category({MasterTests.class, SmallTests.class}) 045public class TestProcedureEvents { 046 047 @ClassRule 048 public static final HBaseClassTestRule CLASS_RULE = 049 HBaseClassTestRule.forClass(TestProcedureEvents.class); 050 051 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class); 052 053 private TestProcEnv procEnv; 054 private ProcedureStore procStore; 055 private ProcedureExecutor<TestProcEnv> procExecutor; 056 057 private HBaseCommonTestingUtility htu; 058 private FileSystem fs; 059 private Path logDir; 060 061 @Before 062 public void setUp() throws IOException { 063 htu = new HBaseCommonTestingUtility(); 064 Path testDir = htu.getDataTestDir(); 065 fs = testDir.getFileSystem(htu.getConfiguration()); 066 logDir = new Path(testDir, "proc-logs"); 067 068 procEnv = new TestProcEnv(); 069 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 070 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 071 procStore.start(1); 072 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 073 } 074 075 @After 076 public void tearDown() throws IOException { 077 procExecutor.stop(); 078 procStore.stop(false); 079 procExecutor.join(); 080 fs.delete(logDir, true); 081 } 082 083 /** 084 * Tests being able to suspend a Procedure for N timeouts and then failing.s 085 * Resets the timeout after each elapses. See {@link TestTimeoutEventProcedure} for example 086 * of how to do this sort of trickery with the ProcedureExecutor; i.e. suspend for a while, 087 * check for a condition and if not set, suspend again, etc., ultimately failing or succeeding 088 * eventually. 089 */ 090 @Test 091 public void testTimeoutEventProcedure() throws Exception { 092 final int NTIMEOUTS = 5; 093 094 TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS); 095 procExecutor.submitProcedure(proc); 096 097 ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId()); 098 ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); 099 assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount()); 100 } 101 102 @Test 103 public void testTimeoutEventProcedureDoubleExecution() throws Exception { 104 testTimeoutEventProcedureDoubleExecution(false); 105 } 106 107 @Test 108 public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception { 109 testTimeoutEventProcedureDoubleExecution(true); 110 } 111 112 private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended) 113 throws Exception { 114 TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3); 115 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); 116 ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended); 117 long procId = procExecutor.submitProcedure(proc); 118 ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true); 119 ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); 120 } 121 122 /** 123 * This Event+Procedure exhibits following behavior: 124 * <ul> 125 * <li>On procedure execute() 126 * <ul> 127 * <li>If had enough timeouts, abort the procedure. Else....</li> 128 * <li>Suspend the event and add self to its suspend queue</li> 129 * <li>Go into waiting state</li> 130 * </ul> 131 * </li> 132 * <li> 133 * On waiting timeout 134 * <ul> 135 * <li>Wake the event (which adds this procedure back into scheduler queue), and set own's 136 * state to RUNNABLE (so can be executed again).</li> 137 * </ul> 138 * </li> 139 * </ul> 140 */ 141 public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> { 142 private final ProcedureEvent event = new ProcedureEvent("timeout-event"); 143 144 private final AtomicInteger ntimeouts = new AtomicInteger(0); 145 private int maxTimeouts = 1; 146 147 public TestTimeoutEventProcedure() {} 148 149 public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) { 150 this.maxTimeouts = maxTimeouts; 151 setTimeout(timeoutMsec); 152 } 153 154 public int getTimeoutsCount() { 155 return ntimeouts.get(); 156 } 157 158 @Override 159 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 160 LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts); 161 if (ntimeouts.get() > maxTimeouts) { 162 setAbortFailure("test", "give up after " + ntimeouts.get()); 163 return null; 164 } 165 166 event.suspend(); 167 if (event.suspendIfNotReady(this)) { 168 setState(ProcedureState.WAITING_TIMEOUT); 169 throw new ProcedureSuspendedException(); 170 } 171 172 return null; 173 } 174 175 @Override 176 protected synchronized boolean setTimeoutFailure(final TestProcEnv env) { 177 int n = ntimeouts.incrementAndGet(); 178 LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); 179 setState(ProcedureState.RUNNABLE); 180 event.wake((AbstractProcedureScheduler) env.getProcedureScheduler()); 181 return false; 182 } 183 184 @Override 185 protected void afterReplay(final TestProcEnv env) { 186 if (getState() == ProcedureState.WAITING_TIMEOUT) { 187 event.suspend(); 188 event.suspendIfNotReady(this); 189 } 190 } 191 192 @Override 193 protected void serializeStateData(ProcedureStateSerializer serializer) 194 throws IOException { 195 Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get()); 196 serializer.serialize(ntimeoutsBuilder.build()); 197 198 Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts); 199 serializer.serialize(maxTimeoutsBuilder.build()); 200 } 201 202 @Override 203 protected void deserializeStateData(ProcedureStateSerializer serializer) 204 throws IOException { 205 Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class); 206 ntimeouts.set(ntimeoutsValue.getValue()); 207 208 Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class); 209 maxTimeouts = maxTimeoutsValue.getValue(); 210 } 211 } 212 213 private class TestProcEnv { 214 public ProcedureScheduler getProcedureScheduler() { 215 return procExecutor.getScheduler(); 216 } 217 } 218}