001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HConstants.NO_NONCE; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.ScheduledChore; 032import org.apache.hadoop.hbase.Stoppable; 033import org.apache.hadoop.hbase.testclassification.RegionServerTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.mockito.Mockito; 042import org.mockito.invocation.InvocationOnMock; 043import org.mockito.stubbing.Answer; 044 045@Category({ RegionServerTests.class, SmallTests.class }) 046public class TestServerNonceManager { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestServerNonceManager.class); 051 052 @Test 053 public void testMvcc() throws Exception { 054 ServerNonceManager nm = createManager(); 055 final long group = 100; 056 final long nonce = 1; 057 final long initMvcc = 999; 058 assertTrue(nm.startOperation(group, nonce, createStoppable())); 059 nm.addMvccToOperationContext(group, nonce, initMvcc); 060 nm.endOperation(group, nonce, true); 061 assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce)); 062 long newMvcc = initMvcc + 1; 063 for (long newNonce = nonce + 1; newNonce != (nonce + 5); ++newNonce) { 064 assertTrue(nm.startOperation(group, newNonce, createStoppable())); 065 nm.addMvccToOperationContext(group, newNonce, newMvcc); 066 nm.endOperation(group, newNonce, true); 067 assertEquals(newMvcc, nm.getMvccFromOperationContext(group, newNonce)); 068 ++newMvcc; 069 } 070 assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce)); 071 } 072 073 @Test 074 public void testNormalStartEnd() throws Exception { 075 final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE }; 076 ServerNonceManager nm = createManager(); 077 for (int i = 0; i < numbers.length; ++i) { 078 for (int j = 0; j < numbers.length; ++j) { 079 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable())); 080 } 081 } 082 // Should be able to start operation the second time w/o nonces. 083 for (int i = 0; i < numbers.length; ++i) { 084 assertTrue(nm.startOperation(numbers[i], NO_NONCE, createStoppable())); 085 } 086 // Fail all operations - should be able to restart. 087 for (int i = 0; i < numbers.length; ++i) { 088 for (int j = 0; j < numbers.length; ++j) { 089 nm.endOperation(numbers[i], numbers[j], false); 090 assertTrue(nm.startOperation(numbers[i], numbers[j], createStoppable())); 091 } 092 } 093 // Succeed all operations - should not be able to restart, except for NO_NONCE. 094 for (int i = 0; i < numbers.length; ++i) { 095 for (int j = 0; j < numbers.length; ++j) { 096 nm.endOperation(numbers[i], numbers[j], true); 097 assertEquals(numbers[j] == NO_NONCE, 098 nm.startOperation(numbers[i], numbers[j], createStoppable())); 099 } 100 } 101 } 102 103 @Test 104 public void testNoEndWithoutStart() { 105 ServerNonceManager nm = createManager(); 106 try { 107 nm.endOperation(NO_NONCE, 1, true); 108 throw new Error("Should have thrown"); 109 } catch (AssertionError err) { 110 } 111 } 112 113 @Test 114 public void testCleanup() throws Exception { 115 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 116 EnvironmentEdgeManager.injectEdge(edge); 117 try { 118 ServerNonceManager nm = createManager(6); 119 ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class)); 120 edge.setValue(1); 121 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); 122 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); 123 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable())); 124 edge.setValue(2); 125 nm.endOperation(NO_NONCE, 1, true); 126 edge.setValue(4); 127 nm.endOperation(NO_NONCE, 2, true); 128 edge.setValue(9); 129 cleanup.choreForTesting(); 130 // Nonce 1 has been cleaned up. 131 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); 132 // Nonce 2 has not been cleaned up. 133 assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable())); 134 // Nonce 3 was active and active ops should never be cleaned up; try to end and start. 135 nm.endOperation(NO_NONCE, 3, false); 136 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable())); 137 edge.setValue(11); 138 cleanup.choreForTesting(); 139 // Now, nonce 2 has been cleaned up. 140 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); 141 } finally { 142 EnvironmentEdgeManager.reset(); 143 } 144 } 145 146 @Test 147 public void testWalNonces() throws Exception { 148 ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); 149 EnvironmentEdgeManager.injectEdge(edge); 150 try { 151 ServerNonceManager nm = createManager(6); 152 ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class)); 153 // Add nonces from WAL, including dups. 154 edge.setValue(12); 155 nm.reportOperationFromWal(NO_NONCE, 1, 8); 156 nm.reportOperationFromWal(NO_NONCE, 2, 2); 157 nm.reportOperationFromWal(NO_NONCE, 3, 5); 158 nm.reportOperationFromWal(NO_NONCE, 3, 6); 159 // WAL nonces should prevent cross-server conflicts. 160 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable())); 161 // Make sure we ignore very old nonces, but not borderline old nonces. 162 assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); 163 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable())); 164 // Make sure grace period is counted from recovery time. 165 edge.setValue(17); 166 cleanup.choreForTesting(); 167 assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable())); 168 assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable())); 169 edge.setValue(19); 170 cleanup.choreForTesting(); 171 assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); 172 assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable())); 173 } finally { 174 EnvironmentEdgeManager.reset(); 175 } 176 } 177 178 @Test 179 public void testConcurrentAttempts() throws Exception { 180 final ServerNonceManager nm = createManager(); 181 182 nm.startOperation(NO_NONCE, 1, createStoppable()); 183 TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable()); 184 Thread t = tr.start(); 185 waitForThreadToBlockOrExit(t); 186 nm.endOperation(NO_NONCE, 1, true); // operation succeeded 187 t.join(); // thread must now unblock and not proceed (result checked inside). 188 tr.propagateError(); 189 190 nm.startOperation(NO_NONCE, 2, createStoppable()); 191 tr = new TestRunnable(nm, 2, true, createStoppable()); 192 t = tr.start(); 193 waitForThreadToBlockOrExit(t); 194 nm.endOperation(NO_NONCE, 2, false); 195 t.join(); // thread must now unblock and allow us to proceed (result checked inside). 196 tr.propagateError(); 197 nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation 198 199 nm.startOperation(NO_NONCE, 3, createStoppable()); 200 tr = new TestRunnable(nm, 4, true, createStoppable()); 201 tr.start().join(); // nonce 3 must have no bearing on nonce 4 202 tr.propagateError(); 203 } 204 205 @Test 206 public void testStopWaiting() throws Exception { 207 final ServerNonceManager nm = createManager(); 208 nm.setConflictWaitIterationMs(1); 209 Stoppable stoppingStoppable = createStoppable(); 210 Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() { 211 AtomicInteger answer = new AtomicInteger(3); 212 213 @Override 214 public Boolean answer(InvocationOnMock invocation) throws Throwable { 215 return 0 < answer.decrementAndGet(); 216 } 217 }); 218 219 nm.startOperation(NO_NONCE, 1, createStoppable()); 220 TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable); 221 Thread t = tr.start(); 222 waitForThreadToBlockOrExit(t); 223 // thread must eventually throw 224 t.join(); 225 tr.propagateError(); 226 } 227 228 private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException { 229 for (int i = 9; i >= 0; --i) { 230 if ( 231 t.getState() == Thread.State.TIMED_WAITING || t.getState() == Thread.State.WAITING 232 || t.getState() == Thread.State.BLOCKED || t.getState() == Thread.State.TERMINATED 233 ) { 234 return; 235 } 236 if (i > 0) Thread.sleep(300); 237 } 238 // Thread didn't block in 3 seconds. What is it doing? Continue the test, we'd rather 239 // have a very strange false positive then false negative due to timing. 240 } 241 242 private static class TestRunnable implements Runnable { 243 public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown! 244 245 private final ServerNonceManager nm; 246 private final long nonce; 247 private final Boolean expected; 248 private final Stoppable stoppable; 249 250 private Throwable throwable = null; 251 252 public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) { 253 this.nm = nm; 254 this.nonce = nonce; 255 this.expected = expected; 256 this.stoppable = stoppable; 257 } 258 259 public void propagateError() throws Exception { 260 if (throwable == null) return; 261 throw new Exception(throwable); 262 } 263 264 public Thread start() { 265 Thread t = new Thread(this); 266 t = Threads.setDaemonThreadRunning(t); 267 try { 268 startedLatch.await(); 269 } catch (InterruptedException e) { 270 fail("Unexpected"); 271 } 272 return t; 273 } 274 275 @Override 276 public void run() { 277 startedLatch.countDown(); 278 boolean shouldThrow = expected == null; 279 boolean hasThrown = true; 280 try { 281 boolean result = nm.startOperation(NO_NONCE, nonce, stoppable); 282 hasThrown = false; 283 if (!shouldThrow) { 284 assertEquals(expected.booleanValue(), result); 285 } 286 } catch (Throwable t) { 287 if (!shouldThrow) { 288 throwable = t; 289 } 290 } 291 if (shouldThrow && !hasThrown) { 292 throwable = new AssertionError("Should have thrown"); 293 } 294 } 295 } 296 297 private Stoppable createStoppable() { 298 Stoppable s = Mockito.mock(Stoppable.class); 299 Mockito.when(s.isStopped()).thenReturn(false); 300 return s; 301 } 302 303 private ServerNonceManager createManager() { 304 return createManager(null); 305 } 306 307 private ServerNonceManager createManager(Integer gracePeriod) { 308 Configuration conf = HBaseConfiguration.create(); 309 if (gracePeriod != null) { 310 conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue()); 311 } 312 return new ServerNonceManager(conf); 313 } 314}