001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.mockito.Mockito.never;
023import static org.mockito.Mockito.spy;
024import static org.mockito.Mockito.times;
025import static org.mockito.Mockito.verify;
026
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.zookeeper.ZKUtil;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044import org.mockito.invocation.InvocationOnMock;
045import org.mockito.stubbing.Answer;
046import org.mockito.verification.VerificationMode;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053
054/**
055 * Test zookeeper-based, procedure controllers
056 */
057@Category({MasterTests.class, MediumTests.class})
058public class TestZKProcedureControllers {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestZKProcedureControllers.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
065  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
066  private static final String COHORT_NODE_NAME = "expected";
067  private static final String CONTROLLER_NODE_NAME = "controller";
068  private static final VerificationMode once = Mockito.times(1);
069
070  private final byte[] memberData = new String("data from member").getBytes();
071
072  @BeforeClass
073  public static void setupTest() throws Exception {
074    UTIL.startMiniZKCluster();
075  }
076
077  @AfterClass
078  public static void cleanupTest() throws Exception {
079    UTIL.shutdownMiniZKCluster();
080  }
081
082  /**
083   * Smaller test to just test the actuation on the cohort member
084   * @throws Exception on failure
085   */
086  @Test
087  public void testSimpleZKCohortMemberController() throws Exception {
088    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
089    final String operationName = "instanceTest";
090
091    final Subprocedure sub = Mockito.mock(Subprocedure.class);
092    Mockito.when(sub.getName()).thenReturn(operationName);
093
094    final byte[] data = new byte[] { 1, 2, 3 };
095    final CountDownLatch prepared = new CountDownLatch(1);
096    final CountDownLatch committed = new CountDownLatch(1);
097
098    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
099    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
100        watcher, "testSimple");
101
102    // mock out cohort member callbacks
103    final ProcedureMember member = Mockito
104        .mock(ProcedureMember.class);
105    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
106    Mockito.doAnswer(new Answer<Void>() {
107      @Override
108      public Void answer(InvocationOnMock invocation) throws Throwable {
109        controller.sendMemberAcquired(sub);
110        prepared.countDown();
111        return null;
112      }
113    }).when(member).submitSubprocedure(sub);
114    Mockito.doAnswer(new Answer<Void>() {
115      @Override
116      public Void answer(InvocationOnMock invocation) throws Throwable {
117        controller.sendMemberCompleted(sub, memberData);
118        committed.countDown();
119        return null;
120      }
121    }).when(member).receivedReachedGlobalBarrier(operationName);
122
123    // start running the listener
124    controller.start(COHORT_NODE_NAME, member);
125
126    // set a prepare node from a 'coordinator'
127    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
128    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
129    // wait for the operation to be prepared
130    prepared.await();
131
132    // create the commit node so we update the operation to enter the commit phase
133    String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
134    LOG.debug("Found prepared, posting commit node:" + commit);
135    ZKUtil.createAndFailSilent(watcher, commit);
136    LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
137    committed.await();
138
139    verify(monitor, never()).receive(Mockito.any());
140    // XXX: broken due to composition.
141//    verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
142//      Mockito.any());
143    // cleanup after the test
144    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
145    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
146    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
147  }
148
149  @Test
150  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
151    final String operationName = "no cohort controller test";
152    final byte[] data = new byte[] { 1, 2, 3 };
153
154    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
155    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
156  }
157
158  @Test
159  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
160    final String operationName = "single member controller test";
161    final byte[] data = new byte[] { 1, 2, 3 };
162
163    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
164    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
165  }
166
167  @Test
168  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
169    final String operationName = "multi member controller test";
170    final byte[] data = new byte[] { 1, 2, 3 };
171
172    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
173      "cohort2", "cohort3");
174    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
175      "cohort2", "cohort3");
176  }
177
178  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
179      String operationName, byte[] data, String... cohort) throws Exception {
180    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
181    List<String> expected = Lists.newArrayList(cohort);
182
183    final Subprocedure sub = Mockito.mock(Subprocedure.class);
184    Mockito.when(sub.getName()).thenReturn(operationName);
185
186    CountDownLatch prepared = new CountDownLatch(expected.size());
187    CountDownLatch committed = new CountDownLatch(expected.size());
188    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
189
190    // mock out coordinator so we can keep track of zk progress
191    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
192      prepared, committed, dataFromMembers);
193
194    ProcedureMember member = Mockito.mock(ProcedureMember.class);
195
196    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
197        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
198    ZKProcedureCoordinator controller = pair.getFirst();
199    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
200    // start the operation
201    Procedure p = Mockito.mock(Procedure.class);
202    Mockito.when(p.getName()).thenReturn(operationName);
203
204    controller.sendGlobalBarrierAcquire(p, data, expected);
205
206    // post the prepare node for each expected node
207    for (ZKProcedureMemberRpcs cc : cohortControllers) {
208      cc.sendMemberAcquired(sub);
209    }
210
211    // wait for all the notifications to reach the coordinator
212    prepared.await();
213    // make sure we got the all the nodes and no more
214    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
215      Mockito.anyString());
216
217    // kick off the commit phase
218    controller.sendGlobalBarrierReached(p, expected);
219
220    // post the committed node for each expected node
221    for (ZKProcedureMemberRpcs cc : cohortControllers) {
222      cc.sendMemberCompleted(sub, memberData);
223    }
224
225    // wait for all commit notifications to reach the coordinator
226    committed.await();
227    // make sure we got the all the nodes and no more
228    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
229      Mockito.anyString(), Mockito.eq(memberData));
230
231    assertEquals("Incorrect number of members returnd data", expected.size(),
232      dataFromMembers.size());
233    for (byte[] result : dataFromMembers) {
234      assertArrayEquals("Incorrect data from member", memberData, result);
235    }
236
237    controller.resetMembers(p);
238
239    // verify all behavior
240    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
241    verifyCohort(member, cohortControllers.size(), operationName, data);
242    verifyCoordinator(operationName, coordinator, expected);
243  }
244
245  // TODO Broken by composition.
246//  @Test
247//  public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
248//    runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
249//      "cohort1", "cohort2");
250//    runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
251//      "cohort1", "cohort2");
252//  }
253
254  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
255      String... cohort) throws Exception {
256    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
257    List<String> expected = Lists.newArrayList(cohort);
258
259    final Subprocedure sub = Mockito.mock(Subprocedure.class);
260    Mockito.when(sub.getName()).thenReturn(operationName);
261
262    final CountDownLatch prepared = new CountDownLatch(expected.size());
263    final CountDownLatch committed = new CountDownLatch(expected.size());
264    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
265
266    // mock out coordinator so we can keep track of zk progress
267    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
268      prepared, committed, dataFromMembers);
269
270    ProcedureMember member = Mockito.mock(ProcedureMember.class);
271    Procedure p = Mockito.mock(Procedure.class);
272    Mockito.when(p.getName()).thenReturn(operationName);
273
274    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
275        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
276    ZKProcedureCoordinator controller = pair.getFirst();
277    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
278
279    // post 1/2 the prepare nodes early
280    for (int i = 0; i < cohortControllers.size() / 2; i++) {
281      cohortControllers.get(i).sendMemberAcquired(sub);
282    }
283
284    // start the operation
285    controller.sendGlobalBarrierAcquire(p, data, expected);
286
287    // post the prepare node for each expected node
288    for (ZKProcedureMemberRpcs cc : cohortControllers) {
289      cc.sendMemberAcquired(sub);
290    }
291
292    // wait for all the notifications to reach the coordinator
293    prepared.await();
294    // make sure we got the all the nodes and no more
295    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
296      Mockito.anyString());
297
298    // kick off the commit phase
299    controller.sendGlobalBarrierReached(p, expected);
300
301    // post the committed node for each expected node
302    for (ZKProcedureMemberRpcs cc : cohortControllers) {
303      cc.sendMemberCompleted(sub, memberData);
304    }
305
306    // wait for all commit notifications to reach the coordiantor
307    committed.await();
308    // make sure we got the all the nodes and no more
309    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
310      Mockito.anyString(), Mockito.eq(memberData));
311
312    controller.resetMembers(p);
313
314    // verify all behavior
315    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
316    verifyCohort(member, cohortControllers.size(), operationName, data);
317    verifyCoordinator(operationName, coordinator, expected);
318  }
319
320  /**
321   * @param dataFromMembers
322   * @return a mock {@link ProcedureCoordinator} that just counts down the
323   *         prepared and committed latch for called to the respective method
324   */
325  private ProcedureCoordinator setupMockCoordinator(String operationName,
326      final CountDownLatch prepared, final CountDownLatch committed,
327      final ArrayList<byte[]> dataFromMembers) {
328    ProcedureCoordinator coordinator = Mockito
329        .mock(ProcedureCoordinator.class);
330    Mockito.mock(ProcedureCoordinator.class);
331    Mockito.doAnswer(new Answer<Void>() {
332      @Override
333      public Void answer(InvocationOnMock invocation) throws Throwable {
334        prepared.countDown();
335        return null;
336      }
337    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
338    Mockito.doAnswer(new Answer<Void>() {
339      @Override
340      public Void answer(InvocationOnMock invocation) throws Throwable {
341        dataFromMembers.add(memberData);
342        committed.countDown();
343        return null;
344      }
345    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
346      Mockito.eq(memberData));
347    return coordinator;
348  }
349
350  /**
351   * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
352   */
353  private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
354      ZKProcedureUtil controller) throws Exception {
355    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
356    String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
357    String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
358    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
359    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
360    assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
361  }
362
363  /**
364   * Verify the cohort controller got called once per expected node to start the operation
365   */
366  private void verifyCohort(ProcedureMember member, int cohortSize,
367      String operationName, byte[] data) {
368//    verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
369//      (byte[]) Mockito.argThat(new ArrayEquals(data)));
370    Mockito.verify(member,
371      Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());
372
373  }
374
375  /**
376   * Verify that the coordinator only got called once for each expected node
377   */
378  private void verifyCoordinator(String operationName,
379      ProcedureCoordinator coordinator, List<String> expected) {
380    // verify that we got all the expected nodes
381    for (String node : expected) {
382      verify(coordinator, once).memberAcquiredBarrier(operationName, node);
383      verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
384    }
385  }
386
387  /**
388   * Specify how the controllers that should be started (not spy/mockable) for the test.
389   */
390  private abstract class StartControllers {
391    public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
392            ZKWatcher watcher, String operationName,
393            ProcedureCoordinator coordinator, String controllerName,
394            ProcedureMember member, List<String> cohortNames) throws Exception;
395  }
396
397  private final StartControllers startCoordinatorFirst = new StartControllers() {
398
399    @Override
400    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
401            ZKWatcher watcher, String operationName,
402            ProcedureCoordinator coordinator, String controllerName,
403            ProcedureMember member, List<String> expected) throws Exception {
404      // start the controller
405      ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
406          watcher, operationName, CONTROLLER_NODE_NAME);
407      controller.start(coordinator);
408
409      // make a cohort controller for each expected node
410
411      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
412      for (String nodeName : expected) {
413        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
414        cc.start(nodeName, member);
415        cohortControllers.add(cc);
416      }
417      return new Pair<>(controller, cohortControllers);
418    }
419  };
420
421  /**
422   * Check for the possible race condition where a cohort member starts after the controller and
423   * therefore could miss a new operation
424   */
425  private final StartControllers startCohortFirst = new StartControllers() {
426
427    @Override
428    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
429            ZKWatcher watcher, String operationName,
430            ProcedureCoordinator coordinator, String controllerName,
431            ProcedureMember member, List<String> expected) throws Exception {
432
433      // make a cohort controller for each expected node
434      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
435      for (String nodeName : expected) {
436        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
437        cc.start(nodeName, member);
438        cohortControllers.add(cc);
439      }
440
441      // start the controller
442      ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
443          watcher, operationName, CONTROLLER_NODE_NAME);
444      controller.start(coordinator);
445
446      return new Pair<>(controller, cohortControllers);
447    }
448  };
449}