001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.mockito.Mockito.never;
023import static org.mockito.Mockito.spy;
024import static org.mockito.Mockito.times;
025import static org.mockito.Mockito.verify;
026
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.zookeeper.ZKUtil;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.junit.jupiter.api.AfterAll;
039import org.junit.jupiter.api.BeforeAll;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.mockito.Mockito;
043import org.mockito.invocation.InvocationOnMock;
044import org.mockito.stubbing.Answer;
045import org.mockito.verification.VerificationMode;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052
053/**
054 * Test zookeeper-based, procedure controllers
055 */
056@Tag(MasterTests.TAG)
057@Tag(MediumTests.TAG)
058public class TestZKProcedureControllers {
059
060  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class);
061  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
062  private static final String COHORT_NODE_NAME = "expected";
063  private static final String CONTROLLER_NODE_NAME = "controller";
064  private static final VerificationMode once = Mockito.times(1);
065
066  private final byte[] memberData = Bytes.toBytes("data from member");
067
068  @BeforeAll
069  public static void setupTest() throws Exception {
070    UTIL.startMiniZKCluster();
071  }
072
073  @AfterAll
074  public static void cleanupTest() throws Exception {
075    UTIL.shutdownMiniZKCluster();
076  }
077
078  /**
079   * Smaller test to just test the actuation on the cohort member
080   * @throws Exception on failure
081   */
082  @Test
083  public void testSimpleZKCohortMemberController() throws Exception {
084    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
085    final String operationName = "instanceTest";
086
087    final Subprocedure sub = Mockito.mock(Subprocedure.class);
088    Mockito.when(sub.getName()).thenReturn(operationName);
089
090    final byte[] data = new byte[] { 1, 2, 3 };
091    final CountDownLatch prepared = new CountDownLatch(1);
092    final CountDownLatch committed = new CountDownLatch(1);
093
094    final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
095    final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, "testSimple");
096
097    // mock out cohort member callbacks
098    final ProcedureMember member = Mockito.mock(ProcedureMember.class);
099    Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
100    Mockito.doAnswer(new Answer<Void>() {
101      @Override
102      public Void answer(InvocationOnMock invocation) throws Throwable {
103        controller.sendMemberAcquired(sub);
104        prepared.countDown();
105        return null;
106      }
107    }).when(member).submitSubprocedure(sub);
108    Mockito.doAnswer(new Answer<Void>() {
109      @Override
110      public Void answer(InvocationOnMock invocation) throws Throwable {
111        controller.sendMemberCompleted(sub, memberData);
112        committed.countDown();
113        return null;
114      }
115    }).when(member).receivedReachedGlobalBarrier(operationName);
116
117    // start running the listener
118    controller.start(COHORT_NODE_NAME, member);
119
120    // set a prepare node from a 'coordinator'
121    String prepare =
122      ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
123    ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
124    // wait for the operation to be prepared
125    prepared.await();
126
127    // create the commit node so we update the operation to enter the commit phase
128    String commit =
129      ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
130    LOG.debug("Found prepared, posting commit node:" + commit);
131    ZKUtil.createAndFailSilent(watcher, commit);
132    LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
133    committed.await();
134
135    verify(monitor, never()).receive(Mockito.any());
136    // XXX: broken due to composition.
137    // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
138    // Mockito.any());
139    // cleanup after the test
140    ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
141    assertEquals(-1, ZKUtil.checkExists(watcher, prepare), "Didn't delete prepare node");
142    assertEquals(-1, ZKUtil.checkExists(watcher, commit), "Didn't delete commit node");
143  }
144
145  @Test
146  public void testZKCoordinatorControllerWithNoCohort() throws Exception {
147    final String operationName = "no cohort controller test";
148    final byte[] data = new byte[] { 1, 2, 3 };
149
150    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
151    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
152  }
153
154  @Test
155  public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
156    final String operationName = "single member controller test";
157    final byte[] data = new byte[] { 1, 2, 3 };
158
159    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
160    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
161  }
162
163  @Test
164  public void testZKCoordinatorControllerMultipleCohort() throws Exception {
165    final String operationName = "multi member controller test";
166    final byte[] data = new byte[] { 1, 2, 3 };
167
168    runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
169      "cohort2", "cohort3");
170    runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
171      "cohort2", "cohort3");
172  }
173
174  private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
175    String operationName, byte[] data, String... cohort) throws Exception {
176    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
177    List<String> expected = Lists.newArrayList(cohort);
178
179    final Subprocedure sub = Mockito.mock(Subprocedure.class);
180    Mockito.when(sub.getName()).thenReturn(operationName);
181
182    CountDownLatch prepared = new CountDownLatch(expected.size());
183    CountDownLatch committed = new CountDownLatch(expected.size());
184    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
185
186    // mock out coordinator so we can keep track of zk progress
187    ProcedureCoordinator coordinator =
188      setupMockCoordinator(operationName, prepared, committed, dataFromMembers);
189
190    ProcedureMember member = Mockito.mock(ProcedureMember.class);
191
192    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher,
193      operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
194    ZKProcedureCoordinator controller = pair.getFirst();
195    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
196    // start the operation
197    Procedure p = Mockito.mock(Procedure.class);
198    Mockito.when(p.getName()).thenReturn(operationName);
199
200    controller.sendGlobalBarrierAcquire(p, data, expected);
201
202    // post the prepare node for each expected node
203    for (ZKProcedureMemberRpcs cc : cohortControllers) {
204      cc.sendMemberAcquired(sub);
205    }
206
207    // wait for all the notifications to reach the coordinator
208    prepared.await();
209    // make sure we got the all the nodes and no more
210    Mockito.verify(coordinator, times(expected.size()))
211      .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
212
213    // kick off the commit phase
214    controller.sendGlobalBarrierReached(p, expected);
215
216    // post the committed node for each expected node
217    for (ZKProcedureMemberRpcs cc : cohortControllers) {
218      cc.sendMemberCompleted(sub, memberData);
219    }
220
221    // wait for all commit notifications to reach the coordinator
222    committed.await();
223    // make sure we got the all the nodes and no more
224    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
225      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));
226
227    assertEquals(expected.size(), dataFromMembers.size(),
228      "Incorrect number of members returnd data");
229    for (byte[] result : dataFromMembers) {
230      assertArrayEquals(memberData, result, "Incorrect data from member");
231    }
232
233    controller.resetMembers(p);
234
235    // verify all behavior
236    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
237    verifyCohort(member, cohortControllers.size(), operationName, data);
238    verifyCoordinator(operationName, coordinator, expected);
239  }
240
241  // TODO Broken by composition.
242  // @Test
243  // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
244  // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
245  // "cohort1", "cohort2");
246  // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
247  // "cohort1", "cohort2");
248  // }
249
250  public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
251    String... cohort) throws Exception {
252    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
253    List<String> expected = Lists.newArrayList(cohort);
254
255    final Subprocedure sub = Mockito.mock(Subprocedure.class);
256    Mockito.when(sub.getName()).thenReturn(operationName);
257
258    final CountDownLatch prepared = new CountDownLatch(expected.size());
259    final CountDownLatch committed = new CountDownLatch(expected.size());
260    ArrayList<byte[]> dataFromMembers = new ArrayList<>();
261
262    // mock out coordinator so we can keep track of zk progress
263    ProcedureCoordinator coordinator =
264      setupMockCoordinator(operationName, prepared, committed, dataFromMembers);
265
266    ProcedureMember member = Mockito.mock(ProcedureMember.class);
267    Procedure p = Mockito.mock(Procedure.class);
268    Mockito.when(p.getName()).thenReturn(operationName);
269
270    Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher,
271      operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
272    ZKProcedureCoordinator controller = pair.getFirst();
273    List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
274
275    // post 1/2 the prepare nodes early
276    for (int i = 0; i < cohortControllers.size() / 2; i++) {
277      cohortControllers.get(i).sendMemberAcquired(sub);
278    }
279
280    // start the operation
281    controller.sendGlobalBarrierAcquire(p, data, expected);
282
283    // post the prepare node for each expected node
284    for (ZKProcedureMemberRpcs cc : cohortControllers) {
285      cc.sendMemberAcquired(sub);
286    }
287
288    // wait for all the notifications to reach the coordinator
289    prepared.await();
290    // make sure we got the all the nodes and no more
291    Mockito.verify(coordinator, times(expected.size()))
292      .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
293
294    // kick off the commit phase
295    controller.sendGlobalBarrierReached(p, expected);
296
297    // post the committed node for each expected node
298    for (ZKProcedureMemberRpcs cc : cohortControllers) {
299      cc.sendMemberCompleted(sub, memberData);
300    }
301
302    // wait for all commit notifications to reach the coordiantor
303    committed.await();
304    // make sure we got the all the nodes and no more
305    Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(
306      Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData));
307
308    controller.resetMembers(p);
309
310    // verify all behavior
311    verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
312    verifyCohort(member, cohortControllers.size(), operationName, data);
313    verifyCoordinator(operationName, coordinator, expected);
314  }
315
316  /**
317   * @return a mock {@link ProcedureCoordinator} that just counts down the prepared and committed
318   *         latch for called to the respective method
319   */
320  private ProcedureCoordinator setupMockCoordinator(String operationName,
321    final CountDownLatch prepared, final CountDownLatch committed,
322    final ArrayList<byte[]> dataFromMembers) {
323    ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
324    Mockito.doAnswer(new Answer<Void>() {
325      @Override
326      public Void answer(InvocationOnMock invocation) throws Throwable {
327        prepared.countDown();
328        return null;
329      }
330    }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
331    Mockito.doAnswer(new Answer<Void>() {
332      @Override
333      public Void answer(InvocationOnMock invocation) throws Throwable {
334        dataFromMembers.add(memberData);
335        committed.countDown();
336        return null;
337      }
338    }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
339      Mockito.eq(memberData));
340    return coordinator;
341  }
342
343  /**
344   * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
345   */
346  private void verifyZooKeeperClean(String operationName, ZKWatcher watcher,
347    ZKProcedureUtil controller) throws Exception {
348    String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
349    String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
350    String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
351    assertEquals(-1, ZKUtil.checkExists(watcher, prepare), "Didn't delete prepare node");
352    assertEquals(-1, ZKUtil.checkExists(watcher, commit), "Didn't delete commit node");
353    assertEquals(-1, ZKUtil.checkExists(watcher, abort), "Didn't delete abort node");
354  }
355
356  /**
357   * Verify the cohort controller got called once per expected node to start the operation
358   */
359  private void verifyCohort(ProcedureMember member, int cohortSize, String operationName,
360    byte[] data) {
361    // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
362    // (byte[]) Mockito.argThat(new ArrayEquals(data)));
363    Mockito.verify(member, Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any());
364
365  }
366
367  /**
368   * Verify that the coordinator only got called once for each expected node
369   */
370  private void verifyCoordinator(String operationName, ProcedureCoordinator coordinator,
371    List<String> expected) {
372    // verify that we got all the expected nodes
373    for (String node : expected) {
374      verify(coordinator, once).memberAcquiredBarrier(operationName, node);
375      verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
376    }
377  }
378
379  /**
380   * Specify how the controllers that should be started (not spy/mockable) for the test.
381   */
382  private abstract class StartControllers {
383    public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(
384      ZKWatcher watcher, String operationName, ProcedureCoordinator coordinator,
385      String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception;
386  }
387
388  private final StartControllers startCoordinatorFirst = new StartControllers() {
389
390    @Override
391    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher,
392      String operationName, ProcedureCoordinator coordinator, String controllerName,
393      ProcedureMember member, List<String> expected) throws Exception {
394      // start the controller
395      ZKProcedureCoordinator controller =
396        new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME);
397      controller.start(coordinator);
398
399      // make a cohort controller for each expected node
400
401      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
402      for (String nodeName : expected) {
403        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
404        cc.start(nodeName, member);
405        cohortControllers.add(cc);
406      }
407      return new Pair<>(controller, cohortControllers);
408    }
409  };
410
411  /**
412   * Check for the possible race condition where a cohort member starts after the controller and
413   * therefore could miss a new operation
414   */
415  private final StartControllers startCohortFirst = new StartControllers() {
416
417    @Override
418    public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher,
419      String operationName, ProcedureCoordinator coordinator, String controllerName,
420      ProcedureMember member, List<String> expected) throws Exception {
421
422      // make a cohort controller for each expected node
423      List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>();
424      for (String nodeName : expected) {
425        ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
426        cc.start(nodeName, member);
427        cohortControllers.add(cc);
428      }
429
430      // start the controller
431      ZKProcedureCoordinator controller =
432        new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME);
433      controller.start(coordinator);
434
435      return new Pair<>(controller, cohortControllers);
436    }
437  };
438}