/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Test zookeeper-based, procedure controllers
 */
@Category({MasterTests.class, MediumTests.class})
public class TestZKProcedureControllers {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestZKProcedureControllers.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final String COHORT_NODE_NAME = "expected";
  private static final String CONTROLLER_NODE_NAME = "controller";
  private static final VerificationMode once = Mockito.times(1);

  // Explicit charset so the payload bytes never depend on the platform default encoding.
  private final byte[] memberData = "data from member".getBytes(StandardCharsets.UTF_8);

  @BeforeClass
  public static void setupTest() throws Exception {
    UTIL.startMiniZKCluster();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    UTIL.shutdownMiniZKCluster();
  }

  /**
   * Smaller test to just test the actuation on the cohort member
   * @throws Exception on failure
   */
  @Test
  public void testSimpleZKCohortMemberController() throws Exception {
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    final String operationName = "instanceTest";

    final Subprocedure sub = Mockito.mock(Subprocedure.class);
    Mockito.when(sub.getName()).thenReturn(operationName);

    final byte[] data = new byte[] { 1, 2, 3 };
    final CountDownLatch prepared = new CountDownLatch(1);
    final CountDownLatch committed = new CountDownLatch(1);

    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
        watcher, "testSimple");

    // mock out cohort member callbacks
    final ProcedureMember member = Mockito
        .mock(ProcedureMember.class);
    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        controller.sendMemberAcquired(sub);
        prepared.countDown();
        return null;
      }
    }).when(member).submitSubprocedure(sub);
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        controller.sendMemberCompleted(sub, memberData);
        committed.countDown();
        return null;
      }
    }).when(member).receivedReachedGlobalBarrier(operationName);

    // start running the listener
    controller.start(COHORT_NODE_NAME, member);

    // set a prepare node from a 'coordinator'
    String prepare =
        ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
    // wait for the operation to be prepared
    prepared.await();

    // create the commit node so we update the operation to enter the commit phase
    String commit =
        ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
    LOG.debug("Found prepared, posting commit node:{}", commit);
    ZKUtil.createAndFailSilent(watcher, commit);
    LOG.debug("Commit node:{}, exists:{}", commit, ZKUtil.checkExists(watcher, commit));
    committed.await();

    // NOTE(review): 'monitor' is never handed to the controller or member in this test, so this
    // verification cannot fail as written; kept until the composition issue below is resolved.
    verify(monitor, never()).receive(Mockito.any());
    // XXX: broken due to composition.
//    verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
//      Mockito.any());
    // cleanup after the test
    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
  }

  @Test
  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
    final String operationName = "no cohort controller test";
    final byte[] data = new byte[] { 1, 2, 3 };

    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
  }

  @Test
  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
    final String operationName = "single member controller test";
    final byte[] data = new byte[] { 1, 2, 3 };

    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
  }

  @Test
  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
    final String operationName = "multi member controller test";
    final byte[] data = new byte[] { 1, 2, 3 };

    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
      "cohort2", "cohort3");
    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
      "cohort2", "cohort3");
  }

  /**
   * Drive one full acquire/reached cycle between a real {@link ZKProcedureCoordinator} and one
   * real {@link ZKProcedureMemberRpcs} per cohort name, using mocked
   * {@link ProcedureCoordinator}/{@link ProcedureMember} instances to observe the callbacks.
   * @param controllers strategy deciding whether coordinator or cohort controllers start first
   * @param operationName name of the procedure being run
   * @param data payload sent with the global acquire barrier
   * @param cohort names of the cohort members taking part (may be empty)
   * @throws Exception on failure
   */
  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
      String operationName, byte[] data, String... cohort) throws Exception {
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    List<String> expected = Lists.newArrayList(cohort);

    final Subprocedure sub = Mockito.mock(Subprocedure.class);
    Mockito.when(sub.getName()).thenReturn(operationName);

    CountDownLatch prepared = new CountDownLatch(expected.size());
    CountDownLatch committed = new CountDownLatch(expected.size());
    List<byte[]> dataFromMembers = new ArrayList<>();

    // mock out coordinator so we can keep track of zk progress
    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
      prepared, committed, dataFromMembers);

    ProcedureMember member = Mockito.mock(ProcedureMember.class);

    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
    ZKProcedureCoordinator controller = pair.getFirst();
    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
    // start the operation
    Procedure p = Mockito.mock(Procedure.class);
    Mockito.when(p.getName()).thenReturn(operationName);

    controller.sendGlobalBarrierAcquire(p, data, expected);

    // post the prepare node for each expected node
    for (ZKProcedureMemberRpcs cc : cohortControllers) {
      cc.sendMemberAcquired(sub);
    }

    // wait for all the notifications to reach the coordinator
    prepared.await();
    // make sure we got all the nodes and no more
    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(
      Mockito.eq(operationName), Mockito.anyString());

    // kick off the commit phase
    controller.sendGlobalBarrierReached(p, expected);

    // post the committed node for each expected node
    for (ZKProcedureMemberRpcs cc : cohortControllers) {
      cc.sendMemberCompleted(sub, memberData);
    }

    // wait for all commit notifications to reach the coordinator
    committed.await();
    // make sure we got all the nodes and no more
    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));

    assertEquals("Incorrect number of members returnd data", expected.size(),
      dataFromMembers.size());
    for (byte[] result : dataFromMembers) {
      assertArrayEquals("Incorrect data from member", memberData, result);
    }

    controller.resetMembers(p);

    // verify all behavior
    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
    verifyCohort(member, cohortControllers.size(), operationName, data);
    verifyCoordinator(operationName, coordinator, expected);
  }

  // TODO Broken by composition.
