001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.mockito.Mockito.never; 023import static org.mockito.Mockito.spy; 024import static org.mockito.Mockito.times; 025import static org.mockito.Mockito.verify; 026 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CountDownLatch; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.junit.AfterClass; 039import org.junit.BeforeClass; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.mockito.Mockito; 044import org.mockito.invocation.InvocationOnMock; 045import org.mockito.stubbing.Answer; 046import org.mockito.verification.VerificationMode; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 051 052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 053 054/** 055 * Test zookeeper-based, procedure controllers 056 */ 057@Category({MasterTests.class, MediumTests.class}) 058public class TestZKProcedureControllers { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestZKProcedureControllers.class); 063 064 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class); 065 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 066 private static final String COHORT_NODE_NAME = "expected"; 067 private static final String CONTROLLER_NODE_NAME = "controller"; 068 private static final VerificationMode once = Mockito.times(1); 069 070 private final byte[] memberData = new String("data from member").getBytes(); 071 072 @BeforeClass 073 public static void setupTest() throws Exception { 074 UTIL.startMiniZKCluster(); 075 } 076 077 @AfterClass 078 public static void cleanupTest() throws Exception { 079 UTIL.shutdownMiniZKCluster(); 080 } 081 082 /** 083 * Smaller test to just test the actuation on the cohort member 084 * @throws Exception on failure 085 */ 086 @Test 087 public void testSimpleZKCohortMemberController() throws Exception { 088 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 089 final String operationName = "instanceTest"; 090 091 final Subprocedure sub = Mockito.mock(Subprocedure.class); 092 Mockito.when(sub.getName()).thenReturn(operationName); 093 094 final byte[] data = new byte[] { 1, 2, 3 }; 095 final CountDownLatch prepared = new CountDownLatch(1); 096 final CountDownLatch committed = new CountDownLatch(1); 097 098 final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher()); 099 final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs( 100 watcher, "testSimple"); 101 102 // mock out cohort member callbacks 103 final ProcedureMember member = Mockito 104 .mock(ProcedureMember.class); 105 Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data); 106 Mockito.doAnswer(new Answer<Void>() { 107 @Override 108 public Void answer(InvocationOnMock invocation) throws Throwable { 109 controller.sendMemberAcquired(sub); 110 prepared.countDown(); 111 return null; 112 } 113 }).when(member).submitSubprocedure(sub); 114 Mockito.doAnswer(new Answer<Void>() { 115 @Override 116 public Void answer(InvocationOnMock invocation) throws Throwable { 117 controller.sendMemberCompleted(sub, memberData); 118 committed.countDown(); 119 return null; 120 } 121 }).when(member).receivedReachedGlobalBarrier(operationName); 122 123 // start running the listener 124 controller.start(COHORT_NODE_NAME, member); 125 126 // set a prepare node from a 'coordinator' 127 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName); 128 ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data)); 129 // wait for the operation to be prepared 130 prepared.await(); 131 132 // create the commit node so we update the operation to enter the commit phase 133 String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName); 134 LOG.debug("Found prepared, posting commit node:" + commit); 135 ZKUtil.createAndFailSilent(watcher, commit); 136 LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit)); 137 committed.await(); 138 139 verify(monitor, never()).receive(Mockito.any()); 140 // XXX: broken due to composition. 141// verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(), 142// Mockito.any()); 143 // cleanup after the test 144 ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode()); 145 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare)); 146 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit)); 147 } 148 149 @Test 150 public void testZKCoordinatorControllerWithNoCohort() throws Exception { 151 final String operationName = "no cohort controller test"; 152 final byte[] data = new byte[] { 1, 2, 3 }; 153 154 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data); 155 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data); 156 } 157 158 @Test 159 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception { 160 final String operationName = "single member controller test"; 161 final byte[] data = new byte[] { 1, 2, 3 }; 162 163 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort"); 164 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort"); 165 } 166 167 @Test 168 public void testZKCoordinatorControllerMultipleCohort() throws Exception { 169 final String operationName = "multi member controller test"; 170 final byte[] data = new byte[] { 1, 2, 3 }; 171 172 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort", 173 "cohort2", "cohort3"); 174 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort", 175 "cohort2", "cohort3"); 176 } 177 178 private void runMockCommitWithOrchestratedControllers(StartControllers controllers, 179 String operationName, byte[] data, String... cohort) throws Exception { 180 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 181 List<String> expected = Lists.newArrayList(cohort); 182 183 final Subprocedure sub = Mockito.mock(Subprocedure.class); 184 Mockito.when(sub.getName()).thenReturn(operationName); 185 186 CountDownLatch prepared = new CountDownLatch(expected.size()); 187 CountDownLatch committed = new CountDownLatch(expected.size()); 188 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 189 190 // mock out coordinator so we can keep track of zk progress 191 ProcedureCoordinator coordinator = setupMockCoordinator(operationName, 192 prepared, committed, dataFromMembers); 193 194 ProcedureMember member = Mockito.mock(ProcedureMember.class); 195 196 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers 197 .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 198 ZKProcedureCoordinator controller = pair.getFirst(); 199 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 200 // start the operation 201 Procedure p = Mockito.mock(Procedure.class); 202 Mockito.when(p.getName()).thenReturn(operationName); 203 204 controller.sendGlobalBarrierAcquire(p, data, expected); 205 206 // post the prepare node for each expected node 207 for (ZKProcedureMemberRpcs cc : cohortControllers) { 208 cc.sendMemberAcquired(sub); 209 } 210 211 // wait for all the notifications to reach the coordinator 212 prepared.await(); 213 // make sure we got the all the nodes and no more 214 Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName), 215 Mockito.anyString()); 216 217 // kick off the commit phase 218 controller.sendGlobalBarrierReached(p, expected); 219 220 // post the committed node for each expected node 221 for (ZKProcedureMemberRpcs cc : cohortControllers) { 222 cc.sendMemberCompleted(sub, memberData); 223 } 224 225 // wait for all commit notifications to reach the coordinator 226 committed.await(); 227 // make sure we got the all the nodes and no more 228 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), 229 Mockito.anyString(), Mockito.eq(memberData)); 230 231 assertEquals("Incorrect number of members returnd data", expected.size(), 232 dataFromMembers.size()); 233 for (byte[] result : dataFromMembers) { 234 assertArrayEquals("Incorrect data from member", memberData, result); 235 } 236 237 controller.resetMembers(p); 238 239 // verify all behavior 240 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 241 verifyCohort(member, cohortControllers.size(), operationName, data); 242 verifyCoordinator(operationName, coordinator, expected); 243 } 244 245 // TODO Broken by composition. 246// @Test 247// public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception { 248// runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 249// "cohort1", "cohort2"); 250// runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 251// "cohort1", "cohort2"); 252// } 253 254 public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data, 255 String... cohort) throws Exception { 256 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 257 List<String> expected = Lists.newArrayList(cohort); 258 259 final Subprocedure sub = Mockito.mock(Subprocedure.class); 260 Mockito.when(sub.getName()).thenReturn(operationName); 261 262 final CountDownLatch prepared = new CountDownLatch(expected.size()); 263 final CountDownLatch committed = new CountDownLatch(expected.size()); 264 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 265 266 // mock out coordinator so we can keep track of zk progress 267 ProcedureCoordinator coordinator = setupMockCoordinator(operationName, 268 prepared, committed, dataFromMembers); 269 270 ProcedureMember member = Mockito.mock(ProcedureMember.class); 271 Procedure p = Mockito.mock(Procedure.class); 272 Mockito.when(p.getName()).thenReturn(operationName); 273 274 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers 275 .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 276 ZKProcedureCoordinator controller = pair.getFirst(); 277 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 278 279 // post 1/2 the prepare nodes early 280 for (int i = 0; i < cohortControllers.size() / 2; i++) { 281 cohortControllers.get(i).sendMemberAcquired(sub); 282 } 283 284 // start the operation 285 controller.sendGlobalBarrierAcquire(p, data, expected); 286 287 // post the prepare node for each expected node 288 for (ZKProcedureMemberRpcs cc : cohortControllers) { 289 cc.sendMemberAcquired(sub); 290 } 291 292 // wait for all the notifications to reach the coordinator 293 prepared.await(); 294 // make sure we got the all the nodes and no more 295 Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName), 296 Mockito.anyString()); 297 298 // kick off the commit phase 299 controller.sendGlobalBarrierReached(p, expected); 300 301 // post the committed node for each expected node 302 for (ZKProcedureMemberRpcs cc : cohortControllers) { 303 cc.sendMemberCompleted(sub, memberData); 304 } 305 306 // wait for all commit notifications to reach the coordiantor 307 committed.await(); 308 // make sure we got the all the nodes and no more 309 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), 310 Mockito.anyString(), Mockito.eq(memberData)); 311 312 controller.resetMembers(p); 313 314 // verify all behavior 315 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 316 verifyCohort(member, cohortControllers.size(), operationName, data); 317 verifyCoordinator(operationName, coordinator, expected); 318 } 319 320 /** 321 * @param dataFromMembers 322 * @return a mock {@link ProcedureCoordinator} that just counts down the 323 * prepared and committed latch for called to the respective method 324 */ 325 private ProcedureCoordinator setupMockCoordinator(String operationName, 326 final CountDownLatch prepared, final CountDownLatch committed, 327 final ArrayList<byte[]> dataFromMembers) { 328 ProcedureCoordinator coordinator = Mockito 329 .mock(ProcedureCoordinator.class); 330 Mockito.mock(ProcedureCoordinator.class); 331 Mockito.doAnswer(new Answer<Void>() { 332 @Override 333 public Void answer(InvocationOnMock invocation) throws Throwable { 334 prepared.countDown(); 335 return null; 336 } 337 }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 338 Mockito.doAnswer(new Answer<Void>() { 339 @Override 340 public Void answer(InvocationOnMock invocation) throws Throwable { 341 dataFromMembers.add(memberData); 342 committed.countDown(); 343 return null; 344 } 345 }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(), 346 Mockito.eq(memberData)); 347 return coordinator; 348 } 349 350 /** 351 * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper 352 */ 353 private void verifyZooKeeperClean(String operationName, ZKWatcher watcher, 354 ZKProcedureUtil controller) throws Exception { 355 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName); 356 String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName); 357 String abort = ZKProcedureUtil.getAbortNode(controller, operationName); 358 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare)); 359 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit)); 360 assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort)); 361 } 362 363 /** 364 * Verify the cohort controller got called once per expected node to start the operation 365 */ 366 private void verifyCohort(ProcedureMember member, int cohortSize, 367 String operationName, byte[] data) { 368// verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName), 369// (byte[]) Mockito.argThat(new ArrayEquals(data))); 370 Mockito.verify(member, 371 Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any()); 372 373 } 374 375 /** 376 * Verify that the coordinator only got called once for each expected node 377 */ 378 private void verifyCoordinator(String operationName, 379 ProcedureCoordinator coordinator, List<String> expected) { 380 // verify that we got all the expected nodes 381 for (String node : expected) { 382 verify(coordinator, once).memberAcquiredBarrier(operationName, node); 383 verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData); 384 } 385 } 386 387 /** 388 * Specify how the controllers that should be started (not spy/mockable) for the test. 389 */ 390 private abstract class StartControllers { 391 public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start( 392 ZKWatcher watcher, String operationName, 393 ProcedureCoordinator coordinator, String controllerName, 394 ProcedureMember member, List<String> cohortNames) throws Exception; 395 } 396 397 private final StartControllers startCoordinatorFirst = new StartControllers() { 398 399 @Override 400 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start( 401 ZKWatcher watcher, String operationName, 402 ProcedureCoordinator coordinator, String controllerName, 403 ProcedureMember member, List<String> expected) throws Exception { 404 // start the controller 405 ZKProcedureCoordinator controller = new ZKProcedureCoordinator( 406 watcher, operationName, CONTROLLER_NODE_NAME); 407 controller.start(coordinator); 408 409 // make a cohort controller for each expected node 410 411 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 412 for (String nodeName : expected) { 413 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 414 cc.start(nodeName, member); 415 cohortControllers.add(cc); 416 } 417 return new Pair<>(controller, cohortControllers); 418 } 419 }; 420 421 /** 422 * Check for the possible race condition where a cohort member starts after the controller and 423 * therefore could miss a new operation 424 */ 425 private final StartControllers startCohortFirst = new StartControllers() { 426 427 @Override 428 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start( 429 ZKWatcher watcher, String operationName, 430 ProcedureCoordinator coordinator, String controllerName, 431 ProcedureMember member, List<String> expected) throws Exception { 432 433 // make a cohort controller for each expected node 434 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 435 for (String nodeName : expected) { 436 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 437 cc.start(nodeName, member); 438 cohortControllers.add(cc); 439 } 440 441 // start the controller 442 ZKProcedureCoordinator controller = new ZKProcedureCoordinator( 443 watcher, operationName, CONTROLLER_NODE_NAME); 444 controller.start(coordinator); 445 446 return new Pair<>(controller, cohortControllers); 447 } 448 }; 449}