/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for nonce handling in {@link ProcedureExecutor}: registering a nonce, looking up the
 * procedure id that is bound to an already registered nonce (before and after an executor
 * restart), recording a failure result for a nonce, and registering the same nonce from two
 * threads concurrently.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestProcedureNonce {

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureNonce.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;

  // Re-created for every test in setUp(); static so that the nested/anonymous helper classes
  // below can reach them.
  private static TestProcEnv procEnv;
  private static ProcedureExecutor<TestProcEnv> procExecutor;
  private static ProcedureStore procStore;

  private HBaseCommonTestingUtil htu;
  private FileSystem fs;
  private Path logDir;

  /**
   * Creates a fresh procedure store under the test data directory and starts an executor with
   * {@link #PROCEDURE_EXECUTOR_SLOTS} slots on top of it.
   */
  @BeforeEach
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtil();
    Path testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    // Sanity check on the test dir, since tearDown() deletes recursively below it.
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procEnv = new TestProcEnv();
    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
    procExecutor.testing = new ProcedureExecutor.Testing();
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
  }

  /**
   * Stops the executor and the store and removes the procedure log directory. Each step runs even
   * if the previous one throws, so a failing stop does not leave the store running or the log
   * directory behind.
   */
  @AfterEach
  public void tearDown() throws IOException {
    try {
      procExecutor.stop();
    } finally {
      try {
        procStore.stop(false);
      } finally {
        fs.delete(logDir, true);
      }
    }
  }

  /**
   * After a procedure submitted with a nonce has completed and the executor was restarted,
   * registering the same nonce again must return the id of that procedure.
   */
  @Test
  public void testCompletedProcWithSameNonce() throws Exception {
    final long nonceGroup = 123;
    final long nonce = 2222;

    // register the nonce
    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);

    // Submit a proc and wait for its completion
    Procedure<TestProcEnv> proc = new TestSingleStepProcedure();
    long procId = procExecutor.submitProcedure(proc, nonceKey);
    ProcedureTestingUtility.waitProcedure(procExecutor, procId);

    // Restart
    ProcedureTestingUtility.restart(procExecutor);
    ProcedureTestingUtility.waitProcedure(procExecutor, procId);

    // try to register a procedure with the same nonce
    // we should get back the old procId
    assertEquals(procId, procExecutor.registerNonce(nonceKey));

    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcNotFailed(result);
  }

  /**
   * While a procedure submitted with a nonce is still blocked inside its step, and again after an
   * executor restart, registering the same nonce must return the id of that procedure.
   */
  @Test
  public void testRunningProcWithSameNonce() throws Exception {
    final long nonceGroup = 456;
    final long nonce = 33333;

    // register the nonce
    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);

    // Submit a proc and use a latch to prevent the step execution until we submitted proc2
    CountDownLatch latch = new CountDownLatch(1);
    TestSingleStepProcedure proc = new TestSingleStepProcedure();
    procEnv.setWaitLatch(latch);
    long procId = procExecutor.submitProcedure(proc, nonceKey);
    // step == 1 means execute() has been entered and is parked on the latch
    while (proc.step != 1) {
      Threads.sleep(25);
    }

    // try to register a procedure with the same nonce
    // we should get back the old procId
    assertEquals(procId, procExecutor.registerNonce(nonceKey));

    // complete the procedure
    latch.countDown();

    // Restart right after releasing the latch; the procedure may not have completed yet
    ProcedureTestingUtility.restart(procExecutor);
    ProcedureTestingUtility.waitProcedure(procExecutor, procId);

    // try to register a procedure with the same nonce
    // we should get back the old procId
    assertEquals(procId, procExecutor.registerNonce(nonceKey));

    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcNotFailed(result);
  }

  /**
   * After a failure result has been set for a registered nonce, registering the nonce again must
   * return an id whose result is a failed procedure.
   */
  @Test
  public void testSetFailureResultForNonce() throws IOException {
    final long nonceGroup = 234;
    final long nonce = 55555;

    // check and register the request nonce
    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);

    procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(),
      new IOException("test failure"));

    final long procId = procExecutor.registerNonce(nonceKey);
    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcFailed(result);
  }

  @Test
  public void testConcurrentNonceRegistration() throws IOException {
    testConcurrentNonceRegistration(true, 567, 44444);
  }

  @Test
  public void testConcurrentNonceRegistrationWithRollback() throws IOException {
    testConcurrentNonceRegistration(false, 890, 55555);
  }

  /**
   * Registers the same nonce from two threads. t1 registers first and then either submits a
   * procedure or unregisters the nonce; t2 registers once t1 has done its registration. Any
   * throwable raised in either thread (including assertion failures) is captured and fails the
   * test.
   * @param submitProcedure {@code true} if t1 should submit a procedure for the nonce,
   *                        {@code false} if it should unregister the nonce instead
   * @param nonceGroup      nonce group used to build the nonce key
   * @param nonce           nonce used to build the nonce key
   */
  private void testConcurrentNonceRegistration(final boolean submitProcedure, final long nonceGroup,
    final long nonce) throws IOException {
    // build the nonce key shared by both threads
    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);

    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();

    final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1);
    final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1);
    final Thread[] threads = new Thread[2];
    threads[0] = new Thread() {
      @Override
      public void run() {
        try {
          // register the nonce and wake t2
          assertFalse(procExecutor.registerNonce(nonceKey) >= 0,
            "unexpected already registered nonce");
          t1NonceRegisteredLatch.countDown();

          // hold the submission until t2 is registering the nonce
          t2BeforeNonceRegisteredLatch.await();
          Threads.sleep(1000);

          if (submitProcedure) {
            CountDownLatch latch = new CountDownLatch(1);
            TestSingleStepProcedure proc = new TestSingleStepProcedure();
            procEnv.setWaitLatch(latch);

            procExecutor.submitProcedure(proc, nonceKey);
            Threads.sleep(100);

            // complete the procedure
            latch.countDown();
          } else {
            procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey);
          }
        } catch (Throwable e) {
          t1Exception.set(e);
        } finally {
          // release both latches so the other thread can never be left waiting on a failure
          t1NonceRegisteredLatch.countDown();
          t2BeforeNonceRegisteredLatch.countDown();
        }
      }
    };

    threads[1] = new Thread() {
      @Override
      public void run() {
        try {
          // wait until t1 has registered the nonce
          t1NonceRegisteredLatch.await();

          // register the nonce
          t2BeforeNonceRegisteredLatch.countDown();
          assertFalse(procExecutor.registerNonce(nonceKey) < 0, "unexpected non registered nonce");
        } catch (Throwable e) {
          t2Exception.set(e);
        } finally {
          // release both latches so the other thread can never be left waiting on a failure
          t1NonceRegisteredLatch.countDown();
          t2BeforeNonceRegisteredLatch.countDown();
        }
      }
    };

    for (int i = 0; i < threads.length; ++i) {
      threads[i].start();
    }

    for (int i = 0; i < threads.length; ++i) {
      Threads.shutdown(threads[i]);
    }

    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
    assertNull(t1Exception.get());
    assertNull(t2Exception.get());
  }

  /**
   * Procedure with a single execute step. The step bumps {@code step} to 1, waits on the
   * environment latch (if one is set), then bumps {@code step} to 2 and stores it as the result.
   */
  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
    // volatile: incremented inside execute() and polled from the test thread
    // (see testRunningProcWithSameNonce). Only execute() writes it.
    private volatile int step = 0;

    public TestSingleStepProcedure() {
    }

    @Override
    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
      step++;
      env.waitOnLatch();
      LOG.debug("execute procedure {} step={}", this, step);
      step++;
      setResult(Bytes.toBytes(step));
      return null;
    }

    @Override
    protected void rollback(TestProcEnv env) {
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      return true;
    }
  }

  /** Procedure environment that can park procedure steps on an optional latch. */
  private static class TestProcEnv {
    // volatile: set by the test threads and read from execute() via waitOnLatch()
    private volatile CountDownLatch latch = null;

    /**
     * set/unset a latch. every procedure execute() step will wait on the latch if any.
     */
    public void setWaitLatch(CountDownLatch latch) {
      this.latch = latch;
    }

    /** Blocks until the current latch (if any) reaches zero; returns immediately otherwise. */
    public void waitOnLatch() throws InterruptedException {
      // read the field once so the null check and await() see the same latch
      final CountDownLatch current = latch;
      if (current != null) {
        current.await();
      }
    }
  }
}