001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.eq;
025import static org.mockito.Mockito.atMost;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.atomic.AtomicInteger;
037import org.apache.hadoop.hbase.Abortable;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.errorhandling.ForeignException;
040import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
041import org.apache.hadoop.hbase.errorhandling.TimeoutException;
042import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.util.Pair;
046import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051import org.mockito.Mockito;
052import org.mockito.internal.matchers.ArrayEquals;
053import org.mockito.invocation.InvocationOnMock;
054import org.mockito.stubbing.Answer;
055import org.mockito.verification.VerificationMode;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
060
061/**
062 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
063 */
064@Tag(MasterTests.TAG)
065@Tag(MediumTests.TAG)
066public class TestZKProcedure {
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class);
069  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
070  private static final String COORDINATOR_NODE_NAME = "coordinator";
071  private static final long KEEP_ALIVE = 100; // seconds
072  private static final int POOL_SIZE = 1;
073  private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
074  private static final long WAKE_FREQUENCY = 500;
075  private static final String opName = "op";
076  private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
077  private static final VerificationMode once = Mockito.times(1);
078
  /**
   * Starts the mini ZooKeeper cluster once, before any test in this class runs.
   * @throws Exception if the cluster cannot be started
   */
  @BeforeAll
  public static void setupTest() throws Exception {
    UTIL.startMiniZKCluster();
  }
083
  /**
   * Shuts the mini ZooKeeper cluster down once, after all tests in this class have run.
   * @throws Exception if the cluster cannot be shut down
   */
  @AfterAll
  public static void cleanupTest() throws Exception {
    UTIL.shutdownMiniZKCluster();
  }
088
089  private static ZKWatcher newZooKeeperWatcher() throws IOException {
090    return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
091      @Override
092      public void abort(String why, Throwable e) {
093        throw new RuntimeException("Unexpected abort in distributed three phase commit test:" + why,
094          e);
095      }
096
097      @Override
098      public boolean isAborted() {
099        return false;
100      }
101    });
102  }
103
  /** Runs the error-free commit with no cohort members at all. */
  @Test
  public void testEmptyMemberSet() throws Exception {
    runCommit();
  }
108
  /** Runs the error-free commit with exactly one cohort member. */
  @Test
  public void testSingleMember() throws Exception {
    runCommit("one");
  }
113
  /** Runs the error-free commit with four cohort members. */
  @Test
  public void testMultipleMembers() throws Exception {
    runCommit("one", "two", "three", "four");
  }