//  @Test
//  public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
//    runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
//      "cohort1", "cohort2");
//    runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
//      "cohort1", "cohort2");
//  }

  /**
   * Same flow as {@link #runMockCommitWithOrchestratedControllers}, except that the first half
   * of the cohort controllers post their acquired node before the coordinator starts the
   * operation. Currently only referenced from the commented-out test above.
   * @param controllers strategy deciding whether coordinator or cohort controllers start first
   * @param operationName name of the procedure being run
   * @param data payload sent with the global acquire barrier
   * @param cohort names of the cohort members taking part
   * @throws Exception on failure
   */
  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
      String... cohort) throws Exception {
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    List<String> expected = Lists.newArrayList(cohort);

    final Subprocedure sub = Mockito.mock(Subprocedure.class);
    Mockito.when(sub.getName()).thenReturn(operationName);

    final CountDownLatch prepared = new CountDownLatch(expected.size());
    final CountDownLatch committed = new CountDownLatch(expected.size());
    List<byte[]> dataFromMembers = new ArrayList<>();

    // mock out coordinator so we can keep track of zk progress
    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
      prepared, committed, dataFromMembers);

    ProcedureMember member = Mockito.mock(ProcedureMember.class);
    Procedure p = Mockito.mock(Procedure.class);
    Mockito.when(p.getName()).thenReturn(operationName);

    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
    ZKProcedureCoordinator controller = pair.getFirst();
    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();

    // post 1/2 the prepare nodes early
    for (int i = 0; i < cohortControllers.size() / 2; i++) {
      cohortControllers.get(i).sendMemberAcquired(sub);
    }

    // start the operation
    controller.sendGlobalBarrierAcquire(p, data, expected);

    // post the prepare node for each expected node
    for (ZKProcedureMemberRpcs cc : cohortControllers) {
      cc.sendMemberAcquired(sub);
    }

    // wait for all the notifications to reach the coordinator
    prepared.await();
    // make sure we got all the nodes and no more
    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(
      Mockito.eq(operationName), Mockito.anyString());

    // kick off the commit phase
    controller.sendGlobalBarrierReached(p, expected);

    // post the committed node for each expected node
    for (ZKProcedureMemberRpcs cc : cohortControllers) {
      cc.sendMemberCompleted(sub, memberData);
    }

    // wait for all commit notifications to reach the coordinator
    committed.await();
    // make sure we got all the nodes and no more
    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));

    controller.resetMembers(p);

    // verify all behavior
    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
    verifyCohort(member, cohortControllers.size(), operationName, data);
    verifyCoordinator(operationName, coordinator, expected);
  }

  /**
   * Build a mock coordinator whose barrier callbacks feed the supplied latches.
   * @param operationName procedure name the stubbed callbacks are matched against
   * @param prepared counted down on each matching {@code memberAcquiredBarrier} call
   * @param committed counted down on each matching {@code memberFinishedBarrier} call
   * @param dataFromMembers receives the payload passed to each matching
   *          {@code memberFinishedBarrier} call
   * @return a mock {@link ProcedureCoordinator} that just counts down the
   *         prepared and committed latch for calls to the respective method
   */
  private ProcedureCoordinator setupMockCoordinator(String operationName,
      final CountDownLatch prepared, final CountDownLatch committed,
      final List<byte[]> dataFromMembers) {
    ProcedureCoordinator coordinator = Mockito
        .mock(ProcedureCoordinator.class);
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        prepared.countDown();
        return null;
      }
    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        // Record what the coordinator was actually handed (third argument), not our own
        // reference copy, so the caller's content assertion checks the received bytes.
        dataFromMembers.add((byte[]) invocation.getArguments()[2]);
        committed.countDown();
        return null;
      }
    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
      Mockito.eq(memberData));
    return coordinator;
  }

  /**
   * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
   */
  private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
      ZKProcedureUtil controller) throws Exception {
    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
    String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
    String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
    assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
  }

  /**
   * Verify the cohort controller got called at least once per expected node to start the
   * operation. {@code operationName} and {@code data} are only used by the stricter,
   * currently disabled verification kept below.
   */
  private void verifyCohort(ProcedureMember member, int cohortSize,
      String operationName, byte[] data) {
//    verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
//      (byte[]) Mockito.argThat(new ArrayEquals(data)));
    Mockito.verify(member,
      Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());

  }

  /**
   * Verify that the coordinator only got called once for each expected node
   */
  private void verifyCoordinator(String operationName,
      ProcedureCoordinator coordinator, List<String> expected) {
    // verify that we got all the expected nodes
    for (String node : expected) {
      verify(coordinator, once).memberAcquiredBarrier(operationName, node);
      verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
    }
  }

  /**
   * Create and start the coordinator-side ZK controller.
   */
  private static ZKProcedureCoordinator startCoordinatorController(ZKWatcher watcher,
      String operationName, String controllerName, ProcedureCoordinator coordinator)
      throws Exception {
    ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
        watcher, operationName, controllerName);
    controller.start(coordinator);
    return controller;
  }

  /**
   * Create and start one member-side ZK controller per cohort name, in list order.
   */
  private static List<ZKProcedureMemberRpcs> startCohortControllers(ZKWatcher watcher,
      String operationName, ProcedureMember member, List<String> cohortNames) throws Exception {
    List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(cohortNames.size());
    for (String nodeName : cohortNames) {
      ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
      cc.start(nodeName, member);
      cohortControllers.add(cc);
    }
    return cohortControllers;
  }

  /**
   * Specify how the controllers that should be started (not spy/mockable) for the test.
   */
  private abstract static class StartControllers {
    public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
        ZKWatcher watcher, String operationName,
        ProcedureCoordinator coordinator, String controllerName,
        ProcedureMember member, List<String> cohortNames) throws Exception;
  }

  private final StartControllers startCoordinatorFirst = new StartControllers() {

    @Override
    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
        ZKWatcher watcher, String operationName,
        ProcedureCoordinator coordinator, String controllerName,
        ProcedureMember member, List<String> expected) throws Exception {
      // start the controller
      ZKProcedureCoordinator controller =
          startCoordinatorController(watcher, operationName, controllerName, coordinator);

      // make a cohort controller for each expected node
      List<ZKProcedureMemberRpcs> cohortControllers =
          startCohortControllers(watcher, operationName, member, expected);
      return new Pair<>(controller, cohortControllers);
    }
  };

  /**
   * Check for the possible race condition where a cohort member starts after the controller and
   * therefore could miss a new operation
   */
  private final StartControllers startCohortFirst = new StartControllers() {

    @Override
    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
        ZKWatcher watcher, String operationName,
        ProcedureCoordinator coordinator, String controllerName,
        ProcedureMember member, List<String> expected) throws Exception {

      // make a cohort controller for each expected node
      List<ZKProcedureMemberRpcs> cohortControllers =
          startCohortControllers(watcher, operationName, member, expected);

      // start the controller
      ZKProcedureCoordinator controller =
          startCoordinatorController(watcher, operationName, controllerName, coordinator);

      return new Pair<>(controller, cohortControllers);
    }
  };
}