001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.jupiter.api.Assertions.assertNull; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.ArgumentMatchers.anyList; 024import static org.mockito.ArgumentMatchers.anyString; 025import static org.mockito.ArgumentMatchers.eq; 026import static org.mockito.Mockito.atLeastOnce; 027import static org.mockito.Mockito.doAnswer; 028import static org.mockito.Mockito.doThrow; 029import static org.mockito.Mockito.inOrder; 030import static org.mockito.Mockito.mock; 031import static org.mockito.Mockito.never; 032import static org.mockito.Mockito.reset; 033import static org.mockito.Mockito.spy; 034import static org.mockito.Mockito.times; 035import static org.mockito.Mockito.verify; 036import static org.mockito.Mockito.when; 037 038import java.io.IOException; 039import java.util.Arrays; 040import java.util.List; 041import java.util.concurrent.ThreadPoolExecutor; 042import java.util.concurrent.TimeUnit; 043import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 044import org.apache.hadoop.hbase.testclassification.MasterTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.junit.jupiter.api.AfterEach; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049import org.mockito.InOrder; 050import org.mockito.invocation.InvocationOnMock; 051import org.mockito.stubbing.Answer; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 054 055/** 056 * Test Procedure coordinator operation. 057 * <p> 058 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method 059 * level serialization this class will likely throw all kinds of errors. 060 */ 061@Tag(MasterTests.TAG) 062@Tag(SmallTests.TAG) 063public class TestProcedureCoordinator { 064 065 // general test constants 066 private static final long WAKE_FREQUENCY = 1000; 067 private static final long TIMEOUT = 100000; 068 private static final long POOL_KEEP_ALIVE = 1; 069 private static final String nodeName = "node"; 070 private static final String procName = "some op"; 071 private static final byte[] procData = new byte[0]; 072 private static final List<String> expected = Lists.newArrayList("remote1", "remote2"); 073 074 // setup the mocks 075 private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class); 076 private final Procedure task = mock(Procedure.class); 077 private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class); 078 079 // handle to the coordinator for each test 080 private ProcedureCoordinator coordinator; 081 082 @AfterEach 083 public void resetTest() throws IOException { 084 // reset all the mocks used for the tests 085 reset(controller, task, monitor); 086 // close the open coordinator, if it was used 087 if (coordinator != null) coordinator.close(); 088 } 089 090 private ProcedureCoordinator buildNewCoordinator() { 091 ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE); 092 return spy(new ProcedureCoordinator(controller, pool)); 093 } 094 095 /** 096 * Currently we can only handle one procedure at a time. This makes sure we handle that and reject 097 * submitting more. 098 */ 099 @Test 100 public void testThreadPoolSize() throws Exception { 101 ProcedureCoordinator coordinator = buildNewCoordinator(); 102 Procedure proc = 103 new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected); 104 Procedure procSpy = spy(proc); 105 106 Procedure proc2 = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName + "2", 107 procData, expected); 108 Procedure procSpy2 = spy(proc2); 109 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())) 110 .thenReturn(procSpy, procSpy2); 111 112 coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected); 113 // null here means second procedure failed to start. 114 assertNull( 115 coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected), 116 "Coordinator successfully ran two tasks at once with a single thread pool."); 117 } 118 119 /** 120 * Check handling a connection failure correctly if we get it during the acquiring phase 121 */ 122 @Test 123 public void testUnreachableControllerDuringPrepare() throws Exception { 124 coordinator = buildNewCoordinator(); 125 // setup the proc 126 List<String> expected = Arrays.asList("cohort"); 127 Procedure proc = 128 new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected); 129 final Procedure procSpy = spy(proc); 130 131 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())) 132 .thenReturn(procSpy); 133 134 // use the passed controller responses 135 IOException cause = new IOException("Failed to reach comms during acquire"); 136 doThrow(cause).when(controller).sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyList()); 137 138 // run the operation 139 proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected); 140 // and wait for it to finish 141 while (!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)) 142 ; 143 verify(procSpy, atLeastOnce()).receive(any()); 144 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause)); 145 verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected); 146 verify(controller, never()).sendGlobalBarrierReached(any(), anyList()); 147 } 148 149 /** 150 * Check handling a connection failure correctly if we get it during the barrier phase 151 */ 152 @Test 153 public void testUnreachableControllerDuringCommit() throws Exception { 154 coordinator = buildNewCoordinator(); 155 156 // setup the task and spy on it 157 List<String> expected = Arrays.asList("cohort"); 158 final Procedure spy = 159 spy(new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected)); 160 161 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy); 162 163 // use the passed controller responses 164 IOException cause = new IOException("Failed to reach controller during prepare"); 165 doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })).when(controller) 166 .sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList()); 167 doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyList()); 168 169 // run the operation 170 Procedure task = 171 coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected); 172 // and wait for it to finish 173 while (!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)) 174 ; 175 verify(spy, atLeastOnce()).receive(any()); 176 verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause)); 177 verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList()); 178 verify(controller, times(1)).sendGlobalBarrierReached(any(), anyList()); 179 } 180 181 @Test 182 public void testNoCohort() throws Exception { 183 runSimpleProcedure(); 184 } 185 186 @Test 187 public void testSingleCohortOrchestration() throws Exception { 188 runSimpleProcedure("one"); 189 } 190 191 @Test 192 public void testMultipleCohortOrchestration() throws Exception { 193 runSimpleProcedure("one", "two", "three", "four"); 194 } 195 196 public void runSimpleProcedure(String... members) throws Exception { 197 coordinator = buildNewCoordinator(); 198 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, 199 procData, Arrays.asList(members)); 200 final Procedure spy = spy(task); 201 runCoordinatedProcedure(spy, members); 202 } 203 204 /** 205 * Test that if nodes join the barrier early we still correctly handle the progress 206 */ 207 @Test 208 public void testEarlyJoiningBarrier() throws Exception { 209 final String[] cohort = new String[] { "one", "two", "three", "four" }; 210 coordinator = buildNewCoordinator(); 211 final ProcedureCoordinator ref = coordinator; 212 Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, 213 procData, Arrays.asList(cohort)); 214 final Procedure spy = spy(task); 215 216 AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) { 217 @Override 218 public void doWork() { 219 // then do some fun where we commit before all nodes have prepared 220 // "one" commits before anyone else is done 221 ref.memberAcquiredBarrier(this.opName, this.cohort[0]); 222 ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]); 223 // but "two" takes a while 224 ref.memberAcquiredBarrier(this.opName, this.cohort[1]); 225 // "three"jumps ahead 226 ref.memberAcquiredBarrier(this.opName, this.cohort[2]); 227 ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]); 228 // and "four" takes a while 229 ref.memberAcquiredBarrier(this.opName, this.cohort[3]); 230 } 231 }; 232 233 BarrierAnswer commit = new BarrierAnswer(procName, cohort) { 234 @Override 235 public void doWork() { 236 ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]); 237 ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]); 238 } 239 }; 240 runCoordinatedOperation(spy, prepare, commit, cohort); 241 } 242 243 /** 244 * Just run a procedure with the standard name and data, with not special task for the mock 245 * coordinator (it works just like a regular coordinator). For custom behavior see 246 * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} . 247 * @param spy Spy on a real {@link Procedure} 248 * @param cohort expected cohort members 249 * @throws Exception on failure 250 */ 251 public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception { 252 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), 253 new BarrierAnswer(procName, cohort), cohort); 254 } 255 256 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, String... cohort) 257 throws Exception { 258 runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort); 259 } 260 261 public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, String... cohort) 262 throws Exception { 263 runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort); 264 } 265 266 public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation, 267 BarrierAnswer commitOperation, String... cohort) throws Exception { 268 List<String> expected = Arrays.asList(cohort); 269 when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy); 270 271 // use the passed controller responses 272 doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected); 273 doAnswer(commitOperation).when(controller).sendGlobalBarrierReached(eq(spy), anyList()); 274 275 // run the operation 276 Procedure task = 277 coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected); 278 // and wait for it to finish 279 task.waitForCompleted(); 280 281 // make sure we mocked correctly 282 prepareOperation.ensureRan(); 283 // we never got an exception 284 InOrder inorder = inOrder(spy, controller); 285 inorder.verify(spy).sendGlobalBarrierStart(); 286 inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected); 287 inorder.verify(spy).sendGlobalBarrierReached(); 288 inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyList()); 289 } 290 291 private static abstract class OperationAnswer implements Answer<Void> { 292 private boolean ran = false; 293 294 public void ensureRan() { 295 assertTrue(ran, "Prepare mocking didn't actually run!"); 296 } 297 298 @Override 299 public final Void answer(InvocationOnMock invocation) throws Throwable { 300 this.ran = true; 301 doWork(); 302 return null; 303 } 304 305 protected abstract void doWork() throws Throwable; 306 } 307 308 /** 309 * Just tell the current coordinator that each of the nodes has prepared 310 */ 311 private class AcquireBarrierAnswer extends OperationAnswer { 312 protected final String[] cohort; 313 protected final String opName; 314 315 public AcquireBarrierAnswer(String opName, String... cohort) { 316 this.cohort = cohort; 317 this.opName = opName; 318 } 319 320 @Override 321 public void doWork() { 322 if (cohort == null) return; 323 for (String member : cohort) { 324 TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member); 325 } 326 } 327 } 328 329 /** 330 * Just tell the current coordinator that each of the nodes has committed 331 */ 332 private class BarrierAnswer extends OperationAnswer { 333 protected final String[] cohort; 334 protected final String opName; 335 336 public BarrierAnswer(String opName, String... cohort) { 337 this.cohort = cohort; 338 this.opName = opName; 339 } 340 341 @Override 342 public void doWork() { 343 if (cohort == null) return; 344 for (String member : cohort) { 345 TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member, 346 new byte[0]); 347 } 348 } 349 } 350}