001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.mockito.Mockito.never; 023import static org.mockito.Mockito.spy; 024import static org.mockito.Mockito.times; 025import static org.mockito.Mockito.verify; 026 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CountDownLatch; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.junit.jupiter.api.AfterAll; 039import org.junit.jupiter.api.BeforeAll; 040import org.junit.jupiter.api.Tag; 041import org.junit.jupiter.api.Test; 042import org.mockito.Mockito; 043import org.mockito.invocation.InvocationOnMock; 044import org.mockito.stubbing.Answer; 045import org.mockito.verification.VerificationMode; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 050 051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 052 053/** 054 * Test zookeeper-based, procedure controllers 055 */ 056@Tag(MasterTests.TAG) 057@Tag(MediumTests.TAG) 058public class TestZKProcedureControllers { 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class); 061 private final static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 062 private static final String COHORT_NODE_NAME = "expected"; 063 private static final String CONTROLLER_NODE_NAME = "controller"; 064 private static final VerificationMode once = Mockito.times(1); 065 066 private final byte[] memberData = Bytes.toBytes("data from member"); 067 068 @BeforeAll 069 public static void setupTest() throws Exception { 070 UTIL.startMiniZKCluster(); 071 } 072 073 @AfterAll 074 public static void cleanupTest() throws Exception { 075 UTIL.shutdownMiniZKCluster(); 076 } 077 078 /** 079 * Smaller test to just test the actuation on the cohort member 080 * @throws Exception on failure 081 */ 082 @Test 083 public void testSimpleZKCohortMemberController() throws Exception { 084 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 085 final String operationName = "instanceTest"; 086 087 final Subprocedure sub = Mockito.mock(Subprocedure.class); 088 Mockito.when(sub.getName()).thenReturn(operationName); 089 090 final byte[] data = new byte[] { 1, 2, 3 }; 091 final CountDownLatch prepared = new CountDownLatch(1); 092 final CountDownLatch committed = new CountDownLatch(1); 093 094 final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher()); 095 final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, "testSimple"); 096 097 // mock out cohort member callbacks 098 final ProcedureMember member = Mockito.mock(ProcedureMember.class); 099 Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data); 100 Mockito.doAnswer(new Answer<Void>() { 101 @Override 102 public Void answer(InvocationOnMock invocation) throws Throwable { 103 controller.sendMemberAcquired(sub); 104 prepared.countDown(); 105 return null; 106 } 107 }).when(member).submitSubprocedure(sub); 108 Mockito.doAnswer(new Answer<Void>() { 109 @Override 110 public Void answer(InvocationOnMock invocation) throws Throwable { 111 controller.sendMemberCompleted(sub, memberData); 112 committed.countDown(); 113 return null; 114 } 115 }).when(member).receivedReachedGlobalBarrier(operationName); 116 117 // start running the listener 118 controller.start(COHORT_NODE_NAME, member); 119 120 // set a prepare node from a 'coordinator' 121 String prepare = 122 ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName); 123 ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data)); 124 // wait for the operation to be prepared 125 prepared.await(); 126 127 // create the commit node so we update the operation to enter the commit phase 128 String commit = 129 ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName); 130 LOG.debug("Found prepared, posting commit node:" + commit); 131 ZKUtil.createAndFailSilent(watcher, commit); 132 LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit)); 133 committed.await(); 134 135 verify(monitor, never()).receive(Mockito.any()); 136 // XXX: broken due to composition. 137 // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(), 138 // Mockito.any()); 139 // cleanup after the test 140 ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode()); 141 assertEquals(-1, ZKUtil.checkExists(watcher, prepare), "Didn't delete prepare node"); 142 assertEquals(-1, ZKUtil.checkExists(watcher, commit), "Didn't delete commit node"); 143 } 144 145 @Test 146 public void testZKCoordinatorControllerWithNoCohort() throws Exception { 147 final String operationName = "no cohort controller test"; 148 final byte[] data = new byte[] { 1, 2, 3 }; 149 150 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data); 151 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data); 152 } 153 154 @Test 155 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception { 156 final String operationName = "single member controller test"; 157 final byte[] data = new byte[] { 1, 2, 3 }; 158 159 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort"); 160 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort"); 161 } 162 163 @Test 164 public void testZKCoordinatorControllerMultipleCohort() throws Exception { 165 final String operationName = "multi member controller test"; 166 final byte[] data = new byte[] { 1, 2, 3 }; 167 168 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort", 169 "cohort2", "cohort3"); 170 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort", 171 "cohort2", "cohort3"); 172 } 173 174 private void runMockCommitWithOrchestratedControllers(StartControllers controllers, 175 String operationName, byte[] data, String... cohort) throws Exception { 176 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 177 List<String> expected = Lists.newArrayList(cohort); 178 179 final Subprocedure sub = Mockito.mock(Subprocedure.class); 180 Mockito.when(sub.getName()).thenReturn(operationName); 181 182 CountDownLatch prepared = new CountDownLatch(expected.size()); 183 CountDownLatch committed = new CountDownLatch(expected.size()); 184 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 185 186 // mock out coordinator so we can keep track of zk progress 187 ProcedureCoordinator coordinator = 188 setupMockCoordinator(operationName, prepared, committed, dataFromMembers); 189 190 ProcedureMember member = Mockito.mock(ProcedureMember.class); 191 192 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher, 193 operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 194 ZKProcedureCoordinator controller = pair.getFirst(); 195 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 196 // start the operation 197 Procedure p = Mockito.mock(Procedure.class); 198 Mockito.when(p.getName()).thenReturn(operationName); 199 200 controller.sendGlobalBarrierAcquire(p, data, expected); 201 202 // post the prepare node for each expected node 203 for (ZKProcedureMemberRpcs cc : cohortControllers) { 204 cc.sendMemberAcquired(sub); 205 } 206 207 // wait for all the notifications to reach the coordinator 208 prepared.await(); 209 // make sure we got the all the nodes and no more 210 Mockito.verify(coordinator, times(expected.size())) 211 .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 212 213 // kick off the commit phase 214 controller.sendGlobalBarrierReached(p, expected); 215 216 // post the committed node for each expected node 217 for (ZKProcedureMemberRpcs cc : cohortControllers) { 218 cc.sendMemberCompleted(sub, memberData); 219 } 220 221 // wait for all commit notifications to reach the coordinator 222 committed.await(); 223 // make sure we got the all the nodes and no more 224 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier( 225 Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData)); 226 227 assertEquals(expected.size(), dataFromMembers.size(), 228 "Incorrect number of members returnd data"); 229 for (byte[] result : dataFromMembers) { 230 assertArrayEquals(memberData, result, "Incorrect data from member"); 231 } 232 233 controller.resetMembers(p); 234 235 // verify all behavior 236 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 237 verifyCohort(member, cohortControllers.size(), operationName, data); 238 verifyCoordinator(operationName, coordinator, expected); 239 } 240 241 // TODO Broken by composition. 242 // @Test 243 // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception { 244 // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 245 // "cohort1", "cohort2"); 246 // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 247 // "cohort1", "cohort2"); 248 // } 249 250 public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data, 251 String... cohort) throws Exception { 252 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 253 List<String> expected = Lists.newArrayList(cohort); 254 255 final Subprocedure sub = Mockito.mock(Subprocedure.class); 256 Mockito.when(sub.getName()).thenReturn(operationName); 257 258 final CountDownLatch prepared = new CountDownLatch(expected.size()); 259 final CountDownLatch committed = new CountDownLatch(expected.size()); 260 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 261 262 // mock out coordinator so we can keep track of zk progress 263 ProcedureCoordinator coordinator = 264 setupMockCoordinator(operationName, prepared, committed, dataFromMembers); 265 266 ProcedureMember member = Mockito.mock(ProcedureMember.class); 267 Procedure p = Mockito.mock(Procedure.class); 268 Mockito.when(p.getName()).thenReturn(operationName); 269 270 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher, 271 operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 272 ZKProcedureCoordinator controller = pair.getFirst(); 273 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 274 275 // post 1/2 the prepare nodes early 276 for (int i = 0; i < cohortControllers.size() / 2; i++) { 277 cohortControllers.get(i).sendMemberAcquired(sub); 278 } 279 280 // start the operation 281 controller.sendGlobalBarrierAcquire(p, data, expected); 282 283 // post the prepare node for each expected node 284 for (ZKProcedureMemberRpcs cc : cohortControllers) { 285 cc.sendMemberAcquired(sub); 286 } 287 288 // wait for all the notifications to reach the coordinator 289 prepared.await(); 290 // make sure we got the all the nodes and no more 291 Mockito.verify(coordinator, times(expected.size())) 292 .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 293 294 // kick off the commit phase 295 controller.sendGlobalBarrierReached(p, expected); 296 297 // post the committed node for each expected node 298 for (ZKProcedureMemberRpcs cc : cohortControllers) { 299 cc.sendMemberCompleted(sub, memberData); 300 } 301 302 // wait for all commit notifications to reach the coordiantor 303 committed.await(); 304 // make sure we got the all the nodes and no more 305 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier( 306 Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData)); 307 308 controller.resetMembers(p); 309 310 // verify all behavior 311 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 312 verifyCohort(member, cohortControllers.size(), operationName, data); 313 verifyCoordinator(operationName, coordinator, expected); 314 } 315 316 /** 317 * @return a mock {@link ProcedureCoordinator} that just counts down the prepared and committed 318 * latch for called to the respective method 319 */ 320 private ProcedureCoordinator setupMockCoordinator(String operationName, 321 final CountDownLatch prepared, final CountDownLatch committed, 322 final ArrayList<byte[]> dataFromMembers) { 323 ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class); 324 Mockito.doAnswer(new Answer<Void>() { 325 @Override 326 public Void answer(InvocationOnMock invocation) throws Throwable { 327 prepared.countDown(); 328 return null; 329 } 330 }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 331 Mockito.doAnswer(new Answer<Void>() { 332 @Override 333 public Void answer(InvocationOnMock invocation) throws Throwable { 334 dataFromMembers.add(memberData); 335 committed.countDown(); 336 return null; 337 } 338 }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(), 339 Mockito.eq(memberData)); 340 return coordinator; 341 } 342 343 /** 344 * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper 345 */ 346 private void verifyZooKeeperClean(String operationName, ZKWatcher watcher, 347 ZKProcedureUtil controller) throws Exception { 348 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName); 349 String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName); 350 String abort = ZKProcedureUtil.getAbortNode(controller, operationName); 351 assertEquals(-1, ZKUtil.checkExists(watcher, prepare), "Didn't delete prepare node"); 352 assertEquals(-1, ZKUtil.checkExists(watcher, commit), "Didn't delete commit node"); 353 assertEquals(-1, ZKUtil.checkExists(watcher, abort), "Didn't delete abort node"); 354 } 355 356 /** 357 * Verify the cohort controller got called once per expected node to start the operation 358 */ 359 private void verifyCohort(ProcedureMember member, int cohortSize, String operationName, 360 byte[] data) { 361 // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName), 362 // (byte[]) Mockito.argThat(new ArrayEquals(data))); 363 Mockito.verify(member, Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any()); 364 365 } 366 367 /** 368 * Verify that the coordinator only got called once for each expected node 369 */ 370 private void verifyCoordinator(String operationName, ProcedureCoordinator coordinator, 371 List<String> expected) { 372 // verify that we got all the expected nodes 373 for (String node : expected) { 374 verify(coordinator, once).memberAcquiredBarrier(operationName, node); 375 verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData); 376 } 377 } 378 379 /** 380 * Specify how the controllers that should be started (not spy/mockable) for the test. 381 */ 382 private abstract class StartControllers { 383 public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start( 384 ZKWatcher watcher, String operationName, ProcedureCoordinator coordinator, 385 String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception; 386 } 387 388 private final StartControllers startCoordinatorFirst = new StartControllers() { 389 390 @Override 391 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher, 392 String operationName, ProcedureCoordinator coordinator, String controllerName, 393 ProcedureMember member, List<String> expected) throws Exception { 394 // start the controller 395 ZKProcedureCoordinator controller = 396 new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME); 397 controller.start(coordinator); 398 399 // make a cohort controller for each expected node 400 401 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 402 for (String nodeName : expected) { 403 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 404 cc.start(nodeName, member); 405 cohortControllers.add(cc); 406 } 407 return new Pair<>(controller, cohortControllers); 408 } 409 }; 410 411 /** 412 * Check for the possible race condition where a cohort member starts after the controller and 413 * therefore could miss a new operation 414 */ 415 private final StartControllers startCohortFirst = new StartControllers() { 416 417 @Override 418 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher, 419 String operationName, ProcedureCoordinator coordinator, String controllerName, 420 ProcedureMember member, List<String> expected) throws Exception { 421 422 // make a cohort controller for each expected node 423 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 424 for (String nodeName : expected) { 425 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 426 cc.start(nodeName, member); 427 cohortControllers.add(cc); 428 } 429 430 // start the controller 431 ZKProcedureCoordinator controller = 432 new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME); 433 controller.start(coordinator); 434 435 return new Pair<>(controller, cohortControllers); 436 } 437 }; 438}