001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.ScheduledChore;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.testclassification.RegionServerTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.mockito.Mockito;
042import org.mockito.invocation.InvocationOnMock;
043import org.mockito.stubbing.Answer;
044
045@Category({RegionServerTests.class, SmallTests.class})
046public class TestServerNonceManager {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestServerNonceManager.class);
051
052  @Test
053  public void testMvcc() throws Exception {
054    ServerNonceManager nm = createManager();
055    final long group = 100;
056    final long nonce = 1;
057    final long initMvcc = 999;
058    assertTrue(nm.startOperation(group, nonce, createStoppable()));
059    nm.addMvccToOperationContext(group, nonce, initMvcc);
060    nm.endOperation(group, nonce, true);
061    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
062    long newMvcc = initMvcc + 1;
063    for (long newNonce = nonce + 1; newNonce != (nonce + 5); ++newNonce) {
064      assertTrue(nm.startOperation(group, newNonce, createStoppable()));
065      nm.addMvccToOperationContext(group, newNonce, newMvcc);
066      nm.endOperation(group, newNonce, true);
067      assertEquals(newMvcc, nm.getMvccFromOperationContext(group, newNonce));
068      ++newMvcc;
069    }
070    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
071  }
072
073  @Test
074  public void testNormalStartEnd() throws Exception {
075    final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
076    ServerNonceManager nm = createManager();
077    for (int i = 0; i < numbers.length; ++i) {
078      for (int j = 0; j < numbers.length; ++j) {
079        assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
080      }
081    }
082    // Should be able to start operation the second time w/o nonces.
083    for (int i = 0; i < numbers.length; ++i) {
084      assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable()));
085    }
086    // Fail all operations - should be able to restart.
087    for (int i = 0; i < numbers.length; ++i) {
088      for (int j = 0; j < numbers.length; ++j) {
089        nm.endOperation(numbers[i], numbers[j], false);
090        assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
091      }
092    }
093    // Succeed all operations - should not be able to restart, except for NO_NONCE.
094    for (int i = 0; i < numbers.length; ++i) {
095      for (int j = 0; j < numbers.length; ++j) {
096        nm.endOperation(numbers[i], numbers[j], true);
097        assertEquals(numbers[j] == NO_NONCE,
098            nm.startOperation(numbers[i], numbers[j], createStoppable()));
099      }
100    }
101  }
102
103  @Test
104  public void testNoEndWithoutStart() {
105    ServerNonceManager nm = createManager();
106    try {
107      nm.endOperation(NO_NONCE, 1, true);
108      throw new Error("Should have thrown");
109    } catch (AssertionError err) {}
110  }
111
112  @Test
113  public void testCleanup() throws Exception {
114    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
115    EnvironmentEdgeManager.injectEdge(edge);
116    try {
117      ServerNonceManager nm = createManager(6);
118      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
119      edge.setValue(1);
120      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
121      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
122      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
123      edge.setValue(2);
124      nm.endOperation(NO_NONCE, 1, true);
125      edge.setValue(4);
126      nm.endOperation(NO_NONCE, 2, true);
127      edge.setValue(9);
128      cleanup.choreForTesting();
129      // Nonce 1 has been cleaned up.
130      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
131      // Nonce 2 has not been cleaned up.
132      assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
133      // Nonce 3 was active and active ops should never be cleaned up; try to end and start.
134      nm.endOperation(NO_NONCE, 3, false);
135      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
136      edge.setValue(11);
137      cleanup.choreForTesting();
138      // Now, nonce 2 has been cleaned up.
139      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
140    } finally {
141      EnvironmentEdgeManager.reset();
142    }
143  }
144
145  @Test
146  public void testWalNonces() throws Exception {
147    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
148    EnvironmentEdgeManager.injectEdge(edge);
149    try {
150      ServerNonceManager nm = createManager(6);
151      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
152      // Add nonces from WAL, including dups.
153      edge.setValue(12);
154      nm.reportOperationFromWal(NO_NONCE, 1, 8);
155      nm.reportOperationFromWal(NO_NONCE, 2, 2);
156      nm.reportOperationFromWal(NO_NONCE, 3, 5);
157      nm.reportOperationFromWal(NO_NONCE, 3, 6);
158      // WAL nonces should prevent cross-server conflicts.
159      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
160      // Make sure we ignore very old nonces, but not borderline old nonces.
161      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
162      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
163      // Make sure grace period is counted from recovery time.
164      edge.setValue(17);
165      cleanup.choreForTesting();
166      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
167      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
168      edge.setValue(19);
169      cleanup.choreForTesting();
170      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
171      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
172    } finally {
173      EnvironmentEdgeManager.reset();
174    }
175  }
176
177  @Test
178  public void testConcurrentAttempts() throws Exception {
179    final ServerNonceManager nm = createManager();
180
181    nm.startOperation(NO_NONCE, 1, createStoppable());
182    TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
183    Thread t = tr.start();
184    waitForThreadToBlockOrExit(t);
185    nm.endOperation(NO_NONCE, 1, true); // operation succeeded
186    t.join(); // thread must now unblock and not proceed (result checked inside).
187    tr.propagateError();
188
189    nm.startOperation(NO_NONCE, 2, createStoppable());
190    tr = new TestRunnable(nm, 2, true, createStoppable());
191    t = tr.start();
192    waitForThreadToBlockOrExit(t);
193    nm.endOperation(NO_NONCE, 2, false);
194    t.join(); // thread must now unblock and allow us to proceed (result checked inside).
195    tr.propagateError();
196    nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation
197
198    nm.startOperation(NO_NONCE, 3, createStoppable());
199    tr = new TestRunnable(nm, 4, true, createStoppable());
200    tr.start().join();  // nonce 3 must have no bearing on nonce 4
201    tr.propagateError();
202  }
203
204  @Test
205  public void testStopWaiting() throws Exception {
206    final ServerNonceManager nm = createManager();
207    nm.setConflictWaitIterationMs(1);
208    Stoppable stoppingStoppable = createStoppable();
209    Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
210      AtomicInteger answer = new AtomicInteger(3);
211      @Override
212      public Boolean answer(InvocationOnMock invocation) throws Throwable {
213        return 0 < answer.decrementAndGet();
214      }
215    });
216
217    nm.startOperation(NO_NONCE, 1, createStoppable());
218    TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
219    Thread t = tr.start();
220    waitForThreadToBlockOrExit(t);
221    // thread must eventually throw
222    t.join();
223    tr.propagateError();
224  }
225
226  private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
227    for (int i = 9; i >= 0; --i) {
228      if (t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING
229          || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED) {
230        return;
231      }
232      if (i > 0) Thread.sleep(300);
233    }
234    // Thread didn't block in 3 seconds. What is it doing? Continue the test, we'd rather
235    // have a very strange false positive then false negative due to timing.
236  }
237
238  private static class TestRunnable implements Runnable {
239    public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown!
240
241    private final ServerNonceManager nm;
242    private final long nonce;
243    private final Boolean expected;
244    private final Stoppable stoppable;
245
246    private Throwable throwable = null;
247
248    public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
249      this.nm = nm;
250      this.nonce = nonce;
251      this.expected = expected;
252      this.stoppable = stoppable;
253    }
254
255    public void propagateError() throws Exception {
256      if (throwable == null) return;
257      throw new Exception(throwable);
258    }
259
260    public Thread start() {
261      Thread t = new Thread(this);
262      t = Threads.setDaemonThreadRunning(t);
263      try {
264        startedLatch.await();
265      } catch (InterruptedException e) {
266        fail("Unexpected");
267      }
268      return t;
269    }
270
271    @Override
272    public void run() {
273      startedLatch.countDown();
274      boolean shouldThrow = expected == null;
275      boolean hasThrown = true;
276      try {
277        boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
278        hasThrown = false;
279        if (!shouldThrow) {
280          assertEquals(expected.booleanValue(), result);
281        }
282      } catch (Throwable t) {
283        if (!shouldThrow) {
284          throwable = t;
285        }
286      }
287      if (shouldThrow && !hasThrown) {
288        throwable = new AssertionError("Should have thrown");
289      }
290    }
291  }
292
293  private Stoppable createStoppable() {
294    Stoppable s = Mockito.mock(Stoppable.class);
295    Mockito.when(s.isStopped()).thenReturn(false);
296    return s;
297  }
298
299  private ServerNonceManager createManager() {
300    return createManager(null);
301  }
302
303  private ServerNonceManager createManager(Integer gracePeriod) {
304    Configuration conf = HBaseConfiguration.create();
305    if (gracePeriod != null) {
306      conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
307    }
308    return new ServerNonceManager(conf);
309  }
310}