001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.Assert.assertNull; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.Matchers.any; 023import static org.mockito.Matchers.anyListOf; 024import static org.mockito.Matchers.anyString; 025import static org.mockito.Matchers.eq; 026import static org.mockito.Mockito.atLeastOnce; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.doThrow; 029import static org.mockito.Mockito.inOrder; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.never; 032import static org.mockito.Mockito.reset; 033import static org.mockito.Mockito.spy; 034import static org.mockito.Mockito.times; 035import static org.mockito.Mockito.verify; 036import static org.mockito.Mockito.when; 037 038import java.io.IOException; 039import java.util.Arrays; 040import java.util.List; 041import java.util.concurrent.ThreadPoolExecutor; 042import java.util.concurrent.TimeUnit; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 045import org.apache.hadoop.hbase.testclassification.MasterTests; 046import org.apache.hadoop.hbase.testclassification.SmallTests; 047import org.junit.After; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.mockito.InOrder; 052import org.mockito.invocation.InvocationOnMock; 053import org.mockito.stubbing.Answer; 054 055import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 056 057/** 058 * Test Procedure coordinator operation. 059 * <p> 060 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method 061 * level serialization this class will likely throw all kinds of errors. 062 */ 063@Category({ MasterTests.class, SmallTests.class }) 064public class TestProcedureCoordinator { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestProcedureCoordinator.class); 069 070 // general test constants 071 private static final long WAKE_FREQUENCY = 1000; 072 private static final long TIMEOUT = 100000; 073 private static final long POOL_KEEP_ALIVE = 1; 074 private static final String nodeName = "node"; 075 private static final String procName = "some op"; 076 private static final byte[] procData = new byte[0]; 077 private static final List<String> expected = Lists.newArrayList("remote1", "remote2"); 078 079 // setup the mocks 080 private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class); 081 private final Procedure task = mock(Procedure.class); 082 private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class); 083 084 // handle to the coordinator for each test 085 private ProcedureCoordinator coordinator; 086 087 @After 088 public void resetTest() throws IOException { 089 // reset all the mocks used for the tests 090 reset(controller, task, monitor); 091 // close the open coordinator, if it was used 092 if (coordinator != null) coordinator.close(); 093 } 094 095 private ProcedureCoordinator buildNewCoordinator() { 096 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE); 097 return spy(new ProcedureCoordinator(controller, pool)); 098 } 099 100 /** 101 * Currently we can only handle one procedure at a time. This makes sure we handle that and reject 102 * submitting more. 103 */ 104 @Test 105 public void testThreadPoolSize() throws Exception { 106 ProcedureCoordinator coordinator = buildNewCoordinator(); 107 Procedure proc = 108 new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected); 109 Procedure procSpy = spy(proc); 110 111 Procedure proc2 = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName + "2", 112 procData, expected); 113 Procedure procSpy2 = spy(proc2); 114 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 115 .thenReturn(procSpy, procSpy2); 116 117 coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected); 118 // null here means second procedure failed to start. 119 assertNull("Coordinator successfully ran two tasks at once with a single thread pool.", 120 coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected)); 121 } 122 123 /** 124 * Check handling a connection failure correctly if we get it during the acquiring phase 125 */ 126 @Test 127 public void testUnreachableControllerDuringPrepare() throws Exception { 128 coordinator = buildNewCoordinator(); 129 // setup the proc 130 List<String> expected = Arrays.asList("cohort"); 131 Procedure proc = 132 new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected); 133 final Procedure procSpy = spy(proc); 134 135 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 136 .thenReturn(procSpy); 137 138 // use the passed controller responses 139 IOException cause = new IOException("Failed to reach comms during acquire"); 140 doThrow(cause).when(controller).sendGlobalBarrierAcquire(eq(procSpy), eq(procData), 141 anyListOf(String.class)); 142 143 // run the operation 144 proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected); 145 // and wait for it to finish 146 while (!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)) 147 ; 148 verify(procSpy, atLeastOnce()).receive(any()); 149 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause)); 150 verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected); 151 verify(controller, never()).sendGlobalBarrierReached(any(), anyListOf(String.class)); 152 } 153 154 /** 155 * Check handling a connection failure correctly if we get it during the barrier phase 156 */ 157 @Test 158 public void testUnreachableControllerDuringCommit() throws Exception { 159 coordinator = buildNewCoordinator(); 160 161 // setup the task and spy on it 162 List<String> expected = Arrays.asList("cohort"); 163 final Procedure spy = 164 spy(new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected)); 165 166 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 167 .thenReturn(spy); 168 169 // use the passed controller responses 170 IOException cause = new IOException("Failed to reach controller during prepare"); 171 doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })).when(controller) 172 .sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class)); 173 doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class)); 174 175 // run the operation 176 Procedure task = 177 coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected); 178 // and wait for it to finish 179 while (!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)) 180 ; 181 verify(spy, atLeastOnce()).receive(any()); 182 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause)); 183 verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), eq(procData), 184 anyListOf(String.class)); 185 verify(controller, times(1)).sendGlobalBarrierReached(any(), anyListOf(String.class)); 186 } 187 188 @Test 189 public void testNoCohort() throws Exception { 190 runSimpleProcedure(); 191 } 192 193 @Test 194 public void testSingleCohortOrchestration() throws Exception { 195 runSimpleProcedure("one"); 196 } 197 198 @Test 199 public void testMultipleCohortOrchestration() throws Exception { 200 runSimpleProcedure("one", "two", "three", "four"); 201 } 202 203 public void runSimpleProcedure(String... members) throws Exception { 204 coordinator = buildNewCoordinator(); 205 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, 206 procData, Arrays.asList(members)); 207 final Procedure spy = spy(task); 208 runCoordinatedProcedure(spy, members); 209 } 210 211 /** 212 * Test that if nodes join the barrier early we still correctly handle the progress 213 */ 214 @Test 215 public void testEarlyJoiningBarrier() throws Exception { 216 final String[] cohort = new String[] { "one", "two", "three", "four" }; 217 coordinator = buildNewCoordinator(); 218 final ProcedureCoordinator ref = coordinator; 219 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, 220 procData, Arrays.asList(cohort)); 221 final Procedure spy = spy(task); 222 223 AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) { 224 @Override 225 public void doWork() { 226 // then do some fun where we commit before all nodes have prepared 227 // "one" commits before anyone else is done 228 ref.memberAcquiredBarrier(this.opName, this.cohort[0]); 229 ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]); 230 // but "two" takes a while 231 ref.memberAcquiredBarrier(this.opName, this.cohort[1]); 232 // "three"jumps ahead 233 ref.memberAcquiredBarrier(this.opName, this.cohort[2]); 234 ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]); 235 // and "four" takes a while 236 ref.memberAcquiredBarrier(this.opName, this.cohort[3]); 237 } 238 }; 239 240 BarrierAnswer commit = new BarrierAnswer(procName, cohort) { 241 @Override 242 public void doWork() { 243 ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]); 244 ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]); 245 } 246 }; 247 runCoordinatedOperation(spy, prepare, commit, cohort); 248 } 249 250 /** 251 * Just run a procedure with the standard name and data, with not special task for the mock 252 * coordinator (it works just like a regular coordinator). For custom behavior see 253 * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} . 254 * @param spy Spy on a real {@link Procedure} 255 * @param cohort expected cohort members 256 * @throws Exception on failure 257 */ 258 public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception { 259 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), 260 new BarrierAnswer(procName, cohort), cohort); 261 } 262 263 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, String... cohort) 264 throws Exception { 265 runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort); 266 } 267 268 public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, String... cohort) 269 throws Exception { 270 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort); 271 } 272 273 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation, 274 BarrierAnswer commitOperation, String... cohort) throws Exception { 275 List<String> expected = Arrays.asList(cohort); 276 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class))) 277 .thenReturn(spy); 278 279 // use the passed controller responses 280 doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected); 281 doAnswer(commitOperation).when(controller).sendGlobalBarrierReached(eq(spy), 282 anyListOf(String.class)); 283 284 // run the operation 285 Procedure task = 286 coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected); 287 // and wait for it to finish 288 task.waitForCompleted(); 289 290 // make sure we mocked correctly 291 prepareOperation.ensureRan(); 292 // we never got an exception 293 InOrder inorder = inOrder(spy, controller); 294 inorder.verify(spy).sendGlobalBarrierStart(); 295 inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected); 296 inorder.verify(spy).sendGlobalBarrierReached(); 297 inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class)); 298 } 299 300 private static abstract class OperationAnswer implements Answer<Void> { 301 private boolean ran = false; 302 303 public void ensureRan() { 304 assertTrue("Prepare mocking didn't actually run!", ran); 305 } 306 307 @Override 308 public final Void answer(InvocationOnMock invocation) throws Throwable { 309 this.ran = true; 310 doWork(); 311 return null; 312 } 313 314 protected abstract void doWork() throws Throwable; 315 } 316 317 /** 318 * Just tell the current coordinator that each of the nodes has prepared 319 */ 320 private class AcquireBarrierAnswer extends OperationAnswer { 321 protected final String[] cohort; 322 protected final String opName; 323 324 public AcquireBarrierAnswer(String opName, String... cohort) { 325 this.cohort = cohort; 326 this.opName = opName; 327 } 328 329 @Override 330 public void doWork() { 331 if (cohort == null) return; 332 for (String member : cohort) { 333 TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member); 334 } 335 } 336 } 337 338 /** 339 * Just tell the current coordinator that each of the nodes has committed 340 */ 341 private class BarrierAnswer extends OperationAnswer { 342 protected final String[] cohort; 343 protected final String opName; 344 345 public BarrierAnswer(String opName, String... cohort) { 346 this.cohort = cohort; 347 this.opName = opName; 348 } 349 350 @Override 351 public void doWork() { 352 if (cohort == null) return; 353 for (String member : cohort) { 354 TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member, 355 new byte[0]); 356 } 357 } 358 } 359}