001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.Assert.assertNull; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.Matchers.any; 023import static org.mockito.Matchers.anyListOf; 024import static org.mockito.Matchers.anyString; 025import static org.mockito.Matchers.eq; 026import static org.mockito.Mockito.atLeastOnce; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.doThrow; 029import static org.mockito.Mockito.inOrder; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.never; 032import static org.mockito.Mockito.reset; 033import static org.mockito.Mockito.spy; 034import static org.mockito.Mockito.times; 035import static org.mockito.Mockito.verify; 036import static org.mockito.Mockito.when; 037 038import java.io.IOException; 039import java.util.Arrays; 040import java.util.List; 041import java.util.concurrent.ThreadPoolExecutor; 042import java.util.concurrent.TimeUnit; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 045import org.apache.hadoop.hbase.testclassification.MasterTests; 046import org.apache.hadoop.hbase.testclassification.SmallTests; 047import org.junit.After; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.mockito.InOrder; 052import org.mockito.invocation.InvocationOnMock; 053import org.mockito.stubbing.Answer; 054 055import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 056 057/** 058 * Test Procedure coordinator operation. 059 * <p> 060 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method 061 * level serialization this class will likely throw all kinds of errors. 062 */ 063@Category({MasterTests.class, SmallTests.class}) 064public class TestProcedureCoordinator { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestProcedureCoordinator.class); 069 070 // general test constants 071 private static final long WAKE_FREQUENCY = 1000; 072 private static final long TIMEOUT = 100000; 073 private static final long POOL_KEEP_ALIVE = 1; 074 private static final String nodeName = "node"; 075 private static final String procName = "some op"; 076 private static final byte[] procData = new byte[0]; 077 private static final List<String> expected = Lists.newArrayList("remote1", "remote2"); 078 079 // setup the mocks 080 private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class); 081 private final Procedure task = mock(Procedure.class); 082 private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class); 083 084 // handle to the coordinator for each test 085 private ProcedureCoordinator coordinator; 086 087 @After 088 public void resetTest() throws IOException { 089 // reset all the mocks used for the tests 090 reset(controller, task, monitor); 091 // close the open coordinator, if it was used 092 if (coordinator != null) coordinator.close(); 093 } 094 095 private ProcedureCoordinator buildNewCoordinator() { 096 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE); 097 return spy(new ProcedureCoordinator(controller, pool)); 098 } 099 100 /** 101 * Currently we can only handle one procedure at a time. This makes sure we handle that and 102 * reject submitting more. 103 */ 104 @Test 105 public void testThreadPoolSize() throws Exception { 106 ProcedureCoordinator coordinator = buildNewCoordinator(); 107 Procedure proc = new Procedure(coordinator, monitor, 108 WAKE_FREQUENCY, TIMEOUT, procName, procData, expected); 109 Procedure procSpy = spy(proc); 110 111 Procedure proc2 = new Procedure(coordinator, monitor, 112 WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected); 113 Procedure procSpy2 = spy(proc2); 114 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 115 .thenReturn(procSpy, procSpy2); 116 117 coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected); 118 // null here means second procedure failed to start. 119 assertNull("Coordinator successfully ran two tasks at once with a single thread pool.", 120 coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected)); 121 } 122 123 /** 124 * Check handling a connection failure correctly if we get it during the acquiring phase 125 */ 126 @Test 127 public void testUnreachableControllerDuringPrepare() throws Exception { 128 coordinator = buildNewCoordinator(); 129 // setup the proc 130 List<String> expected = Arrays.asList("cohort"); 131 Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY, 132 TIMEOUT, procName, procData, expected); 133 final Procedure procSpy = spy(proc); 134 135 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 136 .thenReturn(procSpy); 137 138 // use the passed controller responses 139 IOException cause = new IOException("Failed to reach comms during acquire"); 140 doThrow(cause).when(controller) 141 .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class)); 142 143 // run the operation 144 proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected); 145 // and wait for it to finish 146 while(!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)); 147 verify(procSpy, atLeastOnce()).receive(any()); 148 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause)); 149 verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected); 150 verify(controller, never()).sendGlobalBarrierReached(any(), 151 anyListOf(String.class)); 152 } 153 154 /** 155 * Check handling a connection failure correctly if we get it during the barrier phase 156 */ 157 @Test 158 public void testUnreachableControllerDuringCommit() throws Exception { 159 coordinator = buildNewCoordinator(); 160 161 // setup the task and spy on it 162 List<String> expected = Arrays.asList("cohort"); 163 final Procedure spy = spy(new Procedure(coordinator, 164 WAKE_FREQUENCY, TIMEOUT, procName, procData, expected)); 165 166 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 167 .thenReturn(spy); 168 169 // use the passed controller responses 170 IOException cause = new IOException("Failed to reach controller during prepare"); 171 doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })) 172 .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class)); 173 doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class)); 174 175 // run the operation 176 Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected); 177 // and wait for it to finish 178 while(!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)); 179 verify(spy, atLeastOnce()).receive(any()); 180 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause)); 181 verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), 182 eq(procData), anyListOf(String.class)); 183 verify(controller, times(1)).sendGlobalBarrierReached(any(), 184 anyListOf(String.class)); 185 } 186 187 @Test 188 public void testNoCohort() throws Exception { 189 runSimpleProcedure(); 190 } 191 192 @Test 193 public void testSingleCohortOrchestration() throws Exception { 194 runSimpleProcedure("one"); 195 } 196 197 @Test 198 public void testMultipleCohortOrchestration() throws Exception { 199 runSimpleProcedure("one", "two", "three", "four"); 200 } 201 202 public void runSimpleProcedure(String... members) throws Exception { 203 coordinator = buildNewCoordinator(); 204 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, 205 TIMEOUT, procName, procData, Arrays.asList(members)); 206 final Procedure spy = spy(task); 207 runCoordinatedProcedure(spy, members); 208 } 209 210 /** 211 * Test that if nodes join the barrier early we still correctly handle the progress 212 */ 213 @Test 214 public void testEarlyJoiningBarrier() throws Exception { 215 final String[] cohort = new String[] { "one", "two", "three", "four" }; 216 coordinator = buildNewCoordinator(); 217 final ProcedureCoordinator ref = coordinator; 218 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, 219 TIMEOUT, procName, procData, Arrays.asList(cohort)); 220 final Procedure spy = spy(task); 221 222 AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) { 223 @Override 224 public void doWork() { 225 // then do some fun where we commit before all nodes have prepared 226 // "one" commits before anyone else is done 227 ref.memberAcquiredBarrier(this.opName, this.cohort[0]); 228 ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]); 229 // but "two" takes a while 230 ref.memberAcquiredBarrier(this.opName, this.cohort[1]); 231 // "three"jumps ahead 232 ref.memberAcquiredBarrier(this.opName, this.cohort[2]); 233 ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]); 234 // and "four" takes a while 235 ref.memberAcquiredBarrier(this.opName, this.cohort[3]); 236 } 237 }; 238 239 BarrierAnswer commit = new BarrierAnswer(procName, cohort) { 240 @Override 241 public void doWork() { 242 ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]); 243 ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]); 244 } 245 }; 246 runCoordinatedOperation(spy, prepare, commit, cohort); 247 } 248 249 /** 250 * Just run a procedure with the standard name and data, with not special task for the mock 251 * coordinator (it works just like a regular coordinator). For custom behavior see 252 * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} 253 * . 254 * @param spy Spy on a real {@link Procedure} 255 * @param cohort expected cohort members 256 * @throws Exception on failure 257 */ 258 public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception { 259 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), 260 new BarrierAnswer(procName, cohort), cohort); 261 } 262 263 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, 264 String... cohort) throws Exception { 265 runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort); 266 } 267 268 public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, 269 String... cohort) throws Exception { 270 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort); 271 } 272 273 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation, 274 BarrierAnswer commitOperation, String... cohort) throws Exception { 275 List<String> expected = Arrays.asList(cohort); 276 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 277 .thenReturn(spy); 278 279 // use the passed controller responses 280 doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected); 281 doAnswer(commitOperation).when(controller) 282 .sendGlobalBarrierReached(eq(spy), anyListOf(String.class)); 283 284 // run the operation 285 Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected); 286 // and wait for it to finish 287 task.waitForCompleted(); 288 289 // make sure we mocked correctly 290 prepareOperation.ensureRan(); 291 // we never got an exception 292 InOrder inorder = inOrder(spy, controller); 293 inorder.verify(spy).sendGlobalBarrierStart(); 294 inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected); 295 inorder.verify(spy).sendGlobalBarrierReached(); 296 inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class)); 297 } 298 299 private static abstract class OperationAnswer implements Answer<Void> { 300 private boolean ran = false; 301 302 public void ensureRan() { 303 assertTrue("Prepare mocking didn't actually run!", ran); 304 } 305 306 @Override 307 public final Void answer(InvocationOnMock invocation) throws Throwable { 308 this.ran = true; 309 doWork(); 310 return null; 311 } 312 313 protected abstract void doWork() throws Throwable; 314 } 315 316 /** 317 * Just tell the current coordinator that each of the nodes has prepared 318 */ 319 private class AcquireBarrierAnswer extends OperationAnswer { 320 protected final String[] cohort; 321 protected final String opName; 322 323 public AcquireBarrierAnswer(String opName, String... cohort) { 324 this.cohort = cohort; 325 this.opName = opName; 326 } 327 328 @Override 329 public void doWork() { 330 if (cohort == null) return; 331 for (String member : cohort) { 332 TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member); 333 } 334 } 335 } 336 337 /** 338 * Just tell the current coordinator that each of the nodes has committed 339 */ 340 private class BarrierAnswer extends OperationAnswer { 341 protected final String[] cohort; 342 protected final String opName; 343 344 public BarrierAnswer(String opName, String... cohort) { 345 this.cohort = cohort; 346 this.opName = opName; 347 } 348 349 @Override 350 public void doWork() { 351 if (cohort == null) return; 352 for (String member : cohort) { 353 TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member, 354 new byte[0]); 355 } 356 } 357 } 358}