001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicReference;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.NonceKey;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({MasterTests.class, SmallTests.class})
047public class TestProcedureNonce {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestProcedureNonce.class);
052
053  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureNonce.class);
054
055  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
056
057  private static TestProcEnv procEnv;
058  private static ProcedureExecutor<TestProcEnv> procExecutor;
059  private static ProcedureStore procStore;
060
061  private HBaseCommonTestingUtility htu;
062  private FileSystem fs;
063  private Path logDir;
064
065  @Before
066  public void setUp() throws IOException {
067    htu = new HBaseCommonTestingUtility();
068    Path testDir = htu.getDataTestDir();
069    fs = testDir.getFileSystem(htu.getConfiguration());
070    assertTrue(testDir.depth() > 1);
071
072    logDir = new Path(testDir, "proc-logs");
073    procEnv = new TestProcEnv();
074    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
075    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
076    procExecutor.testing = new ProcedureExecutor.Testing();
077    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
078    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
079  }
080
081  @After
082  public void tearDown() throws IOException {
083    procExecutor.stop();
084    procStore.stop(false);
085    fs.delete(logDir, true);
086  }
087
088  @Test
089  public void testCompletedProcWithSameNonce() throws Exception {
090    final long nonceGroup = 123;
091    final long nonce = 2222;
092
093    // register the nonce
094    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
095    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
096
097    // Submit a proc and wait for its completion
098    Procedure proc = new TestSingleStepProcedure();
099    long procId = procExecutor.submitProcedure(proc, nonceKey);
100    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
101
102    // Restart
103    ProcedureTestingUtility.restart(procExecutor);
104    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
105
106    // try to register a procedure with the same nonce
107    // we should get back the old procId
108    assertEquals(procId, procExecutor.registerNonce(nonceKey));
109
110    Procedure<?> result = procExecutor.getResult(procId);
111    ProcedureTestingUtility.assertProcNotFailed(result);
112  }
113
114  @Test
115  public void testRunningProcWithSameNonce() throws Exception {
116    final long nonceGroup = 456;
117    final long nonce = 33333;
118
119    // register the nonce
120    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
121    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
122
123    // Submit a proc and use a latch to prevent the step execution until we submitted proc2
124    CountDownLatch latch = new CountDownLatch(1);
125    TestSingleStepProcedure proc = new TestSingleStepProcedure();
126    procEnv.setWaitLatch(latch);
127    long procId = procExecutor.submitProcedure(proc, nonceKey);
128    while (proc.step != 1) Threads.sleep(25);
129
130    // try to register a procedure with the same nonce
131    // we should get back the old procId
132    assertEquals(procId, procExecutor.registerNonce(nonceKey));
133
134    // complete the procedure
135    latch.countDown();
136
137    // Restart, the procedure is not completed yet
138    ProcedureTestingUtility.restart(procExecutor);
139    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
140
141    // try to register a procedure with the same nonce
142    // we should get back the old procId
143    assertEquals(procId, procExecutor.registerNonce(nonceKey));
144
145    Procedure<?> result = procExecutor.getResult(procId);
146    ProcedureTestingUtility.assertProcNotFailed(result);
147  }
148
149  @Test
150  public void testSetFailureResultForNonce() throws IOException {
151    final long nonceGroup = 234;
152    final long nonce = 55555;
153
154    // check and register the request nonce
155    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
156    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
157
158    procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(),
159      new IOException("test failure"));
160
161    final long procId = procExecutor.registerNonce(nonceKey);
162    Procedure<?> result = procExecutor.getResult(procId);
163    ProcedureTestingUtility.assertProcFailed(result);
164  }
165
166  @Test
167  public void testConcurrentNonceRegistration() throws IOException {
168    testConcurrentNonceRegistration(true, 567, 44444);
169  }
170
171  @Test
172  public void testConcurrentNonceRegistrationWithRollback() throws IOException {
173    testConcurrentNonceRegistration(false, 890, 55555);
174  }
175
176  private void testConcurrentNonceRegistration(final boolean submitProcedure,
177      final long nonceGroup, final long nonce) throws IOException {
178    // register the nonce
179    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
180
181    final AtomicReference<Throwable> t1Exception = new AtomicReference();
182    final AtomicReference<Throwable> t2Exception = new AtomicReference();
183
184    final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1);
185    final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1);
186    final Thread[] threads = new Thread[2];
187    threads[0] = new Thread() {
188      @Override
189      public void run() {
190        try {
191          // release the nonce and wake t2
192          assertFalse("unexpected already registered nonce",
193            procExecutor.registerNonce(nonceKey) >= 0);
194          t1NonceRegisteredLatch.countDown();
195
196          // hold the submission until t2 is registering the nonce
197          t2BeforeNonceRegisteredLatch.await();
198          Threads.sleep(1000);
199
200          if (submitProcedure) {
201            CountDownLatch latch = new CountDownLatch(1);
202            TestSingleStepProcedure proc = new TestSingleStepProcedure();
203            procEnv.setWaitLatch(latch);
204
205            procExecutor.submitProcedure(proc, nonceKey);
206            Threads.sleep(100);
207
208            // complete the procedure
209            latch.countDown();
210          } else {
211            procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey);
212          }
213        } catch (Throwable e) {
214          t1Exception.set(e);
215        } finally {
216          t1NonceRegisteredLatch.countDown();
217          t2BeforeNonceRegisteredLatch.countDown();
218        }
219      }
220    };
221
222    threads[1] = new Thread() {
223      @Override
224      public void run() {
225        try {
226          // wait until t1 has registered the nonce
227          t1NonceRegisteredLatch.await();
228
229          // register the nonce
230          t2BeforeNonceRegisteredLatch.countDown();
231          assertFalse("unexpected non registered nonce",
232            procExecutor.registerNonce(nonceKey) < 0);
233        } catch (Throwable e) {
234          t2Exception.set(e);
235        } finally {
236          t1NonceRegisteredLatch.countDown();
237          t2BeforeNonceRegisteredLatch.countDown();
238        }
239      }
240    };
241
242    for (int i = 0; i < threads.length; ++i) threads[i].start();
243    for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]);
244    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
245    assertEquals(null, t1Exception.get());
246    assertEquals(null, t2Exception.get());
247  }
248
249  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
250    private int step = 0;
251
252    public TestSingleStepProcedure() { }
253
254    @Override
255    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
256      step++;
257      env.waitOnLatch();
258      LOG.debug("execute procedure " + this + " step=" + step);
259      step++;
260      setResult(Bytes.toBytes(step));
261      return null;
262    }
263
264    @Override
265    protected void rollback(TestProcEnv env) { }
266
267    @Override
268    protected boolean abort(TestProcEnv env) { return true; }
269  }
270
271  private static class TestProcEnv {
272    private CountDownLatch latch = null;
273
274    /**
275     * set/unset a latch. every procedure execute() step will wait on the latch if any.
276     */
277    public void setWaitLatch(CountDownLatch latch) {
278      this.latch = latch;
279    }
280
281    public void waitOnLatch() throws InterruptedException {
282      if (latch != null) {
283        latch.await();
284      }
285    }
286  }
287}