001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.mockito.Mockito.never;
023import static org.mockito.Mockito.spy;
024import static org.mockito.Mockito.times;
025import static org.mockito.Mockito.verify;
026
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.zookeeper.ZKUtil;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044import org.mockito.invocation.InvocationOnMock;
045import org.mockito.stubbing.Answer;
046import org.mockito.verification.VerificationMode;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053
/**
 * Tests the ZooKeeper-based procedure controllers.
 */
057@Category({ MasterTests.class, MediumTests.class })
058public class TestZKProcedureControllers {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestZKProcedureControllers.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
065  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
066  private static final String COHORT_NODE_NAME = "expected";
067  private static final String CONTROLLER_NODE_NAME = "controller";
068  private static final VerificationMode once = Mockito.times(1);
069
070  private final byte[] memberData = new String("data from member").getBytes();
071
072  @BeforeClass
073  public static void setupTest() throws Exception {
074    UTIL.startMiniZKCluster();
075  }
076
077  @AfterClass
078  public static void cleanupTest() throws Exception {
079    UTIL.shutdownMiniZKCluster();
080  }
081
082  /**
083   * Smaller test to just test the actuation on the cohort member
084   * @throws Exception on failure
085   */
086  @Test
087  public void testSimpleZKCohortMemberController() throws Exception {
088    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
089    final String operationName = "instanceTest";
090
091    final Subprocedure sub = Mockito.mock(Subprocedure.class);
092    Mockito.when(sub.getName()).thenReturn(operationName);
093
094    final byte[] data = new byte[] { 1, 2, 3 };
095    final CountDownLatch prepared = new CountDownLatch(1);
096    final CountDownLatch committed = new CountDownLatch(1);
097
098    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
099    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, "testSimple");
100
101    // mock out cohort member callbacks
102    final ProcedureMember member = Mockito.mock(ProcedureMember.class);
103    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
104    Mockito.doAnswer(new Answer<Void>() {
105      @Override
106      public Void answer(InvocationOnMock invocation) throws Throwable {
107        controller.sendMemberAcquired(sub);
108        prepared.countDown();
109        return null;
110      }
111    }).when(member).submitSubprocedure(sub);
112    Mockito.doAnswer(new Answer<Void>() {
113      @Override
114      public Void answer(InvocationOnMock invocation) throws Throwable {
115        controller.sendMemberCompleted(sub, memberData);
116        committed.countDown();
117        return null;
118      }
119    }).when(member).receivedReachedGlobalBarrier(operationName);
120
121    // start running the listener
122    controller.start(COHORT_NODE_NAME, member);
123
124    // set a prepare node from a 'coordinator'
125    String prepare =
126      ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
127    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
128    // wait for the operation to be prepared
129    prepared.await();
130
131    // create the commit node so we update the operation to enter the commit phase
132    String commit =
133      ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
134    LOG.debug("Found prepared, posting commit node:" + commit);
135    ZKUtil.createAndFailSilent(watcher, commit);
136    LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
137    committed.await();
138
139    verify(monitor, never()).receive(Mockito.any());
140    // XXX: broken due to composition.
141    // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
142    // Mockito.any());
143    // cleanup after the test
144    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
145    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
146    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
147  }
148
149  @Test
150  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
151    final String operationName = "no cohort controller test";
152    final byte[] data = new byte[] { 1, 2, 3 };
153
154    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
155    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
156  }
157
158  @Test
159  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
160    final String operationName = "single member controller test";
161    final byte[] data = new byte[] { 1, 2, 3 };
162
163    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
164    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
165  }
166
167  @Test
168  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
169    final String operationName = "multi member controller test";
170    final byte[] data = new byte[] { 1, 2, 3 };
171
172    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
173      "cohort2", "cohort3");
174    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
175      "cohort2", "cohort3");
176  }
177
178  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
179    String operationName, byte[] data, String... cohort) throws Exception {
180    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
181    List<String> expected = Lists.newArrayList(cohort);
182
183    final Subprocedure sub = Mockito.mock(Subprocedure.class);
184    Mockito.when(sub.getName()).thenReturn(operationName);
185
186    CountDownLatch prepared = new CountDownLatch(expected.size());
187    CountDownLatch committed = new CountDownLatch(expected.size());
188    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
189
190    // mock out coordinator so we can keep track of zk progress
191    ProcedureCoordinator coordinator =
192      setupMockCoordinator(operationName, prepared, committed, dataFromMembers);
193
194    ProcedureMember member = Mockito.mock(ProcedureMember.class);
195
196    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher,
197      operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
198    ZKProcedureCoordinator controller = pair.getFirst();
199    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
200    // start the operation
201    Procedure p = Mockito.mock(Procedure.class);
202    Mockito.when(p.getName()).thenReturn(operationName);
203
204    controller.sendGlobalBarrierAcquire(p, data, expected);
205
206    // post the prepare node for each expected node
207    for (ZKProcedureMemberRpcs cc : cohortControllers) {
208      cc.sendMemberAcquired(sub);
209    }
210
211    // wait for all the notifications to reach the coordinator
212    prepared.await();
213    // make sure we got the all the nodes and no more
214    Mockito.verify(coordinator, times(expected.size()))
215      .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
216
217    // kick off the commit phase
218    controller.sendGlobalBarrierReached(p, expected);
219
220    // post the committed node for each expected node
221    for (ZKProcedureMemberRpcs cc : cohortControllers) {
222      cc.sendMemberCompleted(sub, memberData);
223    }
224
225    // wait for all commit notifications to reach the coordinator
226    committed.await();
227    // make sure we got the all the nodes and no more
228    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
229      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));
230
231    assertEquals("Incorrect number of members returnd data", expected.size(),
232      dataFromMembers.size());
233    for (byte[] result : dataFromMembers) {
234      assertArrayEquals("Incorrect data from member", memberData, result);
235    }
236
237    controller.resetMembers(p);
238
239    // verify all behavior
240    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
241    verifyCohort(member, cohortControllers.size(), operationName, data);
242    verifyCoordinator(operationName, coordinator, expected);
243  }
244
245  // TODO Broken by composition.
246  // @Test
247  // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
248  // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
249  // "cohort1", "cohort2");
250  // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
251  // "cohort1", "cohort2");
252  // }
253
254  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
255    String... cohort) throws Exception {
256    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
257    List<String> expected = Lists.newArrayList(cohort);
258
259    final Subprocedure sub = Mockito.mock(Subprocedure.class);
260    Mockito.when(sub.getName()).thenReturn(operationName);
261
262    final CountDownLatch prepared = new CountDownLatch(expected.size());
263    final CountDownLatch committed = new CountDownLatch(expected.size());
264    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
265
266    // mock out coordinator so we can keep track of zk progress
267    ProcedureCoordinator coordinator =
268      setupMockCoordinator(operationName, prepared, committed, dataFromMembers);
269
270    ProcedureMember member = Mockito.mock(ProcedureMember.class);
271    Procedure p = Mockito.mock(Procedure.class);
272    Mockito.when(p.getName()).thenReturn(operationName);
273
274    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher,
275      operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
276    ZKProcedureCoordinator controller = pair.getFirst();
277    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
278
279    // post 1/2 the prepare nodes early
280    for (int i = 0; i < cohortControllers.size() / 2; i++) {
281      cohortControllers.get(i).sendMemberAcquired(sub);
282    }
283
284    // start the operation
285    controller.sendGlobalBarrierAcquire(p, data, expected);
286
287    // post the prepare node for each expected node
288    for (ZKProcedureMemberRpcs cc : cohortControllers) {
289      cc.sendMemberAcquired(sub);
290    }
291
292    // wait for all the notifications to reach the coordinator
293    prepared.await();
294    // make sure we got the all the nodes and no more
295    Mockito.verify(coordinator, times(expected.size()))
296      .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
297
298    // kick off the commit phase
299    controller.sendGlobalBarrierReached(p, expected);
300
301    // post the committed node for each expected node
302    for (ZKProcedureMemberRpcs cc : cohortControllers) {
303      cc.sendMemberCompleted(sub, memberData);
304    }
305
306    // wait for all commit notifications to reach the coordiantor
307    committed.await();
308    // make sure we got the all the nodes and no more
309    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
310      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));
311
312    controller.resetMembers(p);
313
314    // verify all behavior
315    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
316    verifyCohort(member, cohortControllers.size(), operationName, data);
317    verifyCoordinator(operationName, coordinator, expected);
318  }
319
320  /**
321   * n * @return a mock {@link ProcedureCoordinator} that just counts down the prepared and
322   * committed latch for called to the respective method
323   */
324  private ProcedureCoordinator setupMockCoordinator(String operationName,
325    final CountDownLatch prepared, final CountDownLatch committed,
326    final ArrayList<byte[]> dataFromMembers) {
327    ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
328    Mockito.doAnswer(new Answer<Void>() {
329      @Override
330      public Void answer(InvocationOnMock invocation) throws Throwable {
331        prepared.countDown();
332        return null;
333      }
334    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
335    Mockito.doAnswer(new Answer<Void>() {
336      @Override
337      public Void answer(InvocationOnMock invocation) throws Throwable {
338        dataFromMembers.add(memberData);
339        committed.countDown();
340        return null;
341      }
342    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
343      Mockito.eq(memberData));
344    return coordinator;
345  }
346
347  /**
348   * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
349   */
350  private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
351    ZKProcedureUtil controller) throws Exception {
352    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
353    String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
354    String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
355    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
356    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
357    assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
358  }
359
360  /**
361   * Verify the cohort controller got called once per expected node to start the operation
362   */
363  private void verifyCohort(ProcedureMember member, int cohortSize, String operationName,
364    byte[] data) {
365    // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
366    // (byte[]) Mockito.argThat(new ArrayEquals(data)));
367    Mockito.verify(member, Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());
368
369  }
370
371  /**
372   * Verify that the coordinator only got called once for each expected node
373   */
374  private void verifyCoordinator(String operationName, ProcedureCoordinator coordinator,
375    List<String> expected) {
376    // verify that we got all the expected nodes
377    for (String node : expected) {
378      verify(coordinator, once).memberAcquiredBarrier(operationName, node);
379      verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
380    }
381  }
382
383  /**
384   * Specify how the controllers that should be started (not spy/mockable) for the test.
385   */
386  private abstract class StartControllers {
387    public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
388      ZKWatcher watcher, String operationName, ProcedureCoordinator coordinator,
389      String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception;
390  }
391
392  private final StartControllers startCoordinatorFirst = new StartControllers() {
393
394    @Override
395    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher,
396      String operationName, ProcedureCoordinator coordinator, String controllerName,
397      ProcedureMember member, List<String> expected) throws Exception {
398      // start the controller
399      ZKProcedureCoordinator controller =
400        new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME);
401      controller.start(coordinator);
402
403      // make a cohort controller for each expected node
404
405      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
406      for (String nodeName : expected) {
407        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
408        cc.start(nodeName, member);
409        cohortControllers.add(cc);
410      }
411      return new Pair<>(controller, cohortControllers);
412    }
413  };
414
415  /**
416   * Check for the possible race condition where a cohort member starts after the controller and
417   * therefore could miss a new operation
418   */
419  private final StartControllers startCohortFirst = new StartControllers() {
420
421    @Override
422    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher,
423      String operationName, ProcedureCoordinator coordinator, String controllerName,
424      ProcedureMember member, List<String> expected) throws Exception {
425
426      // make a cohort controller for each expected node
427      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
428      for (String nodeName : expected) {
429        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
430        cc.start(nodeName, member);
431        cohortControllers.add(cc);
432      }
433
434      // start the controller
435      ZKProcedureCoordinator controller =
436        new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME);
437      controller.start(coordinator);
438
439      return new Pair<>(controller, cohortControllers);
440    }
441  };
442}