001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicReference;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.NonceKey;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({MasterTests.class, SmallTests.class})
047public class TestProcedureNonce {
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestProcedureNonce.class);
051
052  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureNonce.class);
053
054  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
055
056  private static TestProcEnv procEnv;
057  private static ProcedureExecutor<TestProcEnv> procExecutor;
058  private static ProcedureStore procStore;
059
060  private HBaseCommonTestingUtility htu;
061  private FileSystem fs;
062  private Path logDir;
063
064  @Before
065  public void setUp() throws IOException {
066    htu = new HBaseCommonTestingUtility();
067    Path testDir = htu.getDataTestDir();
068    fs = testDir.getFileSystem(htu.getConfiguration());
069    assertTrue(testDir.depth() > 1);
070
071    logDir = new Path(testDir, "proc-logs");
072    procEnv = new TestProcEnv();
073    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
074    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
075    procExecutor.testing = new ProcedureExecutor.Testing();
076    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
077    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
078  }
079
080  @After
081  public void tearDown() throws IOException {
082    procExecutor.stop();
083    procStore.stop(false);
084    fs.delete(logDir, true);
085  }
086
087  @Test
088  public void testCompletedProcWithSameNonce() throws Exception {
089    final long nonceGroup = 123;
090    final long nonce = 2222;
091
092    // register the nonce
093    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
094    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
095
096    // Submit a proc and wait for its completion
097    Procedure proc = new TestSingleStepProcedure();
098    long procId = procExecutor.submitProcedure(proc, nonceKey);
099    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
100
101    // Restart
102    ProcedureTestingUtility.restart(procExecutor);
103    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
104
105    // try to register a procedure with the same nonce
106    // we should get back the old procId
107    assertEquals(procId, procExecutor.registerNonce(nonceKey));
108
109    Procedure<?> result = procExecutor.getResult(procId);
110    ProcedureTestingUtility.assertProcNotFailed(result);
111  }
112
113  @Test
114  public void testRunningProcWithSameNonce() throws Exception {
115    final long nonceGroup = 456;
116    final long nonce = 33333;
117
118    // register the nonce
119    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
120    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
121
122    // Submit a proc and use a latch to prevent the step execution until we submitted proc2
123    CountDownLatch latch = new CountDownLatch(1);
124    TestSingleStepProcedure proc = new TestSingleStepProcedure();
125    procEnv.setWaitLatch(latch);
126    long procId = procExecutor.submitProcedure(proc, nonceKey);
127    while (proc.step != 1) {
128      Threads.sleep(25);
129    }
130
131    // try to register a procedure with the same nonce
132    // we should get back the old procId
133    assertEquals(procId, procExecutor.registerNonce(nonceKey));
134
135    // complete the procedure
136    latch.countDown();
137
138    // Restart, the procedure is not completed yet
139    ProcedureTestingUtility.restart(procExecutor);
140    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
141
142    // try to register a procedure with the same nonce
143    // we should get back the old procId
144    assertEquals(procId, procExecutor.registerNonce(nonceKey));
145
146    Procedure<?> result = procExecutor.getResult(procId);
147    ProcedureTestingUtility.assertProcNotFailed(result);
148  }
149
150  @Test
151  public void testSetFailureResultForNonce() throws IOException {
152    final long nonceGroup = 234;
153    final long nonce = 55555;
154
155    // check and register the request nonce
156    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
157    assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
158
159    procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(),
160      new IOException("test failure"));
161
162    final long procId = procExecutor.registerNonce(nonceKey);
163    Procedure<?> result = procExecutor.getResult(procId);
164    ProcedureTestingUtility.assertProcFailed(result);
165  }
166
167  @Test
168  public void testConcurrentNonceRegistration() throws IOException {
169    testConcurrentNonceRegistration(true, 567, 44444);
170  }
171
172  @Test
173  public void testConcurrentNonceRegistrationWithRollback() throws IOException {
174    testConcurrentNonceRegistration(false, 890, 55555);
175  }
176
177  private void testConcurrentNonceRegistration(final boolean submitProcedure,
178      final long nonceGroup, final long nonce) throws IOException {
179    // register the nonce
180    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
181
182    final AtomicReference<Throwable> t1Exception = new AtomicReference();
183    final AtomicReference<Throwable> t2Exception = new AtomicReference();
184
185    final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1);
186    final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1);
187    final Thread[] threads = new Thread[2];
188    threads[0] = new Thread() {
189      @Override
190      public void run() {
191        try {
192          // release the nonce and wake t2
193          assertFalse("unexpected already registered nonce",
194            procExecutor.registerNonce(nonceKey) >= 0);
195          t1NonceRegisteredLatch.countDown();
196
197          // hold the submission until t2 is registering the nonce
198          t2BeforeNonceRegisteredLatch.await();
199          Threads.sleep(1000);
200
201          if (submitProcedure) {
202            CountDownLatch latch = new CountDownLatch(1);
203            TestSingleStepProcedure proc = new TestSingleStepProcedure();
204            procEnv.setWaitLatch(latch);
205
206            procExecutor.submitProcedure(proc, nonceKey);
207            Threads.sleep(100);
208
209            // complete the procedure
210            latch.countDown();
211          } else {
212            procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey);
213          }
214        } catch (Throwable e) {
215          t1Exception.set(e);
216        } finally {
217          t1NonceRegisteredLatch.countDown();
218          t2BeforeNonceRegisteredLatch.countDown();
219        }
220      }
221    };
222
223    threads[1] = new Thread() {
224      @Override
225      public void run() {
226        try {
227          // wait until t1 has registered the nonce
228          t1NonceRegisteredLatch.await();
229
230          // register the nonce
231          t2BeforeNonceRegisteredLatch.countDown();
232          assertFalse("unexpected non registered nonce",
233            procExecutor.registerNonce(nonceKey) < 0);
234        } catch (Throwable e) {
235          t2Exception.set(e);
236        } finally {
237          t1NonceRegisteredLatch.countDown();
238          t2BeforeNonceRegisteredLatch.countDown();
239        }
240      }
241    };
242
243    for (int i = 0; i < threads.length; ++i) {
244      threads[i].start();
245    }
246
247    for (int i = 0; i < threads.length; ++i) {
248      Threads.shutdown(threads[i]);
249    }
250
251    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
252    assertEquals(null, t1Exception.get());
253    assertEquals(null, t2Exception.get());
254  }
255
256  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
257    private int step = 0;
258
259    public TestSingleStepProcedure() { }
260
261    @Override
262    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
263      step++;
264      env.waitOnLatch();
265      LOG.debug("execute procedure " + this + " step=" + step);
266      step++;
267      setResult(Bytes.toBytes(step));
268      return null;
269    }
270
271    @Override
272    protected void rollback(TestProcEnv env) { }
273
274    @Override
275    protected boolean abort(TestProcEnv env) {
276      return true;
277    }
278  }
279
280  private static class TestProcEnv {
281    private CountDownLatch latch = null;
282
283    /**
284     * set/unset a latch. every procedure execute() step will wait on the latch if any.
285     */
286    public void setWaitLatch(CountDownLatch latch) {
287      this.latch = latch;
288    }
289
290    public void waitOnLatch() throws InterruptedException {
291      if (latch != null) {
292        latch.await();
293      }
294    }
295  }
296}