001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.mockito.Matchers.any; 021import static org.mockito.Matchers.anyString; 022import static org.mockito.Matchers.eq; 023import static org.mockito.Mockito.doAnswer; 024import static org.mockito.Mockito.doThrow; 025import static org.mockito.Mockito.inOrder; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.never; 028import static org.mockito.Mockito.reset; 029import static org.mockito.Mockito.spy; 030import static org.mockito.Mockito.verifyZeroInteractions; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.util.concurrent.ThreadPoolExecutor; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.errorhandling.ForeignException; 037import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 038import org.apache.hadoop.hbase.errorhandling.TimeoutException; 039import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl; 040import org.apache.hadoop.hbase.testclassification.MasterTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.junit.After; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.mockito.InOrder; 047import org.mockito.Mockito; 048import org.mockito.invocation.InvocationOnMock; 049import org.mockito.stubbing.Answer; 050 051/** 052 * Test the procedure member, and it's error handling mechanisms. 053 */ 054@Category({MasterTests.class, SmallTests.class}) 055public class TestProcedureMember { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestProcedureMember.class); 060 061 private static final long WAKE_FREQUENCY = 100; 062 private static final long TIMEOUT = 100000; 063 private static final long POOL_KEEP_ALIVE = 1; 064 065 private final String op = "some op"; 066 private final byte[] data = new byte[0]; 067 private final ForeignExceptionDispatcher mockListener = Mockito 068 .spy(new ForeignExceptionDispatcher()); 069 private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class); 070 private final ProcedureMemberRpcs mockMemberComms = Mockito 071 .mock(ProcedureMemberRpcs.class); 072 private ProcedureMember member; 073 private ForeignExceptionDispatcher dispatcher; 074 Subprocedure spySub; 075 076 /** 077 * Reset all the mock objects 078 */ 079 @After 080 public void resetTest() { 081 reset(mockListener, mockBuilder, mockMemberComms); 082 if (member != null) 083 try { 084 member.close(); 085 } catch (IOException e) { 086 e.printStackTrace(); 087 } 088 } 089 090 /** 091 * Build a member using the class level mocks 092 * @return member to use for tests 093 */ 094 private ProcedureMember buildCohortMember() { 095 String name = "node"; 096 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); 097 return new ProcedureMember(mockMemberComms, pool, mockBuilder); 098 } 099 100 /** 101 * Setup a procedure member that returns the spied-upon {@link Subprocedure}. 102 */ 103 private void buildCohortMemberPair() throws IOException { 104 dispatcher = new ForeignExceptionDispatcher(); 105 String name = "node"; 106 ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); 107 member = new ProcedureMember(mockMemberComms, pool, mockBuilder); 108 when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception 109 Subprocedure subproc = new EmptySubprocedure(member, dispatcher); 110 spySub = spy(subproc); 111 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub); 112 addCommitAnswer(); 113 } 114 115 116 /** 117 * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification 118 */ 119 private void addCommitAnswer() throws IOException { 120 doAnswer(new Answer<Void>() { 121 @Override 122 public Void answer(InvocationOnMock invocation) throws Throwable { 123 member.receivedReachedGlobalBarrier(op); 124 return null; 125 } 126 }).when(mockMemberComms).sendMemberAcquired(any()); 127 } 128 129 /** 130 * Test the normal sub procedure execution case. 131 */ 132 @Test 133 public void testSimpleRun() throws Exception { 134 member = buildCohortMember(); 135 EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener); 136 EmptySubprocedure spy = spy(subproc); 137 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy); 138 139 // when we get a prepare, then start the commit phase 140 addCommitAnswer(); 141 142 // run the operation 143 // build a new operation 144 Subprocedure subproc1 = member.createSubprocedure(op, data); 145 member.submitSubprocedure(subproc1); 146 // and wait for it to finish 147 subproc.waitForLocallyCompleted(); 148 149 // make sure everything ran in order 150 InOrder order = inOrder(mockMemberComms, spy); 151 order.verify(spy).acquireBarrier(); 152 order.verify(mockMemberComms).sendMemberAcquired(eq(spy)); 153 order.verify(spy).insideBarrier(); 154 order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data)); 155 order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), 156 any()); 157 } 158 159 /** 160 * Make sure we call cleanup etc, when we have an exception during 161 * {@link Subprocedure#acquireBarrier()}. 162 */ 163 @Test 164 public void testMemberPrepareException() throws Exception { 165 buildCohortMemberPair(); 166 167 // mock an exception on Subprocedure's prepare 168 doAnswer( 169 new Answer<Void>() { 170 @Override 171 public Void answer(InvocationOnMock invocation) throws Throwable { 172 throw new IOException("Forced IOException in member acquireBarrier"); 173 } 174 }).when(spySub).acquireBarrier(); 175 176 // run the operation 177 // build a new operation 178 Subprocedure subproc = member.createSubprocedure(op, data); 179 member.submitSubprocedure(subproc); 180 // if the operation doesn't die properly, then this will timeout 181 member.closeAndWait(TIMEOUT); 182 183 // make sure everything ran in order 184 InOrder order = inOrder(mockMemberComms, spySub); 185 order.verify(spySub).acquireBarrier(); 186 // Later phases not run 187 order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub)); 188 order.verify(spySub, never()).insideBarrier(); 189 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); 190 // error recovery path exercised 191 order.verify(spySub).cancel(anyString(), any()); 192 order.verify(spySub).cleanup(any()); 193 } 194 195 /** 196 * Make sure we call cleanup etc, when we have an exception during prepare. 197 */ 198 @Test 199 public void testSendMemberAcquiredCommsFailure() throws Exception { 200 buildCohortMemberPair(); 201 202 // mock an exception on Subprocedure's prepare 203 doAnswer( 204 new Answer<Void>() { 205 @Override 206 public Void answer(InvocationOnMock invocation) throws Throwable { 207 throw new IOException("Forced IOException in member prepare"); 208 } 209 }).when(mockMemberComms).sendMemberAcquired(any()); 210 211 // run the operation 212 // build a new operation 213 Subprocedure subproc = member.createSubprocedure(op, data); 214 member.submitSubprocedure(subproc); 215 // if the operation doesn't die properly, then this will timeout 216 member.closeAndWait(TIMEOUT); 217 218 // make sure everything ran in order 219 InOrder order = inOrder(mockMemberComms, spySub); 220 order.verify(spySub).acquireBarrier(); 221 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); 222 223 // Later phases not run 224 order.verify(spySub, never()).insideBarrier(); 225 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); 226 // error recovery path exercised 227 order.verify(spySub).cancel(anyString(), any()); 228 order.verify(spySub).cleanup(any()); 229 } 230 231 /** 232 * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a 233 * running {@link Subprocedure#acquireBarrier()} -- prepare needs to finish first, and the the abort 234 * is checked. Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get rolled back 235 * via {@link Subprocedure#cleanup}. 236 */ 237 @Test 238 public void testCoordinatorAbort() throws Exception { 239 buildCohortMemberPair(); 240 241 // mock that another node timed out or failed to prepare 242 final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0); 243 doAnswer( 244 new Answer<Void>() { 245 @Override 246 public Void answer(InvocationOnMock invocation) throws Throwable { 247 // inject a remote error (this would have come from an external thread) 248 spySub.cancel("bogus message", oate); 249 // sleep the wake frequency since that is what we promised 250 Thread.sleep(WAKE_FREQUENCY); 251 return null; 252 } 253 }).when(spySub).waitForReachedGlobalBarrier(); 254 255 // run the operation 256 // build a new operation 257 Subprocedure subproc = member.createSubprocedure(op, data); 258 member.submitSubprocedure(subproc); 259 // if the operation doesn't die properly, then this will timeout 260 member.closeAndWait(TIMEOUT); 261 262 // make sure everything ran in order 263 InOrder order = inOrder(mockMemberComms, spySub); 264 order.verify(spySub).acquireBarrier(); 265 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); 266 // Later phases not run 267 order.verify(spySub, never()).insideBarrier(); 268 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); 269 // error recovery path exercised 270 order.verify(spySub).cancel(anyString(), any()); 271 order.verify(spySub).cleanup(any()); 272 } 273 274 /** 275 * Handle failures if a member's commit phase fails. 276 * 277 * NOTE: This is the core difference that makes this different from traditional 2PC. In true 278 * 2PC the transaction is committed just before the coordinator sends commit messages to the 279 * member. Members are then responsible for reading its TX log. This implementation actually 280 * rolls back, and thus breaks the normal TX guarantees. 281 */ 282 @Test 283 public void testMemberCommitException() throws Exception { 284 buildCohortMemberPair(); 285 286 // mock an exception on Subprocedure's prepare 287 doAnswer( 288 new Answer<Void>() { 289 @Override 290 public Void answer(InvocationOnMock invocation) throws Throwable { 291 throw new IOException("Forced IOException in member prepare"); 292 } 293 }).when(spySub).insideBarrier(); 294 295 // run the operation 296 // build a new operation 297 Subprocedure subproc = member.createSubprocedure(op, data); 298 member.submitSubprocedure(subproc); 299 // if the operation doesn't die properly, then this will timeout 300 member.closeAndWait(TIMEOUT); 301 302 // make sure everything ran in order 303 InOrder order = inOrder(mockMemberComms, spySub); 304 order.verify(spySub).acquireBarrier(); 305 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); 306 order.verify(spySub).insideBarrier(); 307 308 // Later phases not run 309 order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); 310 // error recovery path exercised 311 order.verify(spySub).cancel(anyString(), any()); 312 order.verify(spySub).cleanup(any()); 313 } 314 315 /** 316 * Handle Failures if a member's commit phase succeeds but notification to coordinator fails 317 * 318 * NOTE: This is the core difference that makes this different from traditional 2PC. In true 319 * 2PC the transaction is committed just before the coordinator sends commit messages to the 320 * member. Members are then responsible for reading its TX log. This implementation actually 321 * rolls back, and thus breaks the normal TX guarantees. 322 */ 323 @Test 324 public void testMemberCommitCommsFailure() throws Exception { 325 buildCohortMemberPair(); 326 final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0); 327 doAnswer( 328 new Answer<Void>() { 329 @Override 330 public Void answer(InvocationOnMock invocation) throws Throwable { 331 // inject a remote error (this would have come from an external thread) 332 spySub.cancel("commit comms fail", oate); 333 // sleep the wake frequency since that is what we promised 334 Thread.sleep(WAKE_FREQUENCY); 335 return null; 336 } 337 }).when(mockMemberComms).sendMemberCompleted(any(), eq(data)); 338 339 // run the operation 340 // build a new operation 341 Subprocedure subproc = member.createSubprocedure(op, data); 342 member.submitSubprocedure(subproc); 343 // if the operation doesn't die properly, then this will timeout 344 member.closeAndWait(TIMEOUT); 345 346 // make sure everything ran in order 347 InOrder order = inOrder(mockMemberComms, spySub); 348 order.verify(spySub).acquireBarrier(); 349 order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); 350 order.verify(spySub).insideBarrier(); 351 order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data)); 352 // error recovery path exercised 353 order.verify(spySub).cancel(anyString(), any()); 354 order.verify(spySub).cleanup(any()); 355 } 356 357 /** 358 * Fail correctly on getting an external error while waiting for the prepared latch 359 * @throws Exception on failure 360 */ 361 @Test 362 public void testPropagateConnectionErrorBackToManager() throws Exception { 363 // setup the operation 364 member = buildCohortMember(); 365 ProcedureMember memberSpy = spy(member); 366 367 // setup the commit and the spy 368 final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher(); 369 ForeignExceptionDispatcher dispSpy = spy(dispatcher); 370 Subprocedure commit = new EmptySubprocedure(member, dispatcher); 371 Subprocedure spy = spy(commit); 372 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy); 373 374 // fail during the prepare phase 375 doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier(); 376 // and throw a connection error when we try to tell the controller about it 377 doThrow(new IOException("Controller is down!")).when(mockMemberComms) 378 .sendMemberAborted(eq(spy), any()); 379 380 381 // run the operation 382 // build a new operation 383 Subprocedure subproc = memberSpy.createSubprocedure(op, data); 384 memberSpy.submitSubprocedure(subproc); 385 // if the operation doesn't die properly, then this will timeout 386 memberSpy.closeAndWait(TIMEOUT); 387 388 // make sure everything ran in order 389 InOrder order = inOrder(mockMemberComms, spy, dispSpy); 390 // make sure we acquire. 391 order.verify(spy).acquireBarrier(); 392 order.verify(mockMemberComms, never()).sendMemberAcquired(spy); 393 394 // TODO Need to do another refactor to get this to propagate to the coordinator. 395 // make sure we pass a remote exception back the controller 396// order.verify(mockMemberComms).sendMemberAborted(eq(spy), 397// any()); 398// order.verify(dispSpy).receiveError(anyString(), 399// any(), any()); 400 } 401 402 /** 403 * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot 404 * correctly build a new task for the requested operation 405 * @throws Exception on failure 406 */ 407 @Test 408 public void testNoTaskToBeRunFromRequest() throws Exception { 409 ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class); 410 when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null) 411 .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args")); 412 member = new ProcedureMember(mockMemberComms, pool, mockBuilder); 413 // builder returns null 414 // build a new operation 415 Subprocedure subproc = member.createSubprocedure(op, data); 416 member.submitSubprocedure(subproc); 417 // throws an illegal state exception 418 try { 419 // build a new operation 420 Subprocedure subproc2 = member.createSubprocedure(op, data); 421 member.submitSubprocedure(subproc2); 422 } catch (IllegalStateException ise) { 423 } 424 // throws an illegal argument exception 425 try { 426 // build a new operation 427 Subprocedure subproc3 = member.createSubprocedure(op, data); 428 member.submitSubprocedure(subproc3); 429 } catch (IllegalArgumentException iae) { 430 } 431 432 // no request should reach the pool 433 verifyZeroInteractions(pool); 434 // get two abort requests 435 // TODO Need to do another refactor to get this to propagate to the coordinator. 436 // verify(mockMemberComms, times(2)).sendMemberAborted(any(), any()); 437 } 438 439 /** 440 * Helper {@link Procedure} who's phase for each step is just empty 441 */ 442 public class EmptySubprocedure extends SubprocedureImpl { 443 public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) { 444 super( member, op, dispatcher, 445 // TODO 1000000 is an arbitrary number that I picked. 446 WAKE_FREQUENCY, TIMEOUT); 447 } 448 } 449}