/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@link ServerNonceManager}: starting and ending nonce-guarded operations,
 * MVCC bookkeeping per nonce, time-based cleanup, nonces reported from the WAL, and the
 * behavior of a second caller that arrives while an operation with the same nonce is running.
 */
@Category({RegionServerTests.class, SmallTests.class})
public class TestServerNonceManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestServerNonceManager.class);

  /**
   * The MVCC value attached to an operation must still be readable after the operation ends,
   * and must not be disturbed by later operations that use other nonces in the same group.
   */
  @Test
  public void testMvcc() throws Exception {
    ServerNonceManager nm = createManager();
    final long group = 100;
    final long nonce = 1;
    final long initMvcc = 999;
    assertTrue(nm.startOperation(group, nonce, createStoppable()));
    nm.addMvccToOperationContext(group, nonce, initMvcc);
    nm.endOperation(group, nonce, true);
    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
    long newMvcc = initMvcc + 1;
    // Four further operations (nonces 2..5), each with its own MVCC value.
    for (long newNonce = nonce + 1; newNonce < nonce + 5; ++newNonce) {
      assertTrue(nm.startOperation(group, newNonce, createStoppable()));
      nm.addMvccToOperationContext(group, newNonce, newMvcc);
      nm.endOperation(group, newNonce, true);
      assertEquals(newMvcc, nm.getMvccFromOperationContext(group, newNonce));
      ++newMvcc;
    }
    // The first operation's MVCC is unchanged by the later ones.
    assertEquals(initMvcc, nm.getMvccFromOperationContext(group, nonce));
  }

  /**
   * Walks every (group, nonce) pair drawn from a set of boundary values through the
   * start / fail / restart / succeed cycle and checks which restarts are permitted.
   */
  @Test
  public void testNormalStartEnd() throws Exception {
    final long[] numbers = new long[] { NO_NONCE, 1, 2, Long.MAX_VALUE, Long.MIN_VALUE };
    ServerNonceManager nm = createManager();
    for (long group : numbers) {
      for (long nonce : numbers) {
        assertTrue(nm.startOperation(group, nonce, createStoppable()));
      }
    }
    // Should be able to start operation the second time w/o nonces.
    for (long group : numbers) {
      assertTrue(nm.startOperation(group, NO_NONCE, createStoppable()));
    }
    // Fail all operations - should be able to restart.
    for (long group : numbers) {
      for (long nonce : numbers) {
        nm.endOperation(group, nonce, false);
        assertTrue(nm.startOperation(group, nonce, createStoppable()));
      }
    }
    // Succeed all operations - should not be able to restart, except for NO_NONCE.
    for (long group : numbers) {
      for (long nonce : numbers) {
        nm.endOperation(group, nonce, true);
        assertEquals(nonce == NO_NONCE, nm.startOperation(group, nonce, createStoppable()));
      }
    }
  }

  /**
   * Ending an operation that was never started is expected to raise an {@link AssertionError}.
   * NOTE(review): this presumably relies on a Java {@code assert} inside
   * {@code ServerNonceManager.endOperation}, i.e. on the JVM running with {@code -ea} - confirm.
   */
  @Test
  public void testNoEndWithoutStart() {
    ServerNonceManager nm = createManager();
    // Record the outcome in a flag and assert outside the try block; asserting inside it
    // would raise an AssertionError that the catch below would swallow.
    boolean thrown = false;
    try {
      nm.endOperation(NO_NONCE, 1, true);
    } catch (AssertionError expected) {
      thrown = true;
    }
    assertTrue("Should have thrown", thrown);
  }

  /**
   * Drives the cleanup chore against a manually controlled clock with a grace period of 6:
   * completed nonces become reusable only once they are older than the grace period, and
   * operations that are still in progress are never cleaned up.
   */
  @Test
  public void testCleanup() throws Exception {
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    try {
      ServerNonceManager nm = createManager(6);
      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
      edge.setValue(1);
      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
      edge.setValue(2);
      nm.endOperation(NO_NONCE, 1, true);
      edge.setValue(4);
      nm.endOperation(NO_NONCE, 2, true);
      edge.setValue(9);
      cleanup.choreForTesting();
      // Nonce 1 has been cleaned up.
      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
      // Nonce 2 has not been cleaned up.
      assertFalse(nm.startOperation(NO_NONCE, 2, createStoppable()));
      // Nonce 3 was active and active ops should never be cleaned up; try to end and start.
      nm.endOperation(NO_NONCE, 3, false);
      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
      edge.setValue(11);
      cleanup.choreForTesting();
      // Now, nonce 2 has been cleaned up.
      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
    } finally {
      // Always restore the real clock so other tests are not affected.
      EnvironmentEdgeManager.reset();
    }
  }

  /**
   * Nonces reported from the WAL must block new operations with the same nonce unless the
   * reported entry is already too old, and must expire a grace period after they were reported.
   */
  @Test
  public void testWalNonces() throws Exception {
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    try {
      ServerNonceManager nm = createManager(6);
      ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class));
      // Add nonces from WAL, including dups.
      // NOTE(review): the third argument is presumably the WAL write time - confirm.
      edge.setValue(12);
      nm.reportOperationFromWal(NO_NONCE, 1, 8);
      nm.reportOperationFromWal(NO_NONCE, 2, 2);
      nm.reportOperationFromWal(NO_NONCE, 3, 5);
      nm.reportOperationFromWal(NO_NONCE, 3, 6);
      // WAL nonces should prevent cross-server conflicts.
      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
      // Make sure we ignore very old nonces, but not borderline old nonces.
      assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
      // Make sure grace period is counted from recovery time.
      edge.setValue(17);
      cleanup.choreForTesting();
      assertFalse(nm.startOperation(NO_NONCE, 1, createStoppable()));
      assertFalse(nm.startOperation(NO_NONCE, 3, createStoppable()));
      edge.setValue(19);
      cleanup.choreForTesting();
      assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
      assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
    } finally {
      // Always restore the real clock so other tests are not affected.
      EnvironmentEdgeManager.reset();
    }
  }

  /**
   * A second caller using a nonce that is already in progress must wait for the first
   * operation to end: it is refused if the first succeeded and admitted if the first failed.
   * An in-progress operation on a different nonce must not affect it at all.
   */
  @Test
  public void testConcurrentAttempts() throws Exception {
    final ServerNonceManager nm = createManager();

    // Assert the setup starts so a broken precondition is reported here, not in the worker.
    assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
    TestRunnable tr = new TestRunnable(nm, 1, false, createStoppable());
    Thread t = tr.start();
    waitForThreadToBlockOrExit(t);
    nm.endOperation(NO_NONCE, 1, true); // operation succeeded
    t.join(); // thread must now unblock and not proceed (result checked inside).
    tr.propagateError();

    assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable()));
    tr = new TestRunnable(nm, 2, true, createStoppable());
    t = tr.start();
    waitForThreadToBlockOrExit(t);
    nm.endOperation(NO_NONCE, 2, false);
    t.join(); // thread must now unblock and allow us to proceed (result checked inside).
    tr.propagateError();
    nm.endOperation(NO_NONCE, 2, true); // that is to say we should be able to end operation

    assertTrue(nm.startOperation(NO_NONCE, 3, createStoppable()));
    tr = new TestRunnable(nm, 4, true, createStoppable());
    tr.start().join(); // nonce 3 must have no bearing on nonce 4
    tr.propagateError();
  }

  /**
   * A caller waiting on a conflicting nonce is expected to end with an exception rather than
   * a result when it is handed a {@link Stoppable} whose {@code isStopped()} answers are
   * scripted by this test.
   * NOTE(review): how the manager reacts to each individual answer is not visible here.
   */
  @Test
  public void testStopWaiting() throws Exception {
    final ServerNonceManager nm = createManager();
    nm.setConflictWaitIterationMs(1);
    Stoppable stoppingStoppable = createStoppable();
    Mockito.when(stoppingStoppable.isStopped()).thenAnswer(new Answer<Boolean>() {
      // Starts at 3 and is decremented on every call: the first two calls answer true,
      // every later call answers false.
      private final AtomicInteger remaining = new AtomicInteger(3);

      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        return 0 < remaining.decrementAndGet();
      }
    });

    // Assert the setup start so a broken precondition is reported here, not in the worker.
    assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable()));
    // expected == null tells the runnable that startOperation must throw.
    TestRunnable tr = new TestRunnable(nm, 1, null, stoppingStoppable);
    Thread t = tr.start();
    waitForThreadToBlockOrExit(t);
    // thread must eventually throw
    t.join();
    tr.propagateError();
  }

  /**
   * Polls the given thread until it is blocked, waiting, or terminated. Makes up to ten checks
   * 300 ms apart (about 2.7 seconds in total) and then returns regardless of the outcome.
   *
   * @param t the thread to observe
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   */
  private void waitForThreadToBlockOrExit(Thread t) throws InterruptedException {
    for (int i = 9; i >= 0; --i) {
      // Snapshot the state once so that all four comparisons see the same value.
      Thread.State state = t.getState();
      if (state == Thread.State.TIMED_WAITING || state == Thread.State.WAITING
          || state == Thread.State.BLOCKED || state == Thread.State.TERMINATED) {
        return;
      }
      if (i > 0) {
        Thread.sleep(300);
      }
    }
    // Thread didn't block within the polling window. What is it doing? Continue the test,
    // we'd rather have a very strange false positive than a false negative due to timing.
  }

  /**
   * Runs a single {@code startOperation} call on its own thread and records any failure so
   * the test thread can rethrow it through {@link #propagateError()}.
   */
  private static class TestRunnable implements Runnable {
    public final CountDownLatch startedLatch = new CountDownLatch(1); // It's the final countdown!

    private final ServerNonceManager nm;
    private final long nonce;
    /** Expected result of startOperation, or {@code null} if the call is expected to throw. */
    private final Boolean expected;
    private final Stoppable stoppable;

    /** Written by the worker thread; only read it after joining that thread. */
    private Throwable throwable = null;

    /**
     * @param nm the manager under test
     * @param nonce the nonce to start (the group is always {@code NO_NONCE})
     * @param expected the expected return value, or {@code null} if a throw is expected
     * @param stoppable the stoppable handed to {@code startOperation}
     */
    public TestRunnable(ServerNonceManager nm, long nonce, Boolean expected, Stoppable stoppable) {
      this.nm = nm;
      this.nonce = nonce;
      this.expected = expected;
      this.stoppable = stoppable;
    }

    /**
     * Rethrows, wrapped in an {@link Exception}, whatever failure the worker recorded.
     * Does nothing if the worker met its expectation.
     *
     * @throws Exception wrapping the recorded failure, if there is one
     */
    public void propagateError() throws Exception {
      if (throwable == null) {
        return;
      }
      throw new Exception(throwable);
    }

    /**
     * Starts this runnable on a daemon thread and waits until {@link #run()} has been entered.
     *
     * @return the started thread
     */
    public Thread start() {
      Thread t = new Thread(this);
      t = Threads.setDaemonThreadRunning(t);
      try {
        startedLatch.await();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so whoever interrupted this thread can still see it.
        Thread.currentThread().interrupt();
        fail("Unexpected");
      }
      return t;
    }

    @Override
    public void run() {
      startedLatch.countDown();
      boolean shouldThrow = expected == null;
      // Assume a throw until startOperation is seen to return normally.
      boolean hasThrown = true;
      try {
        boolean result = nm.startOperation(NO_NONCE, nonce, stoppable);
        hasThrown = false;
        if (!shouldThrow) {
          assertEquals(expected.booleanValue(), result);
        }
      } catch (Throwable t) {
        // An exception is a failure only when a normal return was expected; this also
        // captures the AssertionError raised by the assertEquals above.
        if (!shouldThrow) {
          throwable = t;
        }
      }
      if (shouldThrow && !hasThrown) {
        throwable = new AssertionError("Should have thrown");
      }
    }
  }

  /**
   * @return a mock {@link Stoppable} whose {@code isStopped()} returns {@code false}
   */
  private Stoppable createStoppable() {
    Stoppable s = Mockito.mock(Stoppable.class);
    Mockito.when(s.isStopped()).thenReturn(false);
    return s;
  }

  /**
   * @return a manager built from a default configuration, with no grace period override
   */
  private ServerNonceManager createManager() {
    return createManager(null);
  }

  /**
   * @param gracePeriod value to set for {@code ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY},
   *          or {@code null} to leave the configuration untouched
   * @return a manager built from the resulting configuration
   */
  private ServerNonceManager createManager(Integer gracePeriod) {
    Configuration conf = HBaseConfiguration.create();
    if (gracePeriod != null) {
      conf.setInt(ServerNonceManager.HASH_NONCE_GRACE_PERIOD_KEY, gracePeriod.intValue());
    }
    return new ServerNonceManager(conf);
  }
}