/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.TimeoutException;
import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.internal.matchers.ArrayEquals;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
 */
@Category({MasterTests.class, MediumTests.class})
public class TestZKProcedure {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestZKProcedure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final String COORDINATOR_NODE_NAME = "coordinator";
  private static final long KEEP_ALIVE = 100; // seconds
  private static final int POOL_SIZE = 1;
  private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
  private static final long WAKE_FREQUENCY = 500;
  private static final String opName = "op";
  // Procedure arguments: handed to the coordinator when the procedure is started and matched
  // (by array content) against what the members pass to the subprocedure factory.
  private static final byte[] data = new byte[] { 1, 2 };
  private static final VerificationMode once = Mockito.times(1);

  /** Starts the mini ZooKeeper cluster shared by every test in this class. */
  @BeforeClass
  public static void setupTest() throws Exception {
    UTIL.startMiniZKCluster();
  }

  /** Shuts down the mini ZooKeeper cluster started in {@link #setupTest()}. */
  @AfterClass
  public static void cleanupTest() throws Exception {
    UTIL.shutdownMiniZKCluster();
  }

  /**
   * Creates a watcher against the test utility's configuration. Its {@link Abortable} turns any
   * abort into a {@link RuntimeException} and always reports "not aborted".
   * @return a new {@link ZKWatcher}
   * @throws IOException if the watcher cannot be created
   */
  private static ZKWatcher newZooKeeperWatcher() throws IOException {
    return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
      @Override
      public void abort(String why, Throwable e) {
        throw new RuntimeException(
            "Unexpected abort in distributed three phase commit test:" + why, e);
      }

      @Override
      public boolean isAborted() {
        return false;
      }
    });
  }

  @Test
  public void testEmptyMemberSet() throws Exception {
    runCommit();
  }

  @Test
  public void testSingleMember() throws Exception {
    runCommit("one");
  }

  @Test
  public void testMultipleMembers() throws Exception {
    runCommit("one", "two", "three", "four");
  }

  /**
   * Runs one procedure across the given cohort and verifies that the coordinator and every
   * member subprocedure went through the expected phases without error.
   * @param members names of the cohort members to start; {@code null} is treated as empty
   * @throws Exception on unexpected failure
   */
  private void runCommit(String... members) throws Exception {
    // make sure we just have an empty list
    if (members == null) {
      members = new String[0];
    }
    List<String> expected = Arrays.asList(members);

    // setup the constants
    ZKWatcher coordZkw = newZooKeeperWatcher();
    String opDescription = "coordination test - " + members.length + " cohort members";

    // start running the controller
    ZKProcedureCoordinator coordinatorComms = new ZKProcedureCoordinator(
        coordZkw, opDescription, COORDINATOR_NODE_NAME);
    ThreadPoolExecutor pool =
        ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
    ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
      @Override
      public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName,
          byte[] procArgs, List<String> expectedMembers) {
        // hand back a spy so the phases can be verified on the returned procedure
        return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
      }
    };

    // build and start members
    // NOTE: There is a single subprocedure builder for all members here.
    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers =
        new ArrayList<>(members.length);
    // start each member
    for (String member : members) {
      ZKWatcher watcher = newZooKeeperWatcher();
      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
      ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
      procMembers.add(new Pair<>(procMember, comms));
      comms.start(member, procMember);
    }

    // setup mock member subprocedures
    final List<Subprocedure> subprocs = new ArrayList<>();
    for (int i = 0; i < procMembers.size(); i++) {
      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
      Subprocedure commit = Mockito
          .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
              WAKE_FREQUENCY, TIMEOUT));
      subprocs.add(commit);
    }

    // link subprocedure to buildNewOperation invocation.
    final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
      new Answer<Subprocedure>() {
        @Override
        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
          int index = i.getAndIncrement();
          LOG.debug("Task size:{}, getting:{}", subprocs.size(), index);
          return subprocs.get(index);
        }
      });

    // start running the operation
    Procedure task =
        coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);

    // verify all things ran as expected
    waitAndVerifyProc(task, once, once, never(), once, false);
    verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);

    // close all the things
    closeAll(coordinator, coordinatorComms, procMembers);
  }

  /**
   * Test a distributed commit with multiple cohort members, where one of the cohort members has a
   * timeout exception during the prepare stage.
   */
  @Test
  public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
    String opDescription = "error injection coordination";
    String[] cohortMembers = new String[] { "one", "two", "three" };
    List<String> expected = Lists.newArrayList(cohortMembers);
    // error constants
    final int memberErrorIndex = 2;
    final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);

    // start running the coordinator and its controller
    ZKWatcher coordinatorWatcher = newZooKeeperWatcher();
    ZKProcedureCoordinator coordinatorController = new ZKProcedureCoordinator(
        coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
    ThreadPoolExecutor pool =
        ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
    ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));

    // start a member for each node
    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size());
    for (String member : expected) {
      ZKWatcher watcher = newZooKeeperWatcher();
      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
      ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
      members.add(new Pair<>(mem, controller));
      controller.start(member, mem);
    }

    // setup mock subprocedures
    final List<Subprocedure> cohortTasks = new ArrayList<>();
    // Counts acquireBarrier() invocations across ALL member spies. The counter is shared by
    // every stub below, so it is atomic: a lost update on a plain int would mean no invocation
    // ever observes memberErrorIndex and the timeout would never be injected.
    // NOTE(review): assumes members may invoke acquireBarrier() from different pool threads.
    final AtomicInteger acquireCount = new AtomicInteger(0);
    for (int i = 0; i < members.size(); i++) {
      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
      final ProcedureMember comms = members.get(i).getFirst();
      Subprocedure commit = Mockito
          .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
      // This nasty bit has one of the impls throw a TimeoutException
      Mockito.doAnswer(new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          int index = acquireCount.getAndIncrement();
          if (index == memberErrorIndex) {
            LOG.debug("Sending error to coordinator");
            ForeignException remoteCause = new ForeignException("TIMER",
                new TimeoutException("subprocTimeout", 1, 2, 0));
            Subprocedure r = ((Subprocedure) invocation.getMock());
            // Kept as string concatenation on purpose: passing the exception as the only
            // logging argument would select the (String, Throwable) overload instead.
            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
            comms.receiveAbortProcedure(r.getName(), remoteCause);
            assertTrue(r.isComplete());
            // don't complete the error phase until the coordinator has gotten the error
            // notification (which ensures that we never progress past prepare)
            try {
              Procedure.waitForLatch(coordinatorReceivedErrorLatch,
                  new ForeignExceptionDispatcher(), WAKE_FREQUENCY, "coordinator received error");
            } catch (InterruptedException e) {
              LOG.debug("Wait for latch interrupted, done:{}",
                  coordinatorReceivedErrorLatch.getCount() == 0);
              // reset the interrupt status on the thread
              Thread.currentThread().interrupt();
            }
          }
          return null;
        }
      }).when(commit).acquireBarrier();
      cohortTasks.add(commit);
    }

    // pass out a task per member
    final AtomicInteger taskIndex = new AtomicInteger();
    Mockito.when(
      subprocFactory.buildSubprocedure(Mockito.eq(opName),
        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
      new Answer<Subprocedure>() {
        @Override
        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
          int index = taskIndex.getAndIncrement();
          return cohortTasks.get(index);
        }
      });

    // setup spying on the coordinator
    ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
        .spy(new ForeignExceptionDispatcher());
    Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
        coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
        opName, data, expected));
    when(coordinator.createProcedure(any(), eq(opName), eq(data), anyListOf(String.class)))
        .thenReturn(coordinatorTask);
    // count down the error latch when we get the remote error
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        // pass on the error to the master
        invocation.callRealMethod();
        // then count down the got error latch
        coordinatorReceivedErrorLatch.countDown();
        return null;
      }
    }).when(coordinatorTask).receive(Mockito.any());

    // ----------------------------
    // start running the operation
    // ----------------------------

    Procedure task =
        coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
    assertEquals("Didn't mock coordinator task", coordinatorTask, task);

    // wait for the task to complete
    try {
      task.waitForCompleted();
    } catch (ForeignException fe) {
      // this may get caught or may not
    }

    // -------------
    // verification
    // -------------

    // always expect prepared, never committed, and possible to have cleanup and finish (racy since
    // error case)
    waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
    verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
      once, true);

    // close all the open things
    closeAll(coordinator, coordinatorController, members);
  }

  /**
   * Wait for the coordinator task to complete, and verify all the mocks
   * @param proc the {@link Procedure} to execute
   * @param prepare expected invocations of {@code sendGlobalBarrierStart}
   * @param commit expected invocations of {@code sendGlobalBarrierReached}
   * @param cleanup the mock cleanup; currently not verified
   * @param finish expected invocations of {@code sendGlobalBarrierComplete}
   * @param opHasError the operation error state
   * @throws Exception on unexpected failure
   */
  private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
      VerificationMode commit, VerificationMode cleanup, VerificationMode finish,
      boolean opHasError) throws Exception {
    boolean caughtError = false;
    try {
      proc.waitForCompleted();
    } catch (ForeignException fe) {
      caughtError = true;
    }
    // make sure that the task called all the expected phases
    Mockito.verify(proc, prepare).sendGlobalBarrierStart();
    Mockito.verify(proc, commit).sendGlobalBarrierReached();
    Mockito.verify(proc, finish).sendGlobalBarrierComplete();
    assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
        .hasException());
    assertEquals("Operation error state was unexpected", opHasError, caughtError);
  }

  /**
   * Wait for a member subprocedure to complete locally, and verify all the mocks
   * @param op the {@link Subprocedure} to use
   * @param prepare expected invocations of {@code acquireBarrier}
   * @param commit expected invocations of {@code insideBarrier}
   * @param cleanup the mock cleanup; not verified, see the note in the method body
   * @param finish the mock finish; currently not verified
   * @param opHasError the operation error state
   * @throws Exception on unexpected failure
   */
  private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
      VerificationMode commit, VerificationMode cleanup, VerificationMode finish,
      boolean opHasError) throws Exception {
    boolean caughtError = false;
    try {
      op.waitForLocallyCompleted();
    } catch (ForeignException fe) {
      caughtError = true;
    }
    // make sure that the task called all the expected phases
    Mockito.verify(op, prepare).acquireBarrier();
    Mockito.verify(op, commit).insideBarrier();
    // We cannot guarantee that cleanup has run so we don't check it.

    assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
        .hasException());
    assertEquals("Operation error state was unexpected", opHasError, caughtError);
  }

  /**
   * Verifies that the factory built one subprocedure per cohort member and that each cohort task
   * ran the expected phases and ended in the expected error state.
   * @param cohortNames names of the cohort members
   * @param subprocFactory the mocked factory handed to every member
   * @param cohortTasks the subprocedure spies to check
   * @param prepare expected invocations of {@code acquireBarrier}
   * @param commit expected invocations of {@code insideBarrier}
   * @param cleanup passed through to {@link #waitAndVerifySubproc}
   * @param finish passed through to {@link #waitAndVerifySubproc}
   * @param opHasError the operation error state
   * @throws Exception on unexpected failure
   */
  private void verifyCohortSuccessful(List<String> cohortNames,
      SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
      VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
      VerificationMode finish, boolean opHasError) throws Exception {

    // make sure we build the correct number of cohort members
    Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
      Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
    // verify that we ran each of the operations cleanly
    int j = 0;
    for (Subprocedure op : cohortTasks) {
      LOG.debug("Checking mock:{}", j++);
      waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
    }
  }

  /**
   * Closes every cohort member and its rpcs, then the coordinator and its controller.
   * @param coordinator the coordinator to close
   * @param coordinatorController the coordinator's ZooKeeper controller to close
   * @param cohort the member/rpcs pairs to close
   * @throws IOException if closing any of the resources fails
   */
  private void closeAll(
      ProcedureCoordinator coordinator,
      ZKProcedureCoordinator coordinatorController,
      List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
      throws IOException {
    // make sure we close all the resources
    for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
      member.getFirst().close();
      member.getSecond().close();
    }
    coordinator.close();
    coordinatorController.close();
  }
}