001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.NO_NONCE; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.ScheduledChore; 031import org.apache.hadoop.hbase.Stoppable; 032import org.apache.hadoop.hbase.testclassification.RegionServerTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 036import org.apache.hadoop.hbase.util.Threads; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.mockito.Mockito; 040import org.mockito.invocation.InvocationOnMock; 041import org.mockito.stubbing.Answer; 042 043@Tag(RegionServerTests.TAG) 044@Tag(SmallTests.TAG) 045public class TestServerNonceManager { 046 047 @Test 048 public void testMvcc() throws Exception { 049 ServerNonceManager nm = createManager(); 050 final long group = 100; 051 final long nonce = 1; 052 final long initMvcc = 999; 053 assertTrue(nm.startOperation(group, nonce, createStoppable())); 054 nm.addMvccToOperationContext(group, nonce, initMvcc); 055 nm.endOperation(group, nonce, true); 056 assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce)); 057 long newMvcc = initMvcc + 1; 058 for (long newNonce = nonce + 1; newNonce != (nonce + 5); ++newNonce) { 059 assertTrue(nm.startOperation(group, newNonce, createStoppable())); 060 nm.addMvccToOperationContext(group, newNonce, newMvcc); 061 nm.endOperation(group, newNonce, true); 062 assertEquals(newMvcc, nm.getMvccFromOperationContext(group, newNonce)); 063 ++newMvcc; 064 } 065 assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce)); 066 } 067 068 @Test 069 public void testNormalStartEnd() throws Exception { 070 final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE }; 071 ServerNonceManager nm = createManager(); 072 for (int i = 0; i < numbers.length; ++i) { 073 for (int j = 0; j < numbers.length; ++j) { 074 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable())); 075 } 076 } 077 // Should be able to start operation the second time w/o nonces. 078 for (int i = 0; i < numbers.length; ++i) { 079 assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable())); 080 } 081 // Fail all operations - should be able to restart. 082 for (int i = 0; i < numbers.length; ++i) { 083 for (int j = 0; j < numbers.length; ++j) { 084 nm.endOperation(numbers[i], numbers[j], false); 085 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable())); 086 } 087 } 088 // Succeed all operations - should not be able to restart, except for NO_NONCE. 089 for (int i = 0; i < numbers.length; ++i) { 090 for (int j = 0; j < numbers.length; ++j) { 091 nm.endOperation(numbers[i], numbers[j], true); 092 assertEquals(numbers[j] == NO_NONCE, 093 nm.startOperation(numbers[i], numbers[j], createStoppable())); 094 } 095 } 096 } 097 098 @Test 099 public void testNoEndWithoutStart() { 100 ServerNonceManager nm = createManager(); 101 try { 102 nm.endOperation(NO_NONCE, 1, true); 103 throw new Error("Should have thrown"); 104 } catch (AssertionError err) { 105 } 106 } 107 108 @Test 109 public void testCleanup() throws Exception { 110 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 111 EnvironmentEdgeManager.injectEdge(edge); 112 try { 113 ServerNonceManager nm = createManager(6); 114 ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class)); 115 edge.setValue(1); 116 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); 117 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); 118 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable())); 119 edge.setValue(2); 120 nm.endOperation(NO_NONCE, 1, true); 121 edge.setValue(4); 122 nm.endOperation(NO_NONCE, 2, true); 123 edge.setValue(9); 124 cleanup.choreForTesting(); 125 // Nonce 1 has been cleaned up. 126 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); 127 // Nonce 2 has not been cleaned up. 128 assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable())); 129 // Nonce 3 was active and active ops should never be cleaned up; try to end and start. 130 nm.endOperation(NO_NONCE, 3, false); 131 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable())); 132 edge.setValue(11); 133 cleanup.choreForTesting(); 134 // Now, nonce 2 has been cleaned up. 135 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); 136 } finally { 137 EnvironmentEdgeManager.reset(); 138 } 139 } 140 141 @Test 142 public void testWalNonces() throws Exception { 143 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 144 EnvironmentEdgeManager.injectEdge(edge); 145 try { 146 ServerNonceManager nm = createManager(6); 147 ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class)); 148 // Add nonces from WAL, including dups. 149 edge.setValue(12); 150 nm.reportOperationFromWal(NO_NONCE, 1, 8); 151 nm.reportOperationFromWal(NO_NONCE, 2, 2); 152 nm.reportOperationFromWal(NO_NONCE, 3, 5); 153 nm.reportOperationFromWal(NO_NONCE, 3, 6); 154 // WAL nonces should prevent cross-server conflicts. 155 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable())); 156 // Make sure we ignore very old nonces, but not borderline old nonces. 157 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); 158 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable())); 159 // Make sure grace period is counted from recovery time. 160 edge.setValue(17); 161 cleanup.choreForTesting(); 162 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable())); 163 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable())); 164 edge.setValue(19); 165 cleanup.choreForTesting(); 166 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); 167 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable())); 168 } finally { 169 EnvironmentEdgeManager.reset(); 170 } 171 } 172 173 @Test 174 public void testConcurrentAttempts() throws Exception { 175 final ServerNonceManager nm = createManager(); 176 177 nm.startOperation(NO_NONCE, 1, createStoppable()); 178 TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable()); 179 Thread t = tr.start(); 180 waitForThreadToBlockOrExit(t); 181 nm.endOperation(NO_NONCE, 1, true); // operation succeeded 182 t.join(); // thread must now unblock and not proceed (result checked inside). 183 tr.propagateError(); 184 185 nm.startOperation(NO_NONCE, 2, createStoppable()); 186 tr = new TestRunnable(nm, 2, true, createStoppable()); 187 t = tr.start(); 188 waitForThreadToBlockOrExit(t); 189 nm.endOperation(NO_NONCE, 2, false); 190 t.join(); // thread must now unblock and allow us to proceed (result checked inside). 191 tr.propagateError(); 192 nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation 193 194 nm.startOperation(NO_NONCE, 3, createStoppable()); 195 tr = new TestRunnable(nm, 4, true, createStoppable()); 196 tr.start().join(); // nonce 3 must have no bearing on nonce 4 197 tr.propagateError(); 198 } 199 200 @Test 201 public void testStopWaiting() throws Exception { 202 final ServerNonceManager nm = createManager(); 203 nm.setConflictWaitIterationMs(1); 204 Stoppable stoppingStoppable = createStoppable(); 205 Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() { 206 AtomicInteger answer = new AtomicInteger(3); 207 208 @Override 209 public Boolean answer(InvocationOnMock invocation) throws Throwable { 210 return 0 < answer.decrementAndGet(); 211 } 212 }); 213 214 nm.startOperation(NO_NONCE, 1, createStoppable()); 215 TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable); 216 Thread t = tr.start(); 217 waitForThreadToBlockOrExit(t); 218 // thread must eventually throw 219 t.join(); 220 tr.propagateError(); 221 } 222 223 private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException { 224 for (int i = 9; i >= 0; --i) { 225 if ( 226 t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING 227 || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED 228 ) { 229 return; 230 } 231 if (i > 0) Thread.sleep(300); 232 } 233 // Thread didn't block in 3 seconds. What is it doing? Continue the test, we'd rather 234 // have a very strange false positive then false negative due to timing. 235 } 236 237 private static class TestRunnable implements Runnable { 238 public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown! 239 240 private final ServerNonceManager nm; 241 private final long nonce; 242 private final Boolean expected; 243 private final Stoppable stoppable; 244 245 private Throwable throwable = null; 246 247 public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) { 248 this.nm = nm; 249 this.nonce = nonce; 250 this.expected = expected; 251 this.stoppable = stoppable; 252 } 253 254 public void propagateError() throws Exception { 255 if (throwable == null) return; 256 throw new Exception(throwable); 257 } 258 259 public Thread start() { 260 Thread t = new Thread(this); 261 t = Threads.setDaemonThreadRunning(t); 262 try { 263 startedLatch.await(); 264 } catch (InterruptedException e) { 265 fail("Unexpected"); 266 } 267 return t; 268 } 269 270 @Override 271 public void run() { 272 startedLatch.countDown(); 273 boolean shouldThrow = expected == null; 274 boolean hasThrown = true; 275 try { 276 boolean result = nm.startOperation(NO_NONCE, nonce, stoppable); 277 hasThrown = false; 278 if (!shouldThrow) { 279 assertEquals(expected.booleanValue(), result); 280 } 281 } catch (Throwable t) { 282 if (!shouldThrow) { 283 throwable = t; 284 } 285 } 286 if (shouldThrow && !hasThrown) { 287 throwable = new AssertionError("Should have thrown"); 288 } 289 } 290 } 291 292 private Stoppable createStoppable() { 293 Stoppable s = Mockito.mock(Stoppable.class); 294 Mockito.when(s.isStopped()).thenReturn(false); 295 return s; 296 } 297 298 private ServerNonceManager createManager() { 299 return createManager(null); 300 } 301 302 private ServerNonceManager createManager(Integer gracePeriod) { 303 Configuration conf = HBaseConfiguration.create(); 304 if (gracePeriod != null) { 305 conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue()); 306 } 307 return new ServerNonceManager(conf); 308 } 309}