118
119  private void runCommit(String... members) throws Exception {
120    // make sure we just have an empty list
121    if (members == null) {
122      members = new String[0];
123    }
124    List<String> expected = Arrays.asList(members);
125
126    // setup the constants
127    ZKWatcher coordZkw = newZooKeeperWatcher();
128    String opDescription = "coordination test - " + members.length + " cohort members";
129
130    // start running the controller
131    ZKProcedureCoordinator coordinatorComms =
132      new ZKProcedureCoordinator(coordZkw, opDescription, COORDINATOR_NODE_NAME);
133    ThreadPoolExecutor pool =
134      ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
135    ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
136      @Override
137      public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName,
138        byte[] procArgs, List<String> expectedMembers) {
139        return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
140      }
141    };
142
143    // build and start members
144    // NOTE: There is a single subprocedure builder for all members here.
145    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
146    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers =
147      new ArrayList<>(members.length);
148    // start each member
149    for (String member : members) {
150      ZKWatcher watcher = newZooKeeperWatcher();
151      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
152      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
153      ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
154      procMembers.add(new Pair<>(procMember, comms));
155      comms.start(member, procMember);
156    }
157
158    // setup mock member subprocedures
159    final List<Subprocedure> subprocs = new ArrayList<>();
160    for (int i = 0; i < procMembers.size(); i++) {
161      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
162      Subprocedure commit = Mockito.spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName,
163        cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
164      subprocs.add(commit);
165    }
166
167    // link subprocedure to buildNewOperation invocation.
168    final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
169    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
170      (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() {
171        @Override
172        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
173          int index = i.getAndIncrement();
174          LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
175          Subprocedure commit = subprocs.get(index);
176          return commit;
177        }
178      });
179
180    // setup spying on the coordinator
181    // Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data,
182    // expected));
183    // Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
184
185    // start running the operation
186    Procedure task =
187      coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
188    // assertEquals("Didn't mock coordinator task", proc, task);
189
190    // verify all things ran as expected
191    // waitAndVerifyProc(proc, once, once, never(), once, false);
192    waitAndVerifyProc(task, once, once, never(), once, false);
193    verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
194
195    // close all the things
196    closeAll(coordinator, coordinatorComms, procMembers);
197  }
198
199  /**
200   * Test a distributed commit with multiple cohort members, where one of the cohort members has a
201   * timeout exception during the prepare stage.
202   */
203  @Test
204  public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
205    String opDescription = "error injection coordination";
206    String[] cohortMembers = new String[] { "one", "two", "three" };
207    List<String> expected = Lists.newArrayList(cohortMembers);
208    // error constants
209    final int memberErrorIndex = 2;
210    final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
211
212    // start running the coordinator and its controller
213    ZKWatcher coordinatorWatcher = newZooKeeperWatcher();
214    ZKProcedureCoordinator coordinatorController =
215      new ZKProcedureCoordinator(coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
216    ThreadPoolExecutor pool =
217      ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
218    ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
219
220    // start a member for each node
221    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
222    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size());
223    for (String member : expected) {
224      ZKWatcher watcher = newZooKeeperWatcher();
225      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
226      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
227      ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
228      members.add(new Pair<>(mem, controller));
229      controller.start(member, mem);
230    }
231
232    // setup mock subprocedures
233    final List<Subprocedure> cohortTasks = new ArrayList<>();
234    final int[] elem = new int[1];
235    for (int i = 0; i < members.size(); i++) {
236      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
237      final ProcedureMember comms = members.get(i).getFirst();
238      Subprocedure commit =
239        Mockito.spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
240      // This nasty bit has one of the impls throw a TimeoutException
241      Mockito.doAnswer(new Answer<Void>() {
242        @Override
243        public Void answer(InvocationOnMock invocation) throws Throwable {
244          int index = elem[0];
245          if (index == memberErrorIndex) {
246            LOG.debug("Sending error to coordinator");
247            ForeignException remoteCause =
248              new ForeignException("TIMER", new TimeoutException("subprocTimeout", 1, 2, 0));
249            Subprocedure r = ((Subprocedure) invocation.getMock());
250            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
251            comms.receiveAbortProcedure(r.getName(), remoteCause);
252            assertTrue(r.isComplete());
253            // don't complete the error phase until the coordinator has gotten the error
254            // notification (which ensures that we never progress past prepare)
255            try {
256              Procedure.waitForLatch(coordinatorReceivedErrorLatch,
257                new ForeignExceptionDispatcher(), WAKE_FREQUENCY, "coordinator received error");
258            } catch (InterruptedException e) {
259              LOG.debug("Wait for latch interrupted, done:"
260                + (coordinatorReceivedErrorLatch.getCount() == 0));
261              // reset the interrupt status on the thread
262              Thread.currentThread().interrupt();
263            }
264          }
265          elem[0] = ++index;
266          return null;
267        }
268      }).when(commit).acquireBarrier();
269      cohortTasks.add(commit);
270    }
271
272    // pass out a task per member
273    final AtomicInteger taskIndex = new AtomicInteger();
274    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
275      (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() {
276        @Override
277        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
278          int index = taskIndex.getAndIncrement();
279          Subprocedure commit = cohortTasks.get(index);
280          return commit;
281        }
282      });
283
284    // setup spying on the coordinator
285    ForeignExceptionDispatcher coordinatorTaskErrorMonitor =
286      Mockito.spy(new ForeignExceptionDispatcher());
287    Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator, coordinatorTaskErrorMonitor,
288      WAKE_FREQUENCY, TIMEOUT, opName, data, expected));
289    when(coordinator.createProcedure(any(), eq(opName), eq(data), anyList()))
290      .thenReturn(coordinatorTask);
291    // count down the error latch when we get the remote error
292    Mockito.doAnswer(new Answer<Void>() {
293      @Override
294      public Void answer(InvocationOnMock invocation) throws Throwable {
295        // pass on the error to the master
296        invocation.callRealMethod();
297        // then count down the got error latch
298        coordinatorReceivedErrorLatch.countDown();
299        return null;
300      }
301    }).when(coordinatorTask).receive(Mockito.any());
302
303    // ----------------------------
304    // start running the operation
305    // ----------------------------
306
307    Procedure task =
308      coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
309    assertEquals(coordinatorTask, task, "Didn't mock coordinator task");
310
311    // wait for the task to complete
312    try {
313      task.waitForCompleted();
314    } catch (ForeignException fe) {
315      // this may get caught or may not
316    }
317
318    // -------------
319    // verification
320    // -------------
321
322    // always expect prepared, never committed, and possible to have cleanup and finish (racy since
323    // error case)
324    waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
325    verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once, once, true);
326
327    // close all the open things
328    closeAll(coordinator, coordinatorController, members);
329  }
330
331  /**
332   * Wait for the coordinator task to complete, and verify all the mocks
333   * @param proc       the {@link Procedure} to execute
334   * @param prepare    the mock prepare
335   * @param commit     the mock commit
336   * @param cleanup    the mock cleanup
337   * @param finish     the mock finish
338   * @param opHasError the operation error state
339   * @throws Exception on unexpected failure
340   */
341  private void waitAndVerifyProc(Procedure proc, VerificationMode prepare, VerificationMode commit,
342    VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
343    boolean caughtError = false;
344    try {
345      proc.waitForCompleted();
346    } catch (ForeignException fe) {
347      caughtError = true;
348    }
349    // make sure that the task called all the expected phases
350    Mockito.verify(proc, prepare).sendGlobalBarrierStart();
351    Mockito.verify(proc, commit).sendGlobalBarrierReached();
352    Mockito.verify(proc, finish).sendGlobalBarrierComplete();
353    assertEquals(opHasError, proc.getErrorMonitor().hasException(),
354      "Operation error state was unexpected");
355    assertEquals(opHasError, caughtError, "Operation error state was unexpected");
356
357  }
358
359  /**
360   * Wait for the coordinator task to complete, and verify all the mocks
361   * @param op         the {@link Subprocedure} to use
362   * @param prepare    the mock prepare
363   * @param commit     the mock commit
364   * @param cleanup    the mock cleanup
365   * @param finish     the mock finish
366   * @param opHasError the operation error state
367   * @throws Exception on unexpected failure
368   */
369  private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
370    VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
371    throws Exception {
372    boolean caughtError = false;
373    try {
374      op.waitForLocallyCompleted();
375    } catch (ForeignException fe) {
376      caughtError = true;
377    }
378    // make sure that the task called all the expected phases
379    Mockito.verify(op, prepare).acquireBarrier();
380    Mockito.verify(op, commit).insideBarrier();
381    // We cannot guarantee that cleanup has run so we don't check it.
382
383    assertEquals(opHasError, op.getErrorCheckable().hasException(),
384      "Operation error state was unexpected");
385    assertEquals(opHasError, caughtError, "Operation error state was unexpected");
386
387  }
388
389  private void verifyCohortSuccessful(List<String> cohortNames, SubprocedureFactory subprocFactory,
390    Iterable<Subprocedure> cohortTasks, VerificationMode prepare, VerificationMode commit,
391    VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
392
393    // make sure we build the correct number of cohort members
394    Mockito.verify(subprocFactory, Mockito.times(cohortNames.size()))
395      .buildSubprocedure(Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
396    // verify that we ran each of the operations cleanly
397    int j = 0;
398    for (Subprocedure op : cohortTasks) {
399      LOG.debug("Checking mock:" + (j++));
400      waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
401    }
402  }
403
404  private void closeAll(ProcedureCoordinator coordinator,
405    ZKProcedureCoordinator coordinatorController,
406    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort) throws IOException {
407    // make sure we close all the resources
408    for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
409      member.getFirst().close();
410      member.getSecond().close();
411    }
412    coordinator.close();
413    coordinatorController.close();
414  }
415}