001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.ArgumentMatchers.anyList; 024import static org.mockito.ArgumentMatchers.eq; 025import static org.mockito.Mockito.atMost; 026import static org.mockito.Mockito.never; 027import static org.mockito.Mockito.spy; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.ThreadPoolExecutor; 036import java.util.concurrent.atomic.AtomicInteger; 037import org.apache.hadoop.hbase.Abortable; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.errorhandling.ForeignException; 040import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 041import org.apache.hadoop.hbase.errorhandling.TimeoutException; 042import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl; 043import org.apache.hadoop.hbase.testclassification.MasterTests; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.util.Pair; 046import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 047import org.junit.jupiter.api.AfterAll; 048import org.junit.jupiter.api.BeforeAll; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051import org.mockito.Mockito; 052import org.mockito.internal.matchers.ArrayEquals; 053import org.mockito.invocation.InvocationOnMock; 054import org.mockito.stubbing.Answer; 055import org.mockito.verification.VerificationMode; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 060 061/** 062 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster 063 */ 064@Tag(MasterTests.TAG) 065@Tag(MediumTests.TAG) 066public class TestZKProcedure { 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class); 069 private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 070 private static final String COORDINATOR_NODE_NAME = "coordinator"; 071 private static final long KEEP_ALIVE = 100; // seconds 072 private static final int POOL_SIZE = 1; 073 private static final long TIMEOUT = 10000; // when debugging make this larger for debugging 074 private static final long WAKE_FREQUENCY = 500; 075 private static final String opName = "op"; 076 private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for? 077 private static final VerificationMode once = Mockito.times(1); 078 079 @BeforeAll 080 public static void setupTest() throws Exception { 081 UTIL.startMiniZKCluster(); 082 } 083 084 @AfterAll 085 public static void cleanupTest() throws Exception { 086 UTIL.shutdownMiniZKCluster(); 087 } 088 089 private static ZKWatcher newZooKeeperWatcher() throws IOException { 090 return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() { 091 @Override 092 public void abort(String why, Throwable e) { 093 throw new RuntimeException("Unexpected abort in distributed three phase commit test:" + why, 094 e); 095 } 096 097 @Override 098 public boolean isAborted() { 099 return false; 100 } 101 }); 102 } 103 104 @Test 105 public void testEmptyMemberSet() throws Exception { 106 runCommit(); 107 } 108 109 @Test 110 public void testSingleMember() throws Exception { 111 runCommit("one"); 112 } 113 114 @Test 115 public void testMultipleMembers() throws Exception { 116 runCommit("one", "two", "three", "four"); 117 } 118 119 private void runCommit(String... members) throws Exception { 120 // make sure we just have an empty list 121 if (members == null) { 122 members = new String[0]; 123 } 124 List<String> expected = Arrays.asList(members); 125 126 // setup the constants 127 ZKWatcher coordZkw = newZooKeeperWatcher(); 128 String opDescription = "coordination test - " + members.length + " cohort members"; 129 130 // start running the controller 131 ZKProcedureCoordinator coordinatorComms = 132 new ZKProcedureCoordinator(coordZkw, opDescription, COORDINATOR_NODE_NAME); 133 ThreadPoolExecutor pool = 134 ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); 135 ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { 136 @Override 137 public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, 138 byte[] procArgs, List<String> expectedMembers) { 139 return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers)); 140 } 141 }; 142 143 // build and start members 144 // NOTE: There is a single subprocedure builder for all members here. 145 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class); 146 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = 147 new ArrayList<>(members.length); 148 // start each member 149 for (String member : members) { 150 ZKWatcher watcher = newZooKeeperWatcher(); 151 ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription); 152 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); 153 ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory); 154 procMembers.add(new Pair<>(procMember, comms)); 155 comms.start(member, procMember); 156 } 157 158 // setup mock member subprocedures 159 final List<Subprocedure> subprocs = new ArrayList<>(); 160 for (int i = 0; i < procMembers.size(); i++) { 161 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher(); 162 Subprocedure commit = Mockito.spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, 163 cohortMonitor, WAKE_FREQUENCY, TIMEOUT)); 164 subprocs.add(commit); 165 } 166 167 // link subprocedure to buildNewOperation invocation. 168 final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger 169 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName), 170 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() { 171 @Override 172 public Subprocedure answer(InvocationOnMock invocation) throws Throwable { 173 int index = i.getAndIncrement(); 174 LOG.debug("Task size:" + subprocs.size() + ", getting:" + index); 175 Subprocedure commit = subprocs.get(index); 176 return commit; 177 } 178 }); 179 180 // setup spying on the coordinator 181 // Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, 182 // expected)); 183 // Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc); 184 185 // start running the operation 186 Procedure task = 187 coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected); 188 // assertEquals("Didn't mock coordinator task", proc, task); 189 190 // verify all things ran as expected 191 // waitAndVerifyProc(proc, once, once, never(), once, false); 192 waitAndVerifyProc(task, once, once, never(), once, false); 193 verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false); 194 195 // close all the things 196 closeAll(coordinator, coordinatorComms, procMembers); 197 } 198 199 /** 200 * Test a distributed commit with multiple cohort members, where one of the cohort members has a 201 * timeout exception during the prepare stage. 202 */ 203 @Test 204 public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception { 205 String opDescription = "error injection coordination"; 206 String[] cohortMembers = new String[] { "one", "two", "three" }; 207 List<String> expected = Lists.newArrayList(cohortMembers); 208 // error constants 209 final int memberErrorIndex = 2; 210 final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1); 211 212 // start running the coordinator and its controller 213 ZKWatcher coordinatorWatcher = newZooKeeperWatcher(); 214 ZKProcedureCoordinator coordinatorController = 215 new ZKProcedureCoordinator(coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); 216 ThreadPoolExecutor pool = 217 ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); 218 ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); 219 220 // start a member for each node 221 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class); 222 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size()); 223 for (String member : expected) { 224 ZKWatcher watcher = newZooKeeperWatcher(); 225 ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription); 226 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); 227 ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory); 228 members.add(new Pair<>(mem, controller)); 229 controller.start(member, mem); 230 } 231 232 // setup mock subprocedures 233 final List<Subprocedure> cohortTasks = new ArrayList<>(); 234 final int[] elem = new int[1]; 235 for (int i = 0; i < members.size(); i++) { 236 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher(); 237 final ProcedureMember comms = members.get(i).getFirst(); 238 Subprocedure commit = 239 Mockito.spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT)); 240 // This nasty bit has one of the impls throw a TimeoutException 241 Mockito.doAnswer(new Answer<Void>() { 242 @Override 243 public Void answer(InvocationOnMock invocation) throws Throwable { 244 int index = elem[0]; 245 if (index == memberErrorIndex) { 246 LOG.debug("Sending error to coordinator"); 247 ForeignException remoteCause = 248 new ForeignException("TIMER", new TimeoutException("subprocTimeout", 1, 2, 0)); 249 Subprocedure r = ((Subprocedure) invocation.getMock()); 250 LOG.error("Remote commit failure, not propagating error:" + remoteCause); 251 comms.receiveAbortProcedure(r.getName(), remoteCause); 252 assertTrue(r.isComplete()); 253 // don't complete the error phase until the coordinator has gotten the error 254 // notification (which ensures that we never progress past prepare) 255 try { 256 Procedure.waitForLatch(coordinatorReceivedErrorLatch, 257 new ForeignExceptionDispatcher(), WAKE_FREQUENCY, "coordinator received error"); 258 } catch (InterruptedException e) { 259 LOG.debug("Wait for latch interrupted, done:" 260 + (coordinatorReceivedErrorLatch.getCount() == 0)); 261 // reset the interrupt status on the thread 262 Thread.currentThread().interrupt(); 263 } 264 } 265 elem[0] = ++index; 266 return null; 267 } 268 }).when(commit).acquireBarrier(); 269 cohortTasks.add(commit); 270 } 271 272 // pass out a task per member 273 final AtomicInteger taskIndex = new AtomicInteger(); 274 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName), 275 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() { 276 @Override 277 public Subprocedure answer(InvocationOnMock invocation) throws Throwable { 278 int index = taskIndex.getAndIncrement(); 279 Subprocedure commit = cohortTasks.get(index); 280 return commit; 281 } 282 }); 283 284 // setup spying on the coordinator 285 ForeignExceptionDispatcher coordinatorTaskErrorMonitor = 286 Mockito.spy(new ForeignExceptionDispatcher()); 287 Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator, coordinatorTaskErrorMonitor, 288 WAKE_FREQUENCY, TIMEOUT, opName, data, expected)); 289 when(coordinator.createProcedure(any(), eq(opName), eq(data), anyList())) 290 .thenReturn(coordinatorTask); 291 // count down the error latch when we get the remote error 292 Mockito.doAnswer(new Answer<Void>() { 293 @Override 294 public Void answer(InvocationOnMock invocation) throws Throwable { 295 // pass on the error to the master 296 invocation.callRealMethod(); 297 // then count down the got error latch 298 coordinatorReceivedErrorLatch.countDown(); 299 return null; 300 } 301 }).when(coordinatorTask).receive(Mockito.any()); 302 303 // ---------------------------- 304 // start running the operation 305 // ---------------------------- 306 307 Procedure task = 308 coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected); 309 assertEquals(coordinatorTask, task, "Didn't mock coordinator task"); 310 311 // wait for the task to complete 312 try { 313 task.waitForCompleted(); 314 } catch (ForeignException fe) { 315 // this may get caught or may not 316 } 317 318 // ------------- 319 // verification 320 // ------------- 321 322 // always expect prepared, never committed, and possible to have cleanup and finish (racy since 323 // error case) 324 waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true); 325 verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once, once, true); 326 327 // close all the open things 328 closeAll(coordinator, coordinatorController, members); 329 } 330 331 /** 332 * Wait for the coordinator task to complete, and verify all the mocks 333 * @param proc the {@link Procedure} to execute 334 * @param prepare the mock prepare 335 * @param commit the mock commit 336 * @param cleanup the mock cleanup 337 * @param finish the mock finish 338 * @param opHasError the operation error state 339 * @throws Exception on unexpected failure 340 */ 341 private void waitAndVerifyProc(Procedure proc, VerificationMode prepare, VerificationMode commit, 342 VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception { 343 boolean caughtError = false; 344 try { 345 proc.waitForCompleted(); 346 } catch (ForeignException fe) { 347 caughtError = true; 348 } 349 // make sure that the task called all the expected phases 350 Mockito.verify(proc, prepare).sendGlobalBarrierStart(); 351 Mockito.verify(proc, commit).sendGlobalBarrierReached(); 352 Mockito.verify(proc, finish).sendGlobalBarrierComplete(); 353 assertEquals(opHasError, proc.getErrorMonitor().hasException(), 354 "Operation error state was unexpected"); 355 assertEquals(opHasError, caughtError, "Operation error state was unexpected"); 356 357 } 358 359 /** 360 * Wait for the coordinator task to complete, and verify all the mocks 361 * @param op the {@link Subprocedure} to use 362 * @param prepare the mock prepare 363 * @param commit the mock commit 364 * @param cleanup the mock cleanup 365 * @param finish the mock finish 366 * @param opHasError the operation error state 367 * @throws Exception on unexpected failure 368 */ 369 private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare, 370 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError) 371 throws Exception { 372 boolean caughtError = false; 373 try { 374 op.waitForLocallyCompleted(); 375 } catch (ForeignException fe) { 376 caughtError = true; 377 } 378 // make sure that the task called all the expected phases 379 Mockito.verify(op, prepare).acquireBarrier(); 380 Mockito.verify(op, commit).insideBarrier(); 381 // We cannot guarantee that cleanup has run so we don't check it. 382 383 assertEquals(opHasError, op.getErrorCheckable().hasException(), 384 "Operation error state was unexpected"); 385 assertEquals(opHasError, caughtError, "Operation error state was unexpected"); 386 387 } 388 389 private void verifyCohortSuccessful(List<String> cohortNames, SubprocedureFactory subprocFactory, 390 Iterable<Subprocedure> cohortTasks, VerificationMode prepare, VerificationMode commit, 391 VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception { 392 393 // make sure we build the correct number of cohort members 394 Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())) 395 .buildSubprocedure(Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data))); 396 // verify that we ran each of the operations cleanly 397 int j = 0; 398 for (Subprocedure op : cohortTasks) { 399 LOG.debug("Checking mock:" + (j++)); 400 waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError); 401 } 402 } 403 404 private void closeAll(ProcedureCoordinator coordinator, 405 ZKProcedureCoordinator coordinatorController, 406 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort) throws IOException { 407 // make sure we close all the resources 408 for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) { 409 member.getFirst().close(); 410 member.getSecond().close(); 411 } 412 coordinator.close(); 413 coordinatorController.close(); 414 } 415}