/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.TimeoutException;
import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Test the procedure member, and its error handling mechanisms.
 * <p>
 * Every test drives a {@link ProcedureMember} wired to a mocked {@link ProcedureMemberRpcs} and a
 * mocked {@link SubprocedureFactory}, then checks with an {@link InOrder} verifier which
 * subprocedure phases and which member-to-coordinator notifications were invoked.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestProcedureMember {

  // Passed to the subprocedure as its wake frequency and used as the pause inside stubbed answers.
  // NOTE(review): presumably milliseconds (it is handed to Thread.sleep) -- confirm against
  // SubprocedureImpl.
  private static final long WAKE_FREQUENCY = 100;
  // Passed to the subprocedure as its timeout and to ProcedureMember#closeAndWait.
  private static final long TIMEOUT = 100000;
  private static final long POOL_KEEP_ALIVE = 1;

  private final String op = "some op";
  private final byte[] data = new byte[0];
  private final ForeignExceptionDispatcher mockListener =
    Mockito.spy(new ForeignExceptionDispatcher());
  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
  private final ProcedureMemberRpcs mockMemberComms = Mockito.mock(ProcedureMemberRpcs.class);
  private ProcedureMember member;
  private ForeignExceptionDispatcher dispatcher;
  Subprocedure spySub;

  /**
   * Reset all the mock objects and close the member built by the test.
   * @throws IOException declared by {@link Closeables#close}; the {@code true} flag asks it to
   *                     swallow I/O failures raised while closing
   */
  @AfterEach
  public void resetTest() throws IOException {
    reset(mockListener, mockBuilder, mockMemberComms);
    Closeables.close(member, true);
  }

  /**
   * Build a member using the class level mocks
   * @return member to use for tests
   */
  private ProcedureMember buildCohortMember() {
    String name = "node";
    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
  }

  /**
   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
   * <p>
   * Populates {@link #member}, {@link #dispatcher} and {@link #spySub}, makes the factory mock
   * hand out {@link #spySub} for ({@code op}, {@code data}), and installs the answer from
   * {@link #addCommitAnswer()}.
   */
  private void buildCohortMemberPair() throws IOException {
    dispatcher = new ForeignExceptionDispatcher();
    member = buildCohortMember();
    // needed for generating exception
    when(mockMemberComms.getMemberName()).thenReturn("membername");
    spySub = spy(new EmptySubprocedure(member, dispatcher));
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
    addCommitAnswer();
  }

  /**
   * Add an 'in barrier phase' response to the mock controller when it gets an acquired
   * notification
   */
  private void addCommitAnswer() throws IOException {
    Answer<Void> reachGlobalBarrier = (InvocationOnMock invocation) -> {
      member.receivedReachedGlobalBarrier(op);
      return null;
    };
    doAnswer(reachGlobalBarrier).when(mockMemberComms).sendMemberAcquired(any());
  }

  /**
   * Create a stub answer that fails every invocation with a fresh {@link IOException}.
   * @param message message of the thrown exception
   * @return answer that never returns normally
   */
  private static Answer<Void> throwIOException(String message) {
    return (InvocationOnMock invocation) -> {
      throw new IOException(message);
    };
  }

  /**
   * Create a stub answer that cancels {@link #spySub} with the given cause and then sleeps for
   * {@link #WAKE_FREQUENCY}. The field is read when the answer runs, not when it is created.
   * @param message message handed to {@link Subprocedure#cancel}
   * @param cause   error injected into the subprocedure
   * @return answer that returns {@code null} after the pause
   */
  private Answer<Void> cancelSubprocedureThenPause(String message, TimeoutException cause) {
    return (InvocationOnMock invocation) -> {
      // inject a remote error (this would have come from an external thread)
      spySub.cancel(message, cause);
      // sleep the wake frequency since that is what we promised
      Thread.sleep(WAKE_FREQUENCY);
      return null;
    };
  }

  /**
   * Test the normal sub procedure execution case.
   */
  @Test
  public void testSimpleRun() throws Exception {
    member = buildCohortMember();
    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
    EmptySubprocedure spiedSub = spy(subproc);
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spiedSub);

    // when we get a prepare, then start the commit phase
    addCommitAnswer();

    // build a new operation through the member and run it
    Subprocedure subproc1 = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc1);
    // and wait for it to finish
    subproc.waitForLocallyCompleted();

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spiedSub);
    order.verify(spiedSub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spiedSub));
    order.verify(spiedSub).insideBarrier();
    order.verify(mockMemberComms).sendMemberCompleted(eq(spiedSub), eq(data));
    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spiedSub), any());
  }

  /**
   * Make sure we call cleanup etc, when we have an exception during
   * {@link Subprocedure#acquireBarrier()}.
   */
  @Test
  public void testMemberPrepareException() throws Exception {
    buildCohortMemberPair();

    // mock an exception on Subprocedure's prepare (acquireBarrier)
    doAnswer(throwIOException("Forced IOException in member acquireBarrier")).when(spySub)
      .acquireBarrier();

    // build a new operation through the member and run it
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    // Later phases not run
    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
    order.verify(spySub, never()).insideBarrier();
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Make sure we call cleanup etc, when notifying the coordinator that the barrier was acquired
   * ({@link ProcedureMemberRpcs#sendMemberAcquired}) fails.
   */
  @Test
  public void testSendMemberAcquiredCommsFailure() throws Exception {
    buildCohortMemberPair();

    // mock an exception when the member reports 'acquired'; this replaces the answer installed
    // by buildCohortMemberPair()
    doAnswer(throwIOException("Forced IOException in member prepare")).when(mockMemberComms)
      .sendMemberAcquired(any());

    // build a new operation through the member and run it
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));

    // Later phases not run
    order.verify(spySub, never()).insideBarrier();
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
   * running {@link Subprocedure#acquireBarrier()} -- prepare needs to finish first, and then the
   * abort is checked. Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get
   * rolled back via {@link Subprocedure#cleanup}.
   */
  @Test
  public void testCoordinatorAbort() throws Exception {
    buildCohortMemberPair();

    // mock that another node timed out or failed to prepare
    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
    doAnswer(cancelSubprocedureThenPause("bogus message", oate)).when(spySub)
      .waitForReachedGlobalBarrier();

    // build a new operation through the member and run it
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
    // Later phases not run
    order.verify(spySub, never()).insideBarrier();
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Handle failures if a member's commit phase fails.
   * <p>
   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
   * the transaction is committed just before the coordinator sends commit messages to the member.
   * Members are then responsible for reading its TX log. This implementation actually rolls back,
   * and thus breaks the normal TX guarantees.
   */
  @Test
  public void testMemberCommitException() throws Exception {
    buildCohortMemberPair();

    // mock an exception on Subprocedure's commit (insideBarrier)
    doAnswer(throwIOException("Forced IOException in member prepare")).when(spySub)
      .insideBarrier();

    // build a new operation through the member and run it
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
    order.verify(spySub).insideBarrier();

    // Later phases not run
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
   * <p>
   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
   * the transaction is committed just before the coordinator sends commit messages to the member.
   * Members are then responsible for reading its TX log. This implementation actually rolls back,
   * and thus breaks the normal TX guarantees.
   */
  @Test
  public void testMemberCommitCommsFailure() throws Exception {
    buildCohortMemberPair();

    // cancel the subprocedure while the member reports 'completed' to the coordinator
    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
    doAnswer(cancelSubprocedureThenPause("commit comms fail", oate)).when(mockMemberComms)
      .sendMemberCompleted(any(), eq(data));

    // build a new operation through the member and run it
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
    order.verify(spySub).insideBarrier();
    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Fail correctly on getting an external error while waiting for the prepared latch
   * @throws Exception on failure
   */
  @Test
  public void testPropagateConnectionErrorBackToManager() throws Exception {
    // setup the operation
    member = buildCohortMember();
    ProcedureMember memberSpy = spy(member);

    // setup the commit and the spy
    final ForeignExceptionDispatcher commitDispatcher = new ForeignExceptionDispatcher();
    // NOTE(review): the subprocedure below is built on commitDispatcher, not on this spy; the
    // spy only matters for the verification that is still commented out at the end of the test.
    ForeignExceptionDispatcher dispSpy = spy(commitDispatcher);
    Subprocedure commit = new EmptySubprocedure(member, commitDispatcher);
    Subprocedure spiedCommit = spy(commit);
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spiedCommit);

    // fail during the prepare phase
    doThrow(new ForeignException("SRC", "prepare exception")).when(spiedCommit).acquireBarrier();
    // and throw a connection error when we try to tell the controller about it
    doThrow(new IOException("Controller is down!")).when(mockMemberComms)
      .sendMemberAborted(eq(spiedCommit), any());

    // build a new operation through the spied member and run it
    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
    memberSpy.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    memberSpy.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spiedCommit, dispSpy);
    // make sure we acquire.
    order.verify(spiedCommit).acquireBarrier();
    order.verify(mockMemberComms, never()).sendMemberAcquired(spiedCommit);

    // TODO Need to do another refactor to get this to propagate to the coordinator.
    // make sure we pass a remote exception back the controller
    // order.verify(mockMemberComms).sendMemberAborted(eq(spy),
    // any());
    // order.verify(dispSpy).receiveError(anyString(),
    // any(), any());
  }

  /**
   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
   * correctly build a new task for the requested operation
   * @throws Exception on failure
   */
  @Test
  public void testNoTaskToBeRunFromRequest() throws Exception {
    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
    // 1st build returns null, 2nd throws IllegalStateException, 3rd IllegalArgumentException
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null).thenThrow(
      new IllegalStateException("Wrong state!"),
      new IllegalArgumentException("can't understand the args"));
    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);

    // builder returns null
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);

    // builder throws an illegal state exception
    try {
      Subprocedure subproc2 = member.createSubprocedure(op, data);
      member.submitSubprocedure(subproc2);
    } catch (IllegalStateException ignored) {
      // Tolerated on purpose: this test only asserts that nothing reaches the pool, not
      // whether the builder's failure propagates out of the member.
    }

    // builder throws an illegal argument exception
    try {
      Subprocedure subproc3 = member.createSubprocedure(op, data);
      member.submitSubprocedure(subproc3);
    } catch (IllegalArgumentException ignored) {
      // Tolerated on purpose, same reasoning as above.
    }

    // no request should reach the pool
    verifyNoInteractions(pool);
    // get two abort requests
    // TODO Need to do another refactor to get this to propagate to the coordinator.
    // verify(mockMemberComms, times(2)).sendMemberAborted(any(), any());
  }

  /**
   * Helper {@link Subprocedure} whose phase for each step is just empty. It is an inner
   * (non-static) class because its constructor reads the enclosing test's {@code op} field.
   */
  public class EmptySubprocedure extends SubprocedureImpl {
    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
      super(member, op, dispatcher,
        // TODO the wake frequency and timeout are arbitrary numbers that were picked.
        WAKE_FREQUENCY, TIMEOUT);
    }
  }
}