001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicReference; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.NonceKey; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@Category({MasterTests.class, SmallTests.class}) 047public class TestProcedureNonce { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestProcedureNonce.class); 052 053 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureNonce.class); 054 055 private static final int PROCEDURE_EXECUTOR_SLOTS = 2; 056 057 private static TestProcEnv procEnv; 058 private static ProcedureExecutor<TestProcEnv> procExecutor; 059 private static ProcedureStore procStore; 060 061 private HBaseCommonTestingUtility htu; 062 private FileSystem fs; 063 private Path logDir; 064 065 @Before 066 public void setUp() throws IOException { 067 htu = new HBaseCommonTestingUtility(); 068 Path testDir = htu.getDataTestDir(); 069 fs = testDir.getFileSystem(htu.getConfiguration()); 070 assertTrue(testDir.depth() > 1); 071 072 logDir = new Path(testDir, "proc-logs"); 073 procEnv = new TestProcEnv(); 074 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 075 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 076 procExecutor.testing = new ProcedureExecutor.Testing(); 077 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 078 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 079 } 080 081 @After 082 public void tearDown() throws IOException { 083 procExecutor.stop(); 084 procStore.stop(false); 085 fs.delete(logDir, true); 086 } 087 088 @Test 089 public void testCompletedProcWithSameNonce() throws Exception { 090 final long nonceGroup = 123; 091 final long nonce = 2222; 092 093 // register the nonce 094 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 095 assertFalse(procExecutor.registerNonce(nonceKey) >= 0); 096 097 // Submit a proc and wait for its completion 098 Procedure proc = new TestSingleStepProcedure(); 099 long procId = procExecutor.submitProcedure(proc, nonceKey); 100 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 101 102 // Restart 103 ProcedureTestingUtility.restart(procExecutor); 104 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 105 106 // try to register a procedure with the same nonce 107 // we should get back the old procId 108 assertEquals(procId, procExecutor.registerNonce(nonceKey)); 109 110 Procedure<?> result = procExecutor.getResult(procId); 111 ProcedureTestingUtility.assertProcNotFailed(result); 112 } 113 114 @Test 115 public void testRunningProcWithSameNonce() throws Exception { 116 final long nonceGroup = 456; 117 final long nonce = 33333; 118 119 // register the nonce 120 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 121 assertFalse(procExecutor.registerNonce(nonceKey) >= 0); 122 123 // Submit a proc and use a latch to prevent the step execution until we submitted proc2 124 CountDownLatch latch = new CountDownLatch(1); 125 TestSingleStepProcedure proc = new TestSingleStepProcedure(); 126 procEnv.setWaitLatch(latch); 127 long procId = procExecutor.submitProcedure(proc, nonceKey); 128 while (proc.step != 1) Threads.sleep(25); 129 130 // try to register a procedure with the same nonce 131 // we should get back the old procId 132 assertEquals(procId, procExecutor.registerNonce(nonceKey)); 133 134 // complete the procedure 135 latch.countDown(); 136 137 // Restart, the procedure is not completed yet 138 ProcedureTestingUtility.restart(procExecutor); 139 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 140 141 // try to register a procedure with the same nonce 142 // we should get back the old procId 143 assertEquals(procId, procExecutor.registerNonce(nonceKey)); 144 145 Procedure<?> result = procExecutor.getResult(procId); 146 ProcedureTestingUtility.assertProcNotFailed(result); 147 } 148 149 @Test 150 public void testSetFailureResultForNonce() throws IOException { 151 final long nonceGroup = 234; 152 final long nonce = 55555; 153 154 // check and register the request nonce 155 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 156 assertFalse(procExecutor.registerNonce(nonceKey) >= 0); 157 158 procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(), 159 new IOException("test failure")); 160 161 final long procId = procExecutor.registerNonce(nonceKey); 162 Procedure<?> result = procExecutor.getResult(procId); 163 ProcedureTestingUtility.assertProcFailed(result); 164 } 165 166 @Test 167 public void testConcurrentNonceRegistration() throws IOException { 168 testConcurrentNonceRegistration(true, 567, 44444); 169 } 170 171 @Test 172 public void testConcurrentNonceRegistrationWithRollback() throws IOException { 173 testConcurrentNonceRegistration(false, 890, 55555); 174 } 175 176 private void testConcurrentNonceRegistration(final boolean submitProcedure, 177 final long nonceGroup, final long nonce) throws IOException { 178 // register the nonce 179 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 180 181 final AtomicReference<Throwable> t1Exception = new AtomicReference(); 182 final AtomicReference<Throwable> t2Exception = new AtomicReference(); 183 184 final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1); 185 final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1); 186 final Thread[] threads = new Thread[2]; 187 threads[0] = new Thread() { 188 @Override 189 public void run() { 190 try { 191 // release the nonce and wake t2 192 assertFalse("unexpected already registered nonce", 193 procExecutor.registerNonce(nonceKey) >= 0); 194 t1NonceRegisteredLatch.countDown(); 195 196 // hold the submission until t2 is registering the nonce 197 t2BeforeNonceRegisteredLatch.await(); 198 Threads.sleep(1000); 199 200 if (submitProcedure) { 201 CountDownLatch latch = new CountDownLatch(1); 202 TestSingleStepProcedure proc = new TestSingleStepProcedure(); 203 procEnv.setWaitLatch(latch); 204 205 procExecutor.submitProcedure(proc, nonceKey); 206 Threads.sleep(100); 207 208 // complete the procedure 209 latch.countDown(); 210 } else { 211 procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey); 212 } 213 } catch (Throwable e) { 214 t1Exception.set(e); 215 } finally { 216 t1NonceRegisteredLatch.countDown(); 217 t2BeforeNonceRegisteredLatch.countDown(); 218 } 219 } 220 }; 221 222 threads[1] = new Thread() { 223 @Override 224 public void run() { 225 try { 226 // wait until t1 has registered the nonce 227 t1NonceRegisteredLatch.await(); 228 229 // register the nonce 230 t2BeforeNonceRegisteredLatch.countDown(); 231 assertFalse("unexpected non registered nonce", 232 procExecutor.registerNonce(nonceKey) < 0); 233 } catch (Throwable e) { 234 t2Exception.set(e); 235 } finally { 236 t1NonceRegisteredLatch.countDown(); 237 t2BeforeNonceRegisteredLatch.countDown(); 238 } 239 } 240 }; 241 242 for (int i = 0; i < threads.length; ++i) threads[i].start(); 243 for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]); 244 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 245 assertEquals(null, t1Exception.get()); 246 assertEquals(null, t2Exception.get()); 247 } 248 249 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 250 private int step = 0; 251 252 public TestSingleStepProcedure() { } 253 254 @Override 255 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 256 step++; 257 env.waitOnLatch(); 258 LOG.debug("execute procedure " + this + " step=" + step); 259 step++; 260 setResult(Bytes.toBytes(step)); 261 return null; 262 } 263 264 @Override 265 protected void rollback(TestProcEnv env) { } 266 267 @Override 268 protected boolean abort(TestProcEnv env) { return true; } 269 } 270 271 private static class TestProcEnv { 272 private CountDownLatch latch = null; 273 274 /** 275 * set/unset a latch. every procedure execute() step will wait on the latch if any. 276 */ 277 public void setWaitLatch(CountDownLatch latch) { 278 this.latch = latch; 279 } 280 281 public void waitOnLatch() throws InterruptedException { 282 if (latch != null) { 283 latch.await(); 284 } 285 } 286 } 287}