001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.NonceKey;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.jupiter.api.AfterEach;
039import org.junit.jupiter.api.BeforeEach;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@Tag(MasterTests.TAG)
046@Tag(SmallTests.TAG)
047public class TestProcedureNonce {
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureNonce.class);
050
051  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
052
053  private static TestProcEnv procEnv;
054  private static ProcedureExecutor<TestProcEnv> procExecutor;
055  private static ProcedureStore procStore;
056
057  private HBaseCommonTestingUtil htu;
058  private FileSystem fs;
059  private Path logDir;
060
061  @BeforeEach
062  public void setUp() throws IOException {
063    htu = new HBaseCommonTestingUtil();
064    Path testDir = htu.getDataTestDir();
065    fs = testDir.getFileSystem(htu.getConfiguration());
066    assertTrue(testDir.depth() > 1);
067
068    logDir = new Path(testDir, "proc-logs");
069    procEnv = new TestProcEnv();
070    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
071    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
072    procExecutor.testing = new ProcedureExecutor.Testing();
073    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
074    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
075  }
076
077  @AfterEach
078  public void tearDown() throws IOException {
079    procExecutor.stop();
080    procStore.stop(false);
081    fs.delete(logDir, true);
082  }
083
084  @Test
085  public void testCompletedProcWithSameNonce() throws Exception {
086    final long nonceGroup = 123;
087    final long nonce = 2222;
088
089    // register the nonce
090    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
091    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
092
093    // Submit a proc and wait for its completion
094    Procedure proc = new TestSingleStepProcedure();
095    long procId = procExecutor.submitProcedure(proc, nonceKey);
096    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
097
098    // Restart
099    ProcedureTestingUtility.restart(procExecutor);
100    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
101
102    // try to register a procedure with the same nonce
103    // we should get back the old procId
104    assertEquals(procId, procExecutor.registerNonce(nonceKey));
105
106    Procedure<?> result = procExecutor.getResult(procId);
107    ProcedureTestingUtility.assertProcNotFailed(result);
108  }
109
110  @Test
111  public void testRunningProcWithSameNonce() throws Exception {
112    final long nonceGroup = 456;
113    final long nonce = 33333;
114
115    // register the nonce
116    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
117    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
118
119    // Submit a proc and use a latch to prevent the step execution until we submitted proc2
120    CountDownLatch latch = new CountDownLatch(1);
121    TestSingleStepProcedure proc = new TestSingleStepProcedure();
122    procEnv.setWaitLatch(latch);
123    long procId = procExecutor.submitProcedure(proc, nonceKey);
124    while (proc.step != 1) {
125      Threads.sleep(25);
126    }
127
128    // try to register a procedure with the same nonce
129    // we should get back the old procId
130    assertEquals(procId, procExecutor.registerNonce(nonceKey));
131
132    // complete the procedure
133    latch.countDown();
134
135    // Restart, the procedure is not completed yet
136    ProcedureTestingUtility.restart(procExecutor);
137    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
138
139    // try to register a procedure with the same nonce
140    // we should get back the old procId
141    assertEquals(procId, procExecutor.registerNonce(nonceKey));
142
143    Procedure<?> result = procExecutor.getResult(procId);
144    ProcedureTestingUtility.assertProcNotFailed(result);
145  }
146
147  @Test
148  public void testSetFailureResultForNonce() throws IOException {
149    final long nonceGroup = 234;
150    final long nonce = 55555;
151
152    // check and register the request nonce
153    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
154    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
155
156    procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(),
157      new IOException("test failure"));
158
159    final long procId = procExecutor.registerNonce(nonceKey);
160    Procedure<?> result = procExecutor.getResult(procId);
161    ProcedureTestingUtility.assertProcFailed(result);
162  }
163
164  @Test
165  public void testConcurrentNonceRegistration() throws IOException {
166    testConcurrentNonceRegistration(true, 567, 44444);
167  }
168
169  @Test
170  public void testConcurrentNonceRegistrationWithRollback() throws IOException {
171    testConcurrentNonceRegistration(false, 890, 55555);
172  }
173
174  private void testConcurrentNonceRegistration(final boolean submitProcedure, final long nonceGroup,
175    final long nonce) throws IOException {
176    // register the nonce
177    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
178
179    final AtomicReference<Throwable> t1Exception = new AtomicReference();
180    final AtomicReference<Throwable> t2Exception = new AtomicReference();
181
182    final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1);
183    final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1);
184    final Thread[] threads = new Thread[2];
185    threads[0] = new Thread() {
186      @Override
187      public void run() {
188        try {
189          // release the nonce and wake t2
190          assertFalse(procExecutor.registerNonce(nonceKey) >= 0,
191            "unexpected already registered nonce");
192          t1NonceRegisteredLatch.countDown();
193
194          // hold the submission until t2 is registering the nonce
195          t2BeforeNonceRegisteredLatch.await();
196          Threads.sleep(1000);
197
198          if (submitProcedure) {
199            CountDownLatch latch = new CountDownLatch(1);
200            TestSingleStepProcedure proc = new TestSingleStepProcedure();
201            procEnv.setWaitLatch(latch);
202
203            procExecutor.submitProcedure(proc, nonceKey);
204            Threads.sleep(100);
205
206            // complete the procedure
207            latch.countDown();
208          } else {
209            procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey);
210          }
211        } catch (Throwable e) {
212          t1Exception.set(e);
213        } finally {
214          t1NonceRegisteredLatch.countDown();
215          t2BeforeNonceRegisteredLatch.countDown();
216        }
217      }
218    };
219
220    threads[1] = new Thread() {
221      @Override
222      public void run() {
223        try {
224          // wait until t1 has registered the nonce
225          t1NonceRegisteredLatch.await();
226
227          // register the nonce
228          t2BeforeNonceRegisteredLatch.countDown();
229          assertFalse(procExecutor.registerNonce(nonceKey) < 0, "unexpected non registered nonce");
230        } catch (Throwable e) {
231          t2Exception.set(e);
232        } finally {
233          t1NonceRegisteredLatch.countDown();
234          t2BeforeNonceRegisteredLatch.countDown();
235        }
236      }
237    };
238
239    for (int i = 0; i < threads.length; ++i) {
240      threads[i].start();
241    }
242
243    for (int i = 0; i < threads.length; ++i) {
244      Threads.shutdown(threads[i]);
245    }
246
247    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
248    assertNull(t1Exception.get());
249    assertNull(t2Exception.get());
250  }
251
252  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
253    private int step = 0;
254
255    public TestSingleStepProcedure() {
256    }
257
258    @Override
259    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
260      step++;
261      env.waitOnLatch();
262      LOG.debug("execute procedure {} step={}", this, step);
263      step++;
264      setResult(Bytes.toBytes(step));
265      return null;
266    }
267
268    @Override
269    protected void rollback(TestProcEnv env) {
270    }
271
272    @Override
273    protected boolean abort(TestProcEnv env) {
274      return true;
275    }
276  }
277
278  private static class TestProcEnv {
279    private CountDownLatch latch = null;
280
281    /**
282     * set/unset a latch. every procedure execute() step will wait on the latch if any.
283     */
284    public void setWaitLatch(CountDownLatch latch) {
285      this.latch = latch;
286    }
287
288    public void waitOnLatch() throws InterruptedException {
289      if (latch != null) {
290        latch.await();
291      }
292    }
293  }
294}