001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.mockito.Mockito.never;
023import static org.mockito.Mockito.spy;
024import static org.mockito.Mockito.times;
025import static org.mockito.Mockito.verify;
026
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.zookeeper.ZKUtil;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044import org.mockito.invocation.InvocationOnMock;
045import org.mockito.stubbing.Answer;
046import org.mockito.verification.VerificationMode;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053
054/**
055 * Test zookeeper-based, procedure controllers
056 */
057@Category({MasterTests.class, MediumTests.class})
058public class TestZKProcedureControllers {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062      HBaseClassTestRule.forClass(TestZKProcedureControllers.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
065  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
066  private static final String COHORT_NODE_NAME = "expected";
067  private static final String CONTROLLER_NODE_NAME = "controller";
068  private static final VerificationMode once = Mockito.times(1);
069
070  private final byte[] memberData = new String("data from member").getBytes();
071
072  @BeforeClass
073  public static void setupTest() throws Exception {
074    UTIL.startMiniZKCluster();
075  }
076
077  @AfterClass
078  public static void cleanupTest() throws Exception {
079    UTIL.shutdownMiniZKCluster();
080  }
081
082  /**
083   * Smaller test to just test the actuation on the cohort member
084   * @throws Exception on failure
085   */
086  @Test
087  public void testSimpleZKCohortMemberController() throws Exception {
088    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
089    final String operationName = "instanceTest";
090
091    final Subprocedure sub = Mockito.mock(Subprocedure.class);
092    Mockito.when(sub.getName()).thenReturn(operationName);
093
094    final byte[] data = new byte[] { 1, 2, 3 };
095    final CountDownLatch prepared = new CountDownLatch(1);
096    final CountDownLatch committed = new CountDownLatch(1);
097
098    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
099    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
100        watcher, "testSimple");
101
102    // mock out cohort member callbacks
103    final ProcedureMember member = Mockito
104        .mock(ProcedureMember.class);
105    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
106    Mockito.doAnswer(new Answer<Void>() {
107      @Override
108      public Void answer(InvocationOnMock invocation) throws Throwable {
109        controller.sendMemberAcquired(sub);
110        prepared.countDown();
111        return null;
112      }
113    }).when(member).submitSubprocedure(sub);
114    Mockito.doAnswer(new Answer<Void>() {
115      @Override
116      public Void answer(InvocationOnMock invocation) throws Throwable {
117        controller.sendMemberCompleted(sub, memberData);
118        committed.countDown();
119        return null;
120      }
121    }).when(member).receivedReachedGlobalBarrier(operationName);
122
123    // start running the listener
124    controller.start(COHORT_NODE_NAME, member);
125
126    // set a prepare node from a 'coordinator'
127    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
128    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
129    // wait for the operation to be prepared
130    prepared.await();
131
132    // create the commit node so we update the operation to enter the commit phase
133    String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
134    LOG.debug("Found prepared, posting commit node:" + commit);
135    ZKUtil.createAndFailSilent(watcher, commit);
136    LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
137    committed.await();
138
139    verify(monitor, never()).receive(Mockito.any());
140    // XXX: broken due to composition.
141//    verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
142//      Mockito.any());
143    // cleanup after the test
144    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
145    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
146    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
147  }
148
149  @Test
150  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
151    final String operationName = "no cohort controller test";
152    final byte[] data = new byte[] { 1, 2, 3 };
153
154    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
155    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
156  }
157
158  @Test
159  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
160    final String operationName = "single member controller test";
161    final byte[] data = new byte[] { 1, 2, 3 };
162
163    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
164    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
165  }
166
167  @Test
168  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
169    final String operationName = "multi member controller test";
170    final byte[] data = new byte[] { 1, 2, 3 };
171
172    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
173      "cohort2", "cohort3");
174    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
175      "cohort2", "cohort3");
176  }
177
178  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
179      String operationName, byte[] data, String... cohort) throws Exception {
180    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
181    List<String> expected = Lists.newArrayList(cohort);
182
183    final Subprocedure sub = Mockito.mock(Subprocedure.class);
184    Mockito.when(sub.getName()).thenReturn(operationName);
185
186    CountDownLatch prepared = new CountDownLatch(expected.size());
187    CountDownLatch committed = new CountDownLatch(expected.size());
188    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
189
190    // mock out coordinator so we can keep track of zk progress
191    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
192      prepared, committed, dataFromMembers);
193
194    ProcedureMember member = Mockito.mock(ProcedureMember.class);
195
196    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
197        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
198    ZKProcedureCoordinator controller = pair.getFirst();
199    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
200    // start the operation
201    Procedure p = Mockito.mock(Procedure.class);
202    Mockito.when(p.getName()).thenReturn(operationName);
203
204    controller.sendGlobalBarrierAcquire(p, data, expected);
205
206    // post the prepare node for each expected node
207    for (ZKProcedureMemberRpcs cc : cohortControllers) {
208      cc.sendMemberAcquired(sub);
209    }
210
211    // wait for all the notifications to reach the coordinator
212    prepared.await();
213    // make sure we got the all the nodes and no more
214    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
215      Mockito.anyString());
216
217    // kick off the commit phase
218    controller.sendGlobalBarrierReached(p, expected);
219
220    // post the committed node for each expected node
221    for (ZKProcedureMemberRpcs cc : cohortControllers) {
222      cc.sendMemberCompleted(sub, memberData);
223    }
224
225    // wait for all commit notifications to reach the coordinator
226    committed.await();
227    // make sure we got the all the nodes and no more
228    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
229      Mockito.anyString(), Mockito.eq(memberData));
230
231    assertEquals("Incorrect number of members returnd data", expected.size(),
232      dataFromMembers.size());
233    for (byte[] result : dataFromMembers) {
234      assertArrayEquals("Incorrect data from member", memberData, result);
235    }
236
237    controller.resetMembers(p);
238
239    // verify all behavior
240    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
241    verifyCohort(member, cohortControllers.size(), operationName, data);
242    verifyCoordinator(operationName, coordinator, expected);
243  }
244
245  // TODO Broken by composition.
246//  @Test
247//  public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
248//    runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
249//      "cohort1", "cohort2");
250//    runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
251//      "cohort1", "cohort2");
252//  }
253
254  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
255      String... cohort) throws Exception {
256    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
257    List<String> expected = Lists.newArrayList(cohort);
258
259    final Subprocedure sub = Mockito.mock(Subprocedure.class);
260    Mockito.when(sub.getName()).thenReturn(operationName);
261
262    final CountDownLatch prepared = new CountDownLatch(expected.size());
263    final CountDownLatch committed = new CountDownLatch(expected.size());
264    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
265
266    // mock out coordinator so we can keep track of zk progress
267    ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
268      prepared, committed, dataFromMembers);
269
270    ProcedureMember member = Mockito.mock(ProcedureMember.class);
271    Procedure p = Mockito.mock(Procedure.class);
272    Mockito.when(p.getName()).thenReturn(operationName);
273
274    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers
275        .start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
276    ZKProcedureCoordinator controller = pair.getFirst();
277    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
278
279    // post 1/2 the prepare nodes early
280    for (int i = 0; i < cohortControllers.size() / 2; i++) {
281      cohortControllers.get(i).sendMemberAcquired(sub);
282    }
283
284    // start the operation
285    controller.sendGlobalBarrierAcquire(p, data, expected);
286
287    // post the prepare node for each expected node
288    for (ZKProcedureMemberRpcs cc : cohortControllers) {
289      cc.sendMemberAcquired(sub);
290    }
291
292    // wait for all the notifications to reach the coordinator
293    prepared.await();
294    // make sure we got the all the nodes and no more
295    Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
296      Mockito.anyString());
297
298    // kick off the commit phase
299    controller.sendGlobalBarrierReached(p, expected);
300
301    // post the committed node for each expected node
302    for (ZKProcedureMemberRpcs cc : cohortControllers) {
303      cc.sendMemberCompleted(sub, memberData);
304    }
305
306    // wait for all commit notifications to reach the coordiantor
307    committed.await();
308    // make sure we got the all the nodes and no more
309    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
310      Mockito.anyString(), Mockito.eq(memberData));
311
312    controller.resetMembers(p);
313
314    // verify all behavior
315    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
316    verifyCohort(member, cohortControllers.size(), operationName, data);
317    verifyCoordinator(operationName, coordinator, expected);
318  }
319
320  /**
321   * @param dataFromMembers
322   * @return a mock {@link ProcedureCoordinator} that just counts down the
323   *         prepared and committed latch for called to the respective method
324   */
325  private ProcedureCoordinator setupMockCoordinator(String operationName,
326      final CountDownLatch prepared, final CountDownLatch committed,
327      final ArrayList<byte[]> dataFromMembers) {
328    ProcedureCoordinator coordinator = Mockito
329        .mock(ProcedureCoordinator.class);
330    Mockito.doAnswer(new Answer<Void>() {
331      @Override
332      public Void answer(InvocationOnMock invocation) throws Throwable {
333        prepared.countDown();
334        return null;
335      }
336    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
337    Mockito.doAnswer(new Answer<Void>() {
338      @Override
339      public Void answer(InvocationOnMock invocation) throws Throwable {
340        dataFromMembers.add(memberData);
341        committed.countDown();
342        return null;
343      }
344    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
345      Mockito.eq(memberData));
346    return coordinator;
347  }
348
349  /**
350   * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
351   */
352  private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
353      ZKProcedureUtil controller) throws Exception {
354    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
355    String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
356    String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
357    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
358    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
359    assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
360  }
361
362  /**
363   * Verify the cohort controller got called once per expected node to start the operation
364   */
365  private void verifyCohort(ProcedureMember member, int cohortSize,
366      String operationName, byte[] data) {
367//    verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
368//      (byte[]) Mockito.argThat(new ArrayEquals(data)));
369    Mockito.verify(member,
370      Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());
371
372  }
373
374  /**
375   * Verify that the coordinator only got called once for each expected node
376   */
377  private void verifyCoordinator(String operationName,
378      ProcedureCoordinator coordinator, List<String> expected) {
379    // verify that we got all the expected nodes
380    for (String node : expected) {
381      verify(coordinator, once).memberAcquiredBarrier(operationName, node);
382      verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
383    }
384  }
385
386  /**
387   * Specify how the controllers that should be started (not spy/mockable) for the test.
388   */
389  private abstract class StartControllers {
390    public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
391            ZKWatcher watcher, String operationName,
392            ProcedureCoordinator coordinator, String controllerName,
393            ProcedureMember member, List<String> cohortNames) throws Exception;
394  }
395
396  private final StartControllers startCoordinatorFirst = new StartControllers() {
397
398    @Override
399    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
400            ZKWatcher watcher, String operationName,
401            ProcedureCoordinator coordinator, String controllerName,
402            ProcedureMember member, List<String> expected) throws Exception {
403      // start the controller
404      ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
405          watcher, operationName, CONTROLLER_NODE_NAME);
406      controller.start(coordinator);
407
408      // make a cohort controller for each expected node
409
410      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
411      for (String nodeName : expected) {
412        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
413        cc.start(nodeName, member);
414        cohortControllers.add(cc);
415      }
416      return new Pair<>(controller, cohortControllers);
417    }
418  };
419
420  /**
421   * Check for the possible race condition where a cohort member starts after the controller and
422   * therefore could miss a new operation
423   */
424  private final StartControllers startCohortFirst = new StartControllers() {
425
426    @Override
427    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
428            ZKWatcher watcher, String operationName,
429            ProcedureCoordinator coordinator, String controllerName,
430            ProcedureMember member, List<String> expected) throws Exception {
431
432      // make a cohort controller for each expected node
433      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
434      for (String nodeName : expected) {
435        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
436        cc.start(nodeName, member);
437        cohortControllers.add(cc);
438      }
439
440      // start the controller
441      ZKProcedureCoordinator controller = new ZKProcedureCoordinator(
442          watcher, operationName, CONTROLLER_NODE_NAME);
443      controller.start(coordinator);
444
445      return new Pair<>(controller, cohortControllers);
446    }
447  };
448}