001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Matchers.any;
023import static org.mockito.Matchers.anyListOf;
024import static org.mockito.Matchers.eq;
025import static org.mockito.Mockito.atMost;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.when;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.atomic.AtomicInteger;
037import org.apache.hadoop.hbase.Abortable;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.errorhandling.ForeignException;
041import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
042import org.apache.hadoop.hbase.errorhandling.TimeoutException;
043import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.Pair;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.mockito.Mockito;
054import org.mockito.internal.matchers.ArrayEquals;
055import org.mockito.invocation.InvocationOnMock;
056import org.mockito.stubbing.Answer;
057import org.mockito.verification.VerificationMode;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
062
063/**
064 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
065 */
066@Category({MasterTests.class, MediumTests.class})
067public class TestZKProcedure {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestZKProcedure.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class);
074  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
075  private static final String COORDINATOR_NODE_NAME = "coordinator";
076  private static final long KEEP_ALIVE = 100; // seconds
077  private static final int POOL_SIZE = 1;
078  private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
079  private static final long WAKE_FREQUENCY = 500;
080  private static final String opName = "op";
081  private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
082  private static final VerificationMode once = Mockito.times(1);
083
084  @BeforeClass
085  public static void setupTest() throws Exception {
086    UTIL.startMiniZKCluster();
087  }
088
089  @AfterClass
090  public static void cleanupTest() throws Exception {
091    UTIL.shutdownMiniZKCluster();
092  }
093
094  private static ZKWatcher newZooKeeperWatcher() throws IOException {
095    return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
096      @Override
097      public void abort(String why, Throwable e) {
098        throw new RuntimeException(
099            "Unexpected abort in distributed three phase commit test:" + why, e);
100      }
101
102      @Override
103      public boolean isAborted() {
104        return false;
105      }
106    });
107  }
108
109  @Test
110  public void testEmptyMemberSet() throws Exception {
111    runCommit();
112  }
113
114  @Test
115  public void testSingleMember() throws Exception {
116    runCommit("one");
117  }
118
119  @Test
120  public void testMultipleMembers() throws Exception {
121    runCommit("one", "two", "three", "four" );
122  }
123
124  private void runCommit(String... members) throws Exception {
125    // make sure we just have an empty list
126    if (members == null) {
127      members = new String[0];
128    }
129    List<String> expected = Arrays.asList(members);
130
131    // setup the constants
132    ZKWatcher coordZkw = newZooKeeperWatcher();
133    String opDescription = "coordination test - " + members.length + " cohort members";
134
135    // start running the controller
136    ZKProcedureCoordinator coordinatorComms = new ZKProcedureCoordinator(
137        coordZkw, opDescription, COORDINATOR_NODE_NAME);
138    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
139    ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
140      @Override
141      public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
142          List<String> expectedMembers) {
143        return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
144      }
145    };
146
147    // build and start members
148    // NOTE: There is a single subprocedure builder for all members here.
149    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
150    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<>(members.length);
151    // start each member
152    for (String member : members) {
153      ZKWatcher watcher = newZooKeeperWatcher();
154      ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
155      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
156      ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
157      procMembers.add(new Pair<>(procMember, comms));
158      comms.start(member, procMember);
159    }
160
161    // setup mock member subprocedures
162    final List<Subprocedure> subprocs = new ArrayList<>();
163    for (int i = 0; i < procMembers.size(); i++) {
164      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
165      Subprocedure commit = Mockito
166      .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
167          WAKE_FREQUENCY, TIMEOUT));
168      subprocs.add(commit);
169    }
170
171    // link subprocedure to buildNewOperation invocation.
172    final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
173    Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
174        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
175      new Answer<Subprocedure>() {
176        @Override
177        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
178          int index = i.getAndIncrement();
179          LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
180          Subprocedure commit = subprocs.get(index);
181          return commit;
182        }
183      });
184
185    // setup spying on the coordinator
186//    Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
187//    Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
188
189    // start running the operation
190    Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
191//    assertEquals("Didn't mock coordinator task", proc, task);
192
193    // verify all things ran as expected
194//    waitAndVerifyProc(proc, once, once, never(), once, false);
195    waitAndVerifyProc(task, once, once, never(), once, false);
196    verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
197
198    // close all the things
199    closeAll(coordinator, coordinatorComms, procMembers);
200  }
201
202  /**
203   * Test a distributed commit with multiple cohort members, where one of the cohort members has a
204   * timeout exception during the prepare stage.
205   */
206  @Test
207  public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
208    String opDescription = "error injection coordination";
209    String[] cohortMembers = new String[] { "one", "two", "three" };
210    List<String> expected = Lists.newArrayList(cohortMembers);
211    // error constants
212    final int memberErrorIndex = 2;
213    final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
214
215    // start running the coordinator and its controller
216    ZKWatcher coordinatorWatcher = newZooKeeperWatcher();
217    ZKProcedureCoordinator coordinatorController = new ZKProcedureCoordinator(
218        coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
219    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
220    ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
221
222    // start a member for each node
223    SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
224    List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size());
225    for (String member : expected) {
226      ZKWatcher watcher = newZooKeeperWatcher();
227      ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
228      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
229      ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
230      members.add(new Pair<>(mem, controller));
231      controller.start(member, mem);
232    }
233
234    // setup mock subprocedures
235    final List<Subprocedure> cohortTasks = new ArrayList<>();
236    final int[] elem = new int[1];
237    for (int i = 0; i < members.size(); i++) {
238      ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
239      final ProcedureMember comms = members.get(i).getFirst();
240      Subprocedure commit = Mockito
241      .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
242      // This nasty bit has one of the impls throw a TimeoutException
243      Mockito.doAnswer(new Answer<Void>() {
244        @Override
245        public Void answer(InvocationOnMock invocation) throws Throwable {
246          int index = elem[0];
247          if (index == memberErrorIndex) {
248            LOG.debug("Sending error to coordinator");
249            ForeignException remoteCause = new ForeignException("TIMER",
250                new TimeoutException("subprocTimeout" , 1, 2, 0));
251            Subprocedure r = ((Subprocedure) invocation.getMock());
252            LOG.error("Remote commit failure, not propagating error:" + remoteCause);
253            comms.receiveAbortProcedure(r.getName(), remoteCause);
254            assertTrue(r.isComplete());
255            // don't complete the error phase until the coordinator has gotten the error
256            // notification (which ensures that we never progress past prepare)
257            try {
258              Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
259                  WAKE_FREQUENCY, "coordinator received error");
260            } catch (InterruptedException e) {
261              LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
262              // reset the interrupt status on the thread
263              Thread.currentThread().interrupt();
264            }
265          }
266          elem[0] = ++index;
267          return null;
268        }
269      }).when(commit).acquireBarrier();
270      cohortTasks.add(commit);
271    }
272
273    // pass out a task per member
274    final AtomicInteger taskIndex = new AtomicInteger();
275    Mockito.when(
276      subprocFactory.buildSubprocedure(Mockito.eq(opName),
277        (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
278      new Answer<Subprocedure>() {
279        @Override
280        public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
281          int index = taskIndex.getAndIncrement();
282          Subprocedure commit = cohortTasks.get(index);
283          return commit;
284        }
285      });
286
287    // setup spying on the coordinator
288    ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
289        .spy(new ForeignExceptionDispatcher());
290    Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
291        coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
292        opName, data, expected));
293    when(coordinator.createProcedure(any(), eq(opName), eq(data), anyListOf(String.class)))
294      .thenReturn(coordinatorTask);
295    // count down the error latch when we get the remote error
296    Mockito.doAnswer(new Answer<Void>() {
297      @Override
298      public Void answer(InvocationOnMock invocation) throws Throwable {
299        // pass on the error to the master
300        invocation.callRealMethod();
301        // then count down the got error latch
302        coordinatorReceivedErrorLatch.countDown();
303        return null;
304      }
305    }).when(coordinatorTask).receive(Mockito.any());
306
307    // ----------------------------
308    // start running the operation
309    // ----------------------------
310
311    Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
312    assertEquals("Didn't mock coordinator task", coordinatorTask, task);
313
314    // wait for the task to complete
315    try {
316      task.waitForCompleted();
317    } catch (ForeignException fe) {
318      // this may get caught or may not
319    }
320
321    // -------------
322    // verification
323    // -------------
324
325    // always expect prepared, never committed, and possible to have cleanup and finish (racy since
326    // error case)
327    waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
328    verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
329      once, true);
330
331    // close all the open things
332    closeAll(coordinator, coordinatorController, members);
333  }
334
335  /**
336   * Wait for the coordinator task to complete, and verify all the mocks
337   * @param proc the {@link Procedure} to execute
338   * @param prepare the mock prepare
339   * @param commit the mock commit
340   * @param cleanup the mock cleanup
341   * @param finish the mock finish
342   * @param opHasError the operation error state
343   * @throws Exception on unexpected failure
344   */
345  private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
346      VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
347      throws Exception {
348    boolean caughtError = false;
349    try {
350      proc.waitForCompleted();
351    } catch (ForeignException fe) {
352      caughtError = true;
353    }
354    // make sure that the task called all the expected phases
355    Mockito.verify(proc, prepare).sendGlobalBarrierStart();
356    Mockito.verify(proc, commit).sendGlobalBarrierReached();
357    Mockito.verify(proc, finish).sendGlobalBarrierComplete();
358    assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
359        .hasException());
360    assertEquals("Operation error state was unexpected", opHasError, caughtError);
361
362  }
363
364  /**
365   * Wait for the coordinator task to complete, and verify all the mocks
366   * @param op the {@link Subprocedure} to use
367   * @param prepare the mock prepare
368   * @param commit the mock commit
369   * @param cleanup the mock cleanup
370   * @param finish the mock finish
371   * @param opHasError the operation error state
372   * @throws Exception on unexpected failure
373   */
374  private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
375      VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
376      throws Exception {
377    boolean caughtError = false;
378    try {
379      op.waitForLocallyCompleted();
380    } catch (ForeignException fe) {
381      caughtError = true;
382    }
383    // make sure that the task called all the expected phases
384    Mockito.verify(op, prepare).acquireBarrier();
385    Mockito.verify(op, commit).insideBarrier();
386    // We cannot guarantee that cleanup has run so we don't check it.
387
388    assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
389        .hasException());
390    assertEquals("Operation error state was unexpected", opHasError, caughtError);
391
392  }
393
394  private void verifyCohortSuccessful(List<String> cohortNames,
395      SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
396      VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
397      VerificationMode finish, boolean opHasError) throws Exception {
398
399    // make sure we build the correct number of cohort members
400    Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
401      Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
402    // verify that we ran each of the operations cleanly
403    int j = 0;
404    for (Subprocedure op : cohortTasks) {
405      LOG.debug("Checking mock:" + (j++));
406      waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
407    }
408  }
409
410  private void closeAll(
411      ProcedureCoordinator coordinator,
412      ZKProcedureCoordinator coordinatorController,
413      List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
414      throws IOException {
415    // make sure we close all the resources
416    for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
417      member.getFirst().close();
418      member.getSecond().close();
419    }
420    coordinator.close();
421    coordinatorController.close();
422  }
423}