001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.ScheduledChore;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.testclassification.RegionServerTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
036import org.apache.hadoop.hbase.util.Threads;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.mockito.Mockito;
040import org.mockito.invocation.InvocationOnMock;
041import org.mockito.stubbing.Answer;
042
043@Tag(RegionServerTests.TAG)
044@Tag(SmallTests.TAG)
045public class TestServerNonceManager {
046
047  @Test
048  public void testMvcc() throws Exception {
049    ServerNonceManager nm = createManager();
050    final long group = 100;
051    final long nonce = 1;
052    final long initMvcc = 999;
053    assertTrue(nm.startOperation(group, nonce, createStoppable()));
054    nm.addMvccToOperationContext(group, nonce, initMvcc);
055    nm.endOperation(group, nonce, true);
056    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
057    long newMvcc = initMvcc + 1;
058    for (long newNonce = nonce + 1; newNonce != (nonce + 5); ++newNonce) {
059      assertTrue(nm.startOperation(group, newNonce, createStoppable()));
060      nm.addMvccToOperationContext(group, newNonce, newMvcc);
061      nm.endOperation(group, newNonce, true);
062      assertEquals(newMvcc, nm.getMvccFromOperationContext(group, newNonce));
063      ++newMvcc;
064    }
065    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
066  }
067
068  @Test
069  public void testNormalStartEnd() throws Exception {
070    final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
071    ServerNonceManager nm = createManager();
072    for (int i = 0; i < numbers.length; ++i) {
073      for (int j = 0; j < numbers.length; ++j) {
074        assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
075      }
076    }
077    // Should be able to start operation the second time w/o nonces.
078    for (int i = 0; i < numbers.length; ++i) {
079      assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable()));
080    }
081    // Fail all operations - should be able to restart.
082    for (int i = 0; i < numbers.length; ++i) {
083      for (int j = 0; j < numbers.length; ++j) {
084        nm.endOperation(numbers[i], numbers[j], false);
085        assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable()));
086      }
087    }
088    // Succeed all operations - should not be able to restart, except for NO_NONCE.
089    for (int i = 0; i < numbers.length; ++i) {
090      for (int j = 0; j < numbers.length; ++j) {
091        nm.endOperation(numbers[i], numbers[j], true);
092        assertEquals(numbers[j] == NO_NONCE,
093          nm.startOperation(numbers[i], numbers[j], createStoppable()));
094      }
095    }
096  }
097
098  @Test
099  public void testNoEndWithoutStart() {
100    ServerNonceManager nm = createManager();
101    try {
102      nm.endOperation(NO_NONCE, 1, true);
103      throw new Error("Should have thrown");
104    } catch (AssertionError err) {
105    }
106  }
107
108  @Test
109  public void testCleanup() throws Exception {
110    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
111    EnvironmentEdgeManager.injectEdge(edge);
112    try {
113      ServerNonceManager nm = createManager(6);
114      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
115      edge.setValue(1);
116      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
117      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
118      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
119      edge.setValue(2);
120      nm.endOperation(NO_NONCE, 1, true);
121      edge.setValue(4);
122      nm.endOperation(NO_NONCE, 2, true);
123      edge.setValue(9);
124      cleanup.choreForTesting();
125      // Nonce 1 has been cleaned up.
126      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
127      // Nonce 2 has not been cleaned up.
128      assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
129      // Nonce 3 was active and active ops should never be cleaned up; try to end and start.
130      nm.endOperation(NO_NONCE, 3, false);
131      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
132      edge.setValue(11);
133      cleanup.choreForTesting();
134      // Now, nonce 2 has been cleaned up.
135      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
136    } finally {
137      EnvironmentEdgeManager.reset();
138    }
139  }
140
141  @Test
142  public void testWalNonces() throws Exception {
143    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
144    EnvironmentEdgeManager.injectEdge(edge);
145    try {
146      ServerNonceManager nm = createManager(6);
147      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
148      // Add nonces from WAL, including dups.
149      edge.setValue(12);
150      nm.reportOperationFromWal(NO_NONCE, 1, 8);
151      nm.reportOperationFromWal(NO_NONCE, 2, 2);
152      nm.reportOperationFromWal(NO_NONCE, 3, 5);
153      nm.reportOperationFromWal(NO_NONCE, 3, 6);
154      // WAL nonces should prevent cross-server conflicts.
155      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
156      // Make sure we ignore very old nonces, but not borderline old nonces.
157      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
158      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
159      // Make sure grace period is counted from recovery time.
160      edge.setValue(17);
161      cleanup.choreForTesting();
162      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
163      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
164      edge.setValue(19);
165      cleanup.choreForTesting();
166      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
167      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
168    } finally {
169      EnvironmentEdgeManager.reset();
170    }
171  }
172
173  @Test
174  public void testConcurrentAttempts() throws Exception {
175    final ServerNonceManager nm = createManager();
176
177    nm.startOperation(NO_NONCE, 1, createStoppable());
178    TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
179    Thread t = tr.start();
180    waitForThreadToBlockOrExit(t);
181    nm.endOperation(NO_NONCE, 1, true); // operation succeeded
182    t.join(); // thread must now unblock and not proceed (result checked inside).
183    tr.propagateError();
184
185    nm.startOperation(NO_NONCE, 2, createStoppable());
186    tr = new TestRunnable(nm, 2, true, createStoppable());
187    t = tr.start();
188    waitForThreadToBlockOrExit(t);
189    nm.endOperation(NO_NONCE, 2, false);
190    t.join(); // thread must now unblock and allow us to proceed (result checked inside).
191    tr.propagateError();
192    nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation
193
194    nm.startOperation(NO_NONCE, 3, createStoppable());
195    tr = new TestRunnable(nm, 4, true, createStoppable());
196    tr.start().join(); // nonce 3 must have no bearing on nonce 4
197    tr.propagateError();
198  }
199
200  @Test
201  public void testStopWaiting() throws Exception {
202    final ServerNonceManager nm = createManager();
203    nm.setConflictWaitIterationMs(1);
204    Stoppable stoppingStoppable = createStoppable();
205    Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
206      AtomicInteger answer = new AtomicInteger(3);
207
208      @Override
209      public Boolean answer(InvocationOnMock invocation) throws Throwable {
210        return 0 < answer.decrementAndGet();
211      }
212    });
213
214    nm.startOperation(NO_NONCE, 1, createStoppable());
215    TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
216    Thread t = tr.start();
217    waitForThreadToBlockOrExit(t);
218    // thread must eventually throw
219    t.join();
220    tr.propagateError();
221  }
222
223  private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
224    for (int i = 9; i >= 0; --i) {
225      if (
226        t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING
227          || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED
228      ) {
229        return;
230      }
231      if (i > 0) Thread.sleep(300);
232    }
233    // Thread didn't block in 3 seconds. What is it doing? Continue the test, we'd rather
234    // have a very strange false positive then false negative due to timing.
235  }
236
237  private static class TestRunnable implements Runnable {
238    public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown!
239
240    private final ServerNonceManager nm;
241    private final long nonce;
242    private final Boolean expected;
243    private final Stoppable stoppable;
244
245    private Throwable throwable = null;
246
247    public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
248      this.nm = nm;
249      this.nonce = nonce;
250      this.expected = expected;
251      this.stoppable = stoppable;
252    }
253
254    public void propagateError() throws Exception {
255      if (throwable == null) return;
256      throw new Exception(throwable);
257    }
258
259    public Thread start() {
260      Thread t = new Thread(this);
261      t = Threads.setDaemonThreadRunning(t);
262      try {
263        startedLatch.await();
264      } catch (InterruptedException e) {
265        fail("Unexpected");
266      }
267      return t;
268    }
269
270    @Override
271    public void run() {
272      startedLatch.countDown();
273      boolean shouldThrow = expected == null;
274      boolean hasThrown = true;
275      try {
276        boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
277        hasThrown = false;
278        if (!shouldThrow) {
279          assertEquals(expected.booleanValue(), result);
280        }
281      } catch (Throwable t) {
282        if (!shouldThrow) {
283          throwable = t;
284        }
285      }
286      if (shouldThrow && !hasThrown) {
287        throwable = new AssertionError("Should have thrown");
288      }
289    }
290  }
291
292  private Stoppable createStoppable() {
293    Stoppable s = Mockito.mock(Stoppable.class);
294    Mockito.when(s.isStopped()).thenReturn(false);
295    return s;
296  }
297
298  private ServerNonceManager createManager() {
299    return createManager(null);
300  }
301
302  private ServerNonceManager createManager(Integer gracePeriod) {
303    Configuration conf = HBaseConfiguration.create();
304    if (gracePeriod != null) {
305      conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
306    }
307    return new ServerNonceManager(conf);
308  }
309}