001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.apache.hadoop.hbase.testclassification.MediumTests; 032import org.junit.After; 033import org.junit.Before; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 043 044@Category({ MasterTests.class, MediumTests.class }) 045public class TestProcedureEvents { 046 047 @ClassRule 048 public static final HBaseClassTestRule CLASS_RULE = 049 HBaseClassTestRule.forClass(TestProcedureEvents.class); 050 051 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class); 052 053 private TestProcEnv procEnv; 054 private ProcedureStore procStore; 055 private ProcedureExecutor<TestProcEnv> procExecutor; 056 057 private HBaseCommonTestingUtility htu; 058 private FileSystem fs; 059 private Path logDir; 060 061 @Before 062 public void setUp() throws IOException { 063 htu = new HBaseCommonTestingUtility(); 064 Path testDir = htu.getDataTestDir(); 065 fs = testDir.getFileSystem(htu.getConfiguration()); 066 logDir = new Path(testDir, "proc-logs"); 067 068 procEnv = new TestProcEnv(); 069 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 070 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 071 procStore.start(1); 072 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 073 } 074 075 @After 076 public void tearDown() throws IOException { 077 procExecutor.stop(); 078 procStore.stop(false); 079 procExecutor.join(); 080 fs.delete(logDir, true); 081 } 082 083 /** 084 * Tests being able to suspend a Procedure for N timeouts and then failing.s Resets the timeout 085 * after each elapses. See {@link TestTimeoutEventProcedure} for example of how to do this sort of 086 * trickery with the ProcedureExecutor; i.e. suspend for a while, check for a condition and if not 087 * set, suspend again, etc., ultimately failing or succeeding eventually. 088 */ 089 @Test 090 public void testTimeoutEventProcedure() throws Exception { 091 final int NTIMEOUTS = 5; 092 093 TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS); 094 procExecutor.submitProcedure(proc); 095 096 ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId()); 097 ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); 098 assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount()); 099 } 100 101 @Test 102 public void testTimeoutEventProcedureDoubleExecution() throws Exception { 103 testTimeoutEventProcedureDoubleExecution(false); 104 } 105 106 @Test 107 public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception { 108 testTimeoutEventProcedureDoubleExecution(true); 109 } 110 111 private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended) 112 throws Exception { 113 TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3); 114 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); 115 ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended); 116 long procId = procExecutor.submitProcedure(proc); 117 ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true); 118 ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); 119 } 120 121 /** 122 * This Event+Procedure exhibits following behavior: 123 * <ul> 124 * <li>On procedure execute() 125 * <ul> 126 * <li>If had enough timeouts, abort the procedure. Else....</li> 127 * <li>Suspend the event and add self to its suspend queue</li> 128 * <li>Go into waiting state</li> 129 * </ul> 130 * </li> 131 * <li>On waiting timeout 132 * <ul> 133 * <li>Wake the event (which adds this procedure back into scheduler queue), and set own's state 134 * to RUNNABLE (so can be executed again).</li> 135 * </ul> 136 * </li> 137 * </ul> 138 */ 139 public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> { 140 private final ProcedureEvent event = new ProcedureEvent("timeout-event"); 141 142 private final AtomicInteger ntimeouts = new AtomicInteger(0); 143 private int maxTimeouts = 1; 144 145 public TestTimeoutEventProcedure() { 146 } 147 148 public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) { 149 this.maxTimeouts = maxTimeouts; 150 setTimeout(timeoutMsec); 151 } 152 153 public int getTimeoutsCount() { 154 return ntimeouts.get(); 155 } 156 157 @Override 158 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 159 LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts); 160 if (ntimeouts.get() > maxTimeouts) { 161 setAbortFailure("test", "give up after " + ntimeouts.get()); 162 return null; 163 } 164 165 event.suspend(); 166 if (event.suspendIfNotReady(this)) { 167 setState(ProcedureState.WAITING_TIMEOUT); 168 throw new ProcedureSuspendedException(); 169 } 170 171 return null; 172 } 173 174 @Override 175 protected synchronized boolean setTimeoutFailure(final TestProcEnv env) { 176 int n = ntimeouts.incrementAndGet(); 177 LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n); 178 setState(ProcedureState.RUNNABLE); 179 event.wake((AbstractProcedureScheduler) env.getProcedureScheduler()); 180 return false; 181 } 182 183 @Override 184 protected void afterReplay(final TestProcEnv env) { 185 if (getState() == ProcedureState.WAITING_TIMEOUT) { 186 event.suspend(); 187 event.suspendIfNotReady(this); 188 } 189 } 190 191 @Override 192 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 193 Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get()); 194 serializer.serialize(ntimeoutsBuilder.build()); 195 196 Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts); 197 serializer.serialize(maxTimeoutsBuilder.build()); 198 } 199 200 @Override 201 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 202 Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class); 203 ntimeouts.set(ntimeoutsValue.getValue()); 204 205 Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class); 206 maxTimeouts = maxTimeoutsValue.getValue(); 207 } 208 } 209 210 private class TestProcEnv { 211 public ProcedureScheduler getProcedureScheduler() { 212 return procExecutor.getScheduler(); 213 } 214 } 215}