001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Matchers.any;
023import static org.mockito.Matchers.anyListOf;
024import static org.mockito.Matchers.eq;
025import static org.mockito.Mockito.atMost;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.atomic.AtomicInteger;
037import org.apache.hadoop.hbase.Abortable;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.errorhandling.ForeignException;
041import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
042import org.apache.hadoop.hbase.errorhandling.TimeoutException;
043import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.mockito.Mockito;
054import org.mockito.internal.matchers.ArrayEquals;
055import org.mockito.invocation.InvocationOnMock;
056import org.mockito.stubbing.Answer;
057import org.mockito.verification.VerificationMode;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
062
063/**
064 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
065 */
066@Category({ MasterTests.class, MediumTests.class })
067public class TestZKProcedure {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestZKProcedure.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class);
074  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
075  private static final String COORDINATOR_NODE_NAME = "coordinator";
076  private static final long KEEP_ALIVE = 100; // seconds
077  private static final int POOL_SIZE = 1;
078  private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
079  private static final long WAKE_FREQUENCY = 500;
080  private static final String opName = "op";
081  private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
082  private static final VerificationMode once = Mockito.times(1);
083
084  @BeforeClass
085  public static void setupTest() throws Exception {
086    UTIL.startMiniZKCluster();
087  }
088
089  @AfterClass
090  public static void cleanupTest() throws Exception {
091    UTIL.shutdownMiniZKCluster();
092  }
093
094  private static ZKWatcher newZooKeeperWatcher() throws IOException {
095    return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
096      @Override
097      public void abort(String why, Throwable e) {
098        throw new RuntimeException("Unexpected abort in distributed three phase commit test:" + why,
099          e);
100      }
101
102      @Override
103      public boolean isAborted() {
104        return false;
105      }
106    });
107  }
108
109  @Test
110  public void testEmptyMemberSet() throws Exception {
111    runCommit();
112  }
113
114  @Test
115  public void testSingleMember() throws Exception {
116    runCommit("one");
117  }
118
119  @Test
120  public void testMultipleMembers() throws Exception {
121    runCommit("one", "two", "three", "four");
122  }
123
124  private void runCommit(String... members) throws Exception {
125    // make sure we just have an empty list
126    if (members == null) {
127      members = new String[0];
128    }
129    List<String> expected = Arrays.asList(members);
130
131    // setup the constants
132    ZKWatcher coordZkw = newZooKeeperWatcher();
133    String opDescription = "coordination test - " + members.length + " cohort members";
134
135    // start running the controller
136    ZKProcedureCoordinator coordinatorComms =
137      new ZKProcedureCoordinator(coordZkw, opDescription, COORDINATOR_NODE_NAME);
138    ThreadPoolExecutor pool =
139      ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
140    ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
141      @Override
142      public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName,
143        byte[] procArgs, List<String> expectedMembers) {
144        return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
145      }
146    };
147
148    // build and start members
149    // NOTE: There is a single subprocedure builder for all members here.
150    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
151    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers =
152      new ArrayList<>(members.length);
153    // start each member
154    for (String member : members) {
155      ZKWatcher watcher = newZooKeeperWatcher();
156      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
157      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
158      ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
159      procMembers.add(new Pair<>(procMember, comms));
160      comms.start(member, procMember);
161    }
162
163    // setup mock member subprocedures
164    final List<Subprocedure> subprocs = new ArrayList<>();
165    for (int i = 0; i < procMembers.size(); i++) {
166      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
167      Subprocedure commit = Mockito.spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName,
168        cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
169      subprocs.add(commit);
170    }
171
172    // link subprocedure to buildNewOperation invocation.
173    final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
174    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
175      (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() {
176        @Override
177        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
178          int index = i.getAndIncrement();
179          LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
180          Subprocedure commit = subprocs.get(index);
181          return commit;
182        }
183      });
184
185    // setup spying on the coordinator
186    // Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data,
187    // expected));
188    // Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
189
190    // start running the operation
191    Procedure task =
192      coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
193    // assertEquals("Didn't mock coordinator task", proc, task);
194
195    // verify all things ran as expected
196    // waitAndVerifyProc(proc, once, once, never(), once, false);
197    waitAndVerifyProc(task, once, once, never(), once, false);
198    verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
199
200    // close all the things
201    closeAll(coordinator, coordinatorComms, procMembers);
202  }
203
204  /**
205   * Test a distributed commit with multiple cohort members, where one of the cohort members has a
206   * timeout exception during the prepare stage.
207   */
208  @Test
209  public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
210    String opDescription = "error injection coordination";
211    String[] cohortMembers = new String[] { "one", "two", "three" };
212    List<String> expected = Lists.newArrayList(cohortMembers);
213    // error constants
214    final int memberErrorIndex = 2;
215    final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
216
217    // start running the coordinator and its controller
218    ZKWatcher coordinatorWatcher = newZooKeeperWatcher();
219    ZKProcedureCoordinator coordinatorController =
220      new ZKProcedureCoordinator(coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
221    ThreadPoolExecutor pool =
222      ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
223    ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
224
225    // start a member for each node
226    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
227    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size());
228    for (String member : expected) {
229      ZKWatcher watcher = newZooKeeperWatcher();
230      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
231      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
232      ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
233      members.add(new Pair<>(mem, controller));
234      controller.start(member, mem);
235    }
236
237    // setup mock subprocedures
238    final List<Subprocedure> cohortTasks = new ArrayList<>();
239    final int[] elem = new int[1];
240    for (int i = 0; i < members.size(); i++) {
241      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
242      final ProcedureMember comms = members.get(i).getFirst();
243      Subprocedure commit =
244        Mockito.spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
245      // This nasty bit has one of the impls throw a TimeoutException
246      Mockito.doAnswer(new Answer<Void>() {
247        @Override
248        public Void answer(InvocationOnMock invocation) throws Throwable {
249          int index = elem[0];
250          if (index == memberErrorIndex) {
251            LOG.debug("Sending error to coordinator");
252            ForeignException remoteCause =
253              new ForeignException("TIMER", new TimeoutException("subprocTimeout", 1, 2, 0));
254            Subprocedure r = ((Subprocedure) invocation.getMock());
255            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
256            comms.receiveAbortProcedure(r.getName(), remoteCause);
257            assertTrue(r.isComplete());
258            // don't complete the error phase until the coordinator has gotten the error
259            // notification (which ensures that we never progress past prepare)
260            try {
261              Procedure.waitForLatch(coordinatorReceivedErrorLatch,
262                new ForeignExceptionDispatcher(), WAKE_FREQUENCY, "coordinator received error");
263            } catch (InterruptedException e) {
264              LOG.debug("Wait for latch interrupted, done:"
265                + (coordinatorReceivedErrorLatch.getCount() == 0));
266              // reset the interrupt status on the thread
267              Thread.currentThread().interrupt();
268            }
269          }
270          elem[0] = ++index;
271          return null;
272        }
273      }).when(commit).acquireBarrier();
274      cohortTasks.add(commit);
275    }
276
277    // pass out a task per member
278    final AtomicInteger taskIndex = new AtomicInteger();
279    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
280      (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() {
281        @Override
282        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
283          int index = taskIndex.getAndIncrement();
284          Subprocedure commit = cohortTasks.get(index);
285          return commit;
286        }
287      });
288
289    // setup spying on the coordinator
290    ForeignExceptionDispatcher coordinatorTaskErrorMonitor =
291      Mockito.spy(new ForeignExceptionDispatcher());
292    Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator, coordinatorTaskErrorMonitor,
293      WAKE_FREQUENCY, TIMEOUT, opName, data, expected));
294    when(coordinator.createProcedure(any(), eq(opName), eq(data), anyListOf(String.class)))
295      .thenReturn(coordinatorTask);
296    // count down the error latch when we get the remote error
297    Mockito.doAnswer(new Answer<Void>() {
298      @Override
299      public Void answer(InvocationOnMock invocation) throws Throwable {
300        // pass on the error to the master
301        invocation.callRealMethod();
302        // then count down the got error latch
303        coordinatorReceivedErrorLatch.countDown();
304        return null;
305      }
306    }).when(coordinatorTask).receive(Mockito.any());
307
308    // ----------------------------
309    // start running the operation
310    // ----------------------------
311
312    Procedure task =
313      coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
314    assertEquals("Didn't mock coordinator task", coordinatorTask, task);
315
316    // wait for the task to complete
317    try {
318      task.waitForCompleted();
319    } catch (ForeignException fe) {
320      // this may get caught or may not
321    }
322
323    // -------------
324    // verification
325    // -------------
326
327    // always expect prepared, never committed, and possible to have cleanup and finish (racy since
328    // error case)
329    waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
330    verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once, once, true);
331
332    // close all the open things
333    closeAll(coordinator, coordinatorController, members);
334  }
335
336  /**
337   * Wait for the coordinator task to complete, and verify all the mocks
338   * @param proc       the {@link Procedure} to execute
339   * @param prepare    the mock prepare
340   * @param commit     the mock commit
341   * @param cleanup    the mock cleanup
342   * @param finish     the mock finish
343   * @param opHasError the operation error state
344   * @throws Exception on unexpected failure
345   */
346  private void waitAndVerifyProc(Procedure proc, VerificationMode prepare, VerificationMode commit,
347    VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
348    boolean caughtError = false;
349    try {
350      proc.waitForCompleted();
351    } catch (ForeignException fe) {
352      caughtError = true;
353    }
354    // make sure that the task called all the expected phases
355    Mockito.verify(proc, prepare).sendGlobalBarrierStart();
356    Mockito.verify(proc, commit).sendGlobalBarrierReached();
357    Mockito.verify(proc, finish).sendGlobalBarrierComplete();
358    assertEquals("Operation error state was unexpected", opHasError,
359      proc.getErrorMonitor().hasException());
360    assertEquals("Operation error state was unexpected", opHasError, caughtError);
361
362  }
363
364  /**
365   * Wait for the coordinator task to complete, and verify all the mocks
366   * @param op         the {@link Subprocedure} to use
367   * @param prepare    the mock prepare
368   * @param commit     the mock commit
369   * @param cleanup    the mock cleanup
370   * @param finish     the mock finish
371   * @param opHasError the operation error state
372   * @throws Exception on unexpected failure
373   */
374  private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
375    VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
376    throws Exception {
377    boolean caughtError = false;
378    try {
379      op.waitForLocallyCompleted();
380    } catch (ForeignException fe) {
381      caughtError = true;
382    }
383    // make sure that the task called all the expected phases
384    Mockito.verify(op, prepare).acquireBarrier();
385    Mockito.verify(op, commit).insideBarrier();
386    // We cannot guarantee that cleanup has run so we don't check it.
387
388    assertEquals("Operation error state was unexpected", opHasError,
389      op.getErrorCheckable().hasException());
390    assertEquals("Operation error state was unexpected", opHasError, caughtError);
391
392  }
393
394  private void verifyCohortSuccessful(List<String> cohortNames, SubprocedureFactory subprocFactory,
395    Iterable<Subprocedure> cohortTasks, VerificationMode prepare, VerificationMode commit,
396    VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception {
397
398    // make sure we build the correct number of cohort members
399    Mockito.verify(subprocFactory, Mockito.times(cohortNames.size()))
400      .buildSubprocedure(Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
401    // verify that we ran each of the operations cleanly
402    int j = 0;
403    for (Subprocedure op : cohortTasks) {
404      LOG.debug("Checking mock:" + (j++));
405      waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
406    }
407  }
408
409  private void closeAll(ProcedureCoordinator coordinator,
410    ZKProcedureCoordinator coordinatorController,
411    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort) throws IOException {
412    // make sure we close all the resources
413    for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
414      member.getFirst().close();
415      member.getSecond().close();
416    }
417    coordinator.close();
418    coordinatorController.close();
419  }
420}