001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.mockito.Mockito.never;
023import static org.mockito.Mockito.spy;
024import static org.mockito.Mockito.times;
025import static org.mockito.Mockito.verify;
026
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.Pair;
037import org.apache.hadoop.hbase.zookeeper.ZKUtil;
038import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
039import org.junit.AfterClass;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.mockito.Mockito;
045import org.mockito.invocation.InvocationOnMock;
046import org.mockito.stubbing.Answer;
047import org.mockito.verification.VerificationMode;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
052
053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
054
055/**
056 * Test zookeeper-based, procedure controllers
057 */
058@Category({ MasterTests.class, MediumTests.class })
059public class TestZKProcedureControllers {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestZKProcedureControllers.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
066  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
067  private static final String COHORT_NODE_NAME = "expected";
068  private static final String CONTROLLER_NODE_NAME = "controller";
069  private static final VerificationMode once = Mockito.times(1);
070
071  private final byte[] memberData = Bytes.toBytes("data from member");
072
073  @BeforeClass
074  public static void setupTest() throws Exception {
075    UTIL.startMiniZKCluster();
076  }
077
078  @AfterClass
079  public static void cleanupTest() throws Exception {
080    UTIL.shutdownMiniZKCluster();
081  }
082
083  /**
084   * Smaller test to just test the actuation on the cohort member
085   * @throws Exception on failure
086   */
087  @Test
088  public void testSimpleZKCohortMemberController() throws Exception {
089    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
090    final String operationName = "instanceTest";
091
092    final Subprocedure sub = Mockito.mock(Subprocedure.class);
093    Mockito.when(sub.getName()).thenReturn(operationName);
094
095    final byte[] data = new byte[] { 1, 2, 3 };
096    final CountDownLatch prepared = new CountDownLatch(1);
097    final CountDownLatch committed = new CountDownLatch(1);
098
099    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
100    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, "testSimple");
101
102    // mock out cohort member callbacks
103    final ProcedureMember member = Mockito.mock(ProcedureMember.class);
104    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
105    Mockito.doAnswer(new Answer<Void>() {
106      @Override
107      public Void answer(InvocationOnMock invocation) throws Throwable {
108        controller.sendMemberAcquired(sub);
109        prepared.countDown();
110        return null;
111      }
112    }).when(member).submitSubprocedure(sub);
113    Mockito.doAnswer(new Answer<Void>() {
114      @Override
115      public Void answer(InvocationOnMock invocation) throws Throwable {
116        controller.sendMemberCompleted(sub, memberData);
117        committed.countDown();
118        return null;
119      }
120    }).when(member).receivedReachedGlobalBarrier(operationName);
121
122    // start running the listener
123    controller.start(COHORT_NODE_NAME, member);
124
125    // set a prepare node from a 'coordinator'
126    String prepare =
127      ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
128    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
129    // wait for the operation to be prepared
130    prepared.await();
131
132    // create the commit node so we update the operation to enter the commit phase
133    String commit =
134      ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
135    LOG.debug("Found prepared, posting commit node:" + commit);
136    ZKUtil.createAndFailSilent(watcher, commit);
137    LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
138    committed.await();
139
140    verify(monitor, never()).receive(Mockito.any());
141    // XXX: broken due to composition.
142    // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
143    // Mockito.any());
144    // cleanup after the test
145    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
146    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
147    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
148  }
149
150  @Test
151  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
152    final String operationName = "no cohort controller test";
153    final byte[] data = new byte[] { 1, 2, 3 };
154
155    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
156    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
157  }
158
159  @Test
160  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
161    final String operationName = "single member controller test";
162    final byte[] data = new byte[] { 1, 2, 3 };
163
164    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
165    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
166  }
167
168  @Test
169  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
170    final String operationName = "multi member controller test";
171    final byte[] data = new byte[] { 1, 2, 3 };
172
173    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
174      "cohort2", "cohort3");
175    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
176      "cohort2", "cohort3");
177  }
178
179  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
180    String operationName, byte[] data, String... cohort) throws Exception {
181    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
182    List<String> expected = Lists.newArrayList(cohort);
183
184    final Subprocedure sub = Mockito.mock(Subprocedure.class);
185    Mockito.when(sub.getName()).thenReturn(operationName);
186
187    CountDownLatch prepared = new CountDownLatch(expected.size());
188    CountDownLatch committed = new CountDownLatch(expected.size());
189    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
190
191    // mock out coordinator so we can keep track of zk progress
192    ProcedureCoordinator coordinator =
193      setupMockCoordinator(operationName, prepared, committed, dataFromMembers);
194
195    ProcedureMember member = Mockito.mock(ProcedureMember.class);
196
197    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher,
198      operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
199    ZKProcedureCoordinator controller = pair.getFirst();
200    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
201    // start the operation
202    Procedure p = Mockito.mock(Procedure.class);
203    Mockito.when(p.getName()).thenReturn(operationName);
204
205    controller.sendGlobalBarrierAcquire(p, data, expected);
206
207    // post the prepare node for each expected node
208    for (ZKProcedureMemberRpcs cc : cohortControllers) {
209      cc.sendMemberAcquired(sub);
210    }
211
212    // wait for all the notifications to reach the coordinator
213    prepared.await();
214    // make sure we got the all the nodes and no more
215    Mockito.verify(coordinator, times(expected.size()))
216      .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
217
218    // kick off the commit phase
219    controller.sendGlobalBarrierReached(p, expected);
220
221    // post the committed node for each expected node
222    for (ZKProcedureMemberRpcs cc : cohortControllers) {
223      cc.sendMemberCompleted(sub, memberData);
224    }
225
226    // wait for all commit notifications to reach the coordinator
227    committed.await();
228    // make sure we got the all the nodes and no more
229    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
230      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));
231
232    assertEquals("Incorrect number of members returnd data", expected.size(),
233      dataFromMembers.size());
234    for (byte[] result : dataFromMembers) {
235      assertArrayEquals("Incorrect data from member", memberData, result);
236    }
237
238    controller.resetMembers(p);
239
240    // verify all behavior
241    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
242    verifyCohort(member, cohortControllers.size(), operationName, data);
243    verifyCoordinator(operationName, coordinator, expected);
244  }
245
246  // TODO Broken by composition.
247  // @Test
248  // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
249  // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
250  // "cohort1", "cohort2");
251  // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
252  // "cohort1", "cohort2");
253  // }
254
255  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
256    String... cohort) throws Exception {
257    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
258    List<String> expected = Lists.newArrayList(cohort);
259
260    final Subprocedure sub = Mockito.mock(Subprocedure.class);
261    Mockito.when(sub.getName()).thenReturn(operationName);
262
263    final CountDownLatch prepared = new CountDownLatch(expected.size());
264    final CountDownLatch committed = new CountDownLatch(expected.size());
265    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
266
267    // mock out coordinator so we can keep track of zk progress
268    ProcedureCoordinator coordinator =
269      setupMockCoordinator(operationName, prepared, committed, dataFromMembers);
270
271    ProcedureMember member = Mockito.mock(ProcedureMember.class);
272    Procedure p = Mockito.mock(Procedure.class);
273    Mockito.when(p.getName()).thenReturn(operationName);
274
275    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher,
276      operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
277    ZKProcedureCoordinator controller = pair.getFirst();
278    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
279
280    // post 1/2 the prepare nodes early
281    for (int i = 0; i < cohortControllers.size() / 2; i++) {
282      cohortControllers.get(i).sendMemberAcquired(sub);
283    }
284
285    // start the operation
286    controller.sendGlobalBarrierAcquire(p, data, expected);
287
288    // post the prepare node for each expected node
289    for (ZKProcedureMemberRpcs cc : cohortControllers) {
290      cc.sendMemberAcquired(sub);
291    }
292
293    // wait for all the notifications to reach the coordinator
294    prepared.await();
295    // make sure we got the all the nodes and no more
296    Mockito.verify(coordinator, times(expected.size()))
297      .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
298
299    // kick off the commit phase
300    controller.sendGlobalBarrierReached(p, expected);
301
302    // post the committed node for each expected node
303    for (ZKProcedureMemberRpcs cc : cohortControllers) {
304      cc.sendMemberCompleted(sub, memberData);
305    }
306
307    // wait for all commit notifications to reach the coordiantor
308    committed.await();
309    // make sure we got the all the nodes and no more
310    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
311      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));
312
313    controller.resetMembers(p);
314
315    // verify all behavior
316    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
317    verifyCohort(member, cohortControllers.size(), operationName, data);
318    verifyCoordinator(operationName, coordinator, expected);
319  }
320
321  /**
322   * n * @return a mock {@link ProcedureCoordinator} that just counts down the prepared and
323   * committed latch for called to the respective method
324   */
325  private ProcedureCoordinator setupMockCoordinator(String operationName,
326    final CountDownLatch prepared, final CountDownLatch committed,
327    final ArrayList<byte[]> dataFromMembers) {
328    ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
329    Mockito.doAnswer(new Answer<Void>() {
330      @Override
331      public Void answer(InvocationOnMock invocation) throws Throwable {
332        prepared.countDown();
333        return null;
334      }
335    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
336    Mockito.doAnswer(new Answer<Void>() {
337      @Override
338      public Void answer(InvocationOnMock invocation) throws Throwable {
339        dataFromMembers.add(memberData);
340        committed.countDown();
341        return null;
342      }
343    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
344      Mockito.eq(memberData));
345    return coordinator;
346  }
347
348  /**
349   * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
350   */
351  private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
352    ZKProcedureUtil controller) throws Exception {
353    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
354    String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
355    String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
356    assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
357    assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
358    assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
359  }
360
361  /**
362   * Verify the cohort controller got called once per expected node to start the operation
363   */
364  private void verifyCohort(ProcedureMember member, int cohortSize, String operationName,
365    byte[] data) {
366    // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
367    // (byte[]) Mockito.argThat(new ArrayEquals(data)));
368    Mockito.verify(member, Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());
369
370  }
371
372  /**
373   * Verify that the coordinator only got called once for each expected node
374   */
375  private void verifyCoordinator(String operationName, ProcedureCoordinator coordinator,
376    List<String> expected) {
377    // verify that we got all the expected nodes
378    for (String node : expected) {
379      verify(coordinator, once).memberAcquiredBarrier(operationName, node);
380      verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
381    }
382  }
383
384  /**
385   * Specify how the controllers that should be started (not spy/mockable) for the test.
386   */
387  private abstract class StartControllers {
388    public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
389      ZKWatcher watcher, String operationName, ProcedureCoordinator coordinator,
390      String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception;
391  }
392
393  private final StartControllers startCoordinatorFirst = new StartControllers() {
394
395    @Override
396    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher,
397      String operationName, ProcedureCoordinator coordinator, String controllerName,
398      ProcedureMember member, List<String> expected) throws Exception {
399      // start the controller
400      ZKProcedureCoordinator controller =
401        new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME);
402      controller.start(coordinator);
403
404      // make a cohort controller for each expected node
405
406      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
407      for (String nodeName : expected) {
408        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
409        cc.start(nodeName, member);
410        cohortControllers.add(cc);
411      }
412      return new Pair<>(controller, cohortControllers);
413    }
414  };
415
416  /**
417   * Check for the possible race condition where a cohort member starts after the controller and
418   * therefore could miss a new operation
419   */
420  private final StartControllers startCohortFirst = new StartControllers() {
421
422    @Override
423    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher,
424      String operationName, ProcedureCoordinator coordinator, String controllerName,
425      ProcedureMember member, List<String> expected) throws Exception {
426
427      // make a cohort controller for each expected node
428      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
429      for (String nodeName : expected) {
430        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
431        cc.start(nodeName, member);
432        cohortControllers.add(cc);
433      }
434
435      // start the controller
436      ZKProcedureCoordinator controller =
437        new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME);
438      controller.start(coordinator);
439
440      return new Pair<>(controller, cohortControllers);
441    }
442  };
443}