001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.ScheduledChore;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.testclassification.RegionServerTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.mockito.Mockito;
042import org.mockito.invocation.InvocationOnMock;
043import org.mockito.stubbing.Answer;
044
045@Category({ RegionServerTests.class, SmallTests.class })
046public class TestServerNonceManager {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestServerNonceManager.class);
051
052  @Test
053  public void testMvcc() throws Exception {
054    ServerNonceManager nm = createManager();
055    final long group = 100;
056    final long nonce = 1;
057    final long initMvcc = 999;
058    assertTrue(nm.startOperation(group, nonce, createStoppable()));
059    nm.addMvccToOperationContext(group, nonce, initMvcc);
060    nm.endOperation(group, nonce, true);
061    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
062    long newMvcc = initMvcc + 1;
063    for (long newNonce = nonce + 1; newNonce != (nonce + 5); ++newNonce) {
064      assertTrue(nm.startOperation(group, newNonce, createStoppable()));
065      nm.addMvccToOperationContext(group, newNonce, newMvcc);
066      nm.endOperation(group, newNonce, true);
067      assertEquals(newMvcc, nm.getMvccFromOperationContext(group, newNonce));
068      ++newMvcc;
069    }
070    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
071  }
072
073  @Test
074  public void testNormalStartEnd() throws Exception {
075    final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
076    ServerNonceManager nm = createManager();
077    for (int i = 0; i < numbers.length; ++i) {
078      for (int j = 0; j < numbers.length; ++j) {
079        assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
080      }
081    }
082    // Should be able to start operation the second time w/o nonces.
083    for (int i = 0; i < numbers.length; ++i) {
084      assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable()));
085    }
086    // Fail all operations - should be able to restart.
087    for (int i = 0; i < numbers.length; ++i) {
088      for (int j = 0; j < numbers.length; ++j) {
089        nm.endOperation(numbers[i], numbers[j], false);
090        assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
091      }
092    }
093    // Succeed all operations - should not be able to restart, except for NO_NONCE.
094    for (int i = 0; i < numbers.length; ++i) {
095      for (int j = 0; j < numbers.length; ++j) {
096        nm.endOperation(numbers[i], numbers[j], true);
097        assertEquals(numbers[j] == NO_NONCE,
098          nm.startOperation(numbers[i], numbers[j], createStoppable()));
099      }
100    }
101  }
102
103  @Test
104  public void testNoEndWithoutStart() {
105    ServerNonceManager nm = createManager();
106    try {
107      nm.endOperation(NO_NONCE, 1, true);
108      throw new Error("Should have thrown");
109    } catch (AssertionError err) {
110    }
111  }
112
113  @Test
114  public void testCleanup() throws Exception {
115    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
116    EnvironmentEdgeManager.injectEdge(edge);
117    try {
118      ServerNonceManager nm = createManager(6);
119      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
120      edge.setValue(1);
121      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
122      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
123      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
124      edge.setValue(2);
125      nm.endOperation(NO_NONCE, 1, true);
126      edge.setValue(4);
127      nm.endOperation(NO_NONCE, 2, true);
128      edge.setValue(9);
129      cleanup.choreForTesting();
130      // Nonce 1 has been cleaned up.
131      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
132      // Nonce 2 has not been cleaned up.
133      assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
134      // Nonce 3 was active and active ops should never be cleaned up; try to end and start.
135      nm.endOperation(NO_NONCE, 3, false);
136      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
137      edge.setValue(11);
138      cleanup.choreForTesting();
139      // Now, nonce 2 has been cleaned up.
140      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
141    } finally {
142      EnvironmentEdgeManager.reset();
143    }
144  }
145
146  @Test
147  public void testWalNonces() throws Exception {
148    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
149    EnvironmentEdgeManager.injectEdge(edge);
150    try {
151      ServerNonceManager nm = createManager(6);
152      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
153      // Add nonces from WAL, including dups.
154      edge.setValue(12);
155      nm.reportOperationFromWal(NO_NONCE, 1, 8);
156      nm.reportOperationFromWal(NO_NONCE, 2, 2);
157      nm.reportOperationFromWal(NO_NONCE, 3, 5);
158      nm.reportOperationFromWal(NO_NONCE, 3, 6);
159      // WAL nonces should prevent cross-server conflicts.
160      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
161      // Make sure we ignore very old nonces, but not borderline old nonces.
162      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
163      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
164      // Make sure grace period is counted from recovery time.
165      edge.setValue(17);
166      cleanup.choreForTesting();
167      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
168      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
169      edge.setValue(19);
170      cleanup.choreForTesting();
171      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
172      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
173    } finally {
174      EnvironmentEdgeManager.reset();
175    }
176  }
177
178  @Test
179  public void testConcurrentAttempts() throws Exception {
180    final ServerNonceManager nm = createManager();
181
182    nm.startOperation(NO_NONCE, 1, createStoppable());
183    TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
184    Thread t = tr.start();
185    waitForThreadToBlockOrExit(t);
186    nm.endOperation(NO_NONCE, 1, true); // operation succeeded
187    t.join(); // thread must now unblock and not proceed (result checked inside).
188    tr.propagateError();
189
190    nm.startOperation(NO_NONCE, 2, createStoppable());
191    tr = new TestRunnable(nm, 2, true, createStoppable());
192    t = tr.start();
193    waitForThreadToBlockOrExit(t);
194    nm.endOperation(NO_NONCE, 2, false);
195    t.join(); // thread must now unblock and allow us to proceed (result checked inside).
196    tr.propagateError();
197    nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation
198
199    nm.startOperation(NO_NONCE, 3, createStoppable());
200    tr = new TestRunnable(nm, 4, true, createStoppable());
201    tr.start().join(); // nonce 3 must have no bearing on nonce 4
202    tr.propagateError();
203  }
204
205  @Test
206  public void testStopWaiting() throws Exception {
207    final ServerNonceManager nm = createManager();
208    nm.setConflictWaitIterationMs(1);
209    Stoppable stoppingStoppable = createStoppable();
210    Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
211      AtomicInteger answer = new AtomicInteger(3);
212
213      @Override
214      public Boolean answer(InvocationOnMock invocation) throws Throwable {
215        return 0 < answer.decrementAndGet();
216      }
217    });
218
219    nm.startOperation(NO_NONCE, 1, createStoppable());
220    TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
221    Thread t = tr.start();
222    waitForThreadToBlockOrExit(t);
223    // thread must eventually throw
224    t.join();
225    tr.propagateError();
226  }
227
228  private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
229    for (int i = 9; i >= 0; --i) {
230      if (
231        t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING
232          || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED
233      ) {
234        return;
235      }
236      if (i > 0) Thread.sleep(300);
237    }
238    // Thread didn't block in 3 seconds. What is it doing? Continue the test, we'd rather
239    // have a very strange false positive then false negative due to timing.
240  }
241
242  private static class TestRunnable implements Runnable {
243    public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown!
244
245    private final ServerNonceManager nm;
246    private final long nonce;
247    private final Boolean expected;
248    private final Stoppable stoppable;
249
250    private Throwable throwable = null;
251
252    public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
253      this.nm = nm;
254      this.nonce = nonce;
255      this.expected = expected;
256      this.stoppable = stoppable;
257    }
258
259    public void propagateError() throws Exception {
260      if (throwable == null) return;
261      throw new Exception(throwable);
262    }
263
264    public Thread start() {
265      Thread t = new Thread(this);
266      t = Threads.setDaemonThreadRunning(t);
267      try {
268        startedLatch.await();
269      } catch (InterruptedException e) {
270        fail("Unexpected");
271      }
272      return t;
273    }
274
275    @Override
276    public void run() {
277      startedLatch.countDown();
278      boolean shouldThrow = expected == null;
279      boolean hasThrown = true;
280      try {
281        boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
282        hasThrown = false;
283        if (!shouldThrow) {
284          assertEquals(expected.booleanValue(), result);
285        }
286      } catch (Throwable t) {
287        if (!shouldThrow) {
288          throwable = t;
289        }
290      }
291      if (shouldThrow && !hasThrown) {
292        throwable = new AssertionError("Should have thrown");
293      }
294    }
295  }
296
297  private Stoppable createStoppable() {
298    Stoppable s = Mockito.mock(Stoppable.class);
299    Mockito.when(s.isStopped()).thenReturn(false);
300    return s;
301  }
302
303  private ServerNonceManager createManager() {
304    return createManager(null);
305  }
306
307  private ServerNonceManager createManager(Integer gracePeriod) {
308    Configuration conf = HBaseConfiguration.create();
309    if (gracePeriod != null) {
310      conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
311    }
312    return new ServerNonceManager(conf);
313  }
314}