001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.mockito.Matchers.any;
021import static org.mockito.Matchers.anyString;
022import static org.mockito.Matchers.eq;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.doThrow;
025import static org.mockito.Mockito.inOrder;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.never;
028import static org.mockito.Mockito.reset;
029import static org.mockito.Mockito.spy;
030import static org.mockito.Mockito.verifyZeroInteractions;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.util.concurrent.ThreadPoolExecutor;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.errorhandling.ForeignException;
037import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
038import org.apache.hadoop.hbase.errorhandling.TimeoutException;
039import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.junit.After;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.mockito.InOrder;
047import org.mockito.Mockito;
048import org.mockito.invocation.InvocationOnMock;
049import org.mockito.stubbing.Answer;
050
051import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
052
053/**
054 * Test the procedure member, and it's error handling mechanisms.
055 */
056@Category({ MasterTests.class, SmallTests.class })
057public class TestProcedureMember {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestProcedureMember.class);
062
063  private static final long WAKE_FREQUENCY = 100;
064  private static final long TIMEOUT = 100000;
065  private static final long POOL_KEEP_ALIVE = 1;
066
067  private final String op = "some op";
068  private final byte[] data = new byte[0];
069  private final ForeignExceptionDispatcher mockListener =
070    Mockito.spy(new ForeignExceptionDispatcher());
071  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
072  private final ProcedureMemberRpcs mockMemberComms = Mockito.mock(ProcedureMemberRpcs.class);
073  private ProcedureMember member;
074  private ForeignExceptionDispatcher dispatcher;
075  Subprocedure spySub;
076
077  /**
078   * Reset all the mock objects
079   */
080  @After
081  public void resetTest() throws IOException {
082    reset(mockListener, mockBuilder, mockMemberComms);
083    Closeables.close(member, true);
084  }
085
086  /**
087   * Build a member using the class level mocks
088   * @return member to use for tests
089   */
090  private ProcedureMember buildCohortMember() {
091    String name = "node";
092    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
093    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
094  }
095
096  /**
097   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
098   */
099  private void buildCohortMemberPair() throws IOException {
100    dispatcher = new ForeignExceptionDispatcher();
101    String name = "node";
102    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
103    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
104    when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating
105                                                                    // exception
106    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
107    spySub = spy(subproc);
108    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
109    addCommitAnswer();
110  }
111
112  /**
113   * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
114   */
115  private void addCommitAnswer() throws IOException {
116    doAnswer(new Answer<Void>() {
117      @Override
118      public Void answer(InvocationOnMock invocation) throws Throwable {
119        member.receivedReachedGlobalBarrier(op);
120        return null;
121      }
122    }).when(mockMemberComms).sendMemberAcquired(any());
123  }
124
125  /**
126   * Test the normal sub procedure execution case.
127   */
128  @Test
129  public void testSimpleRun() throws Exception {
130    member = buildCohortMember();
131    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
132    EmptySubprocedure spy = spy(subproc);
133    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
134
135    // when we get a prepare, then start the commit phase
136    addCommitAnswer();
137
138    // run the operation
139    // build a new operation
140    Subprocedure subproc1 = member.createSubprocedure(op, data);
141    member.submitSubprocedure(subproc1);
142    // and wait for it to finish
143    subproc.waitForLocallyCompleted();
144
145    // make sure everything ran in order
146    InOrder order = inOrder(mockMemberComms, spy);
147    order.verify(spy).acquireBarrier();
148    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
149    order.verify(spy).insideBarrier();
150    order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
151    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), any());
152  }
153
154  /**
155   * Make sure we call cleanup etc, when we have an exception during
156   * {@link Subprocedure#acquireBarrier()}.
157   */
158  @Test
159  public void testMemberPrepareException() throws Exception {
160    buildCohortMemberPair();
161
162    // mock an exception on Subprocedure's prepare
163    doAnswer(new Answer<Void>() {
164      @Override
165      public Void answer(InvocationOnMock invocation) throws Throwable {
166        throw new IOException("Forced IOException in member acquireBarrier");
167      }
168    }).when(spySub).acquireBarrier();
169
170    // run the operation
171    // build a new operation
172    Subprocedure subproc = member.createSubprocedure(op, data);
173    member.submitSubprocedure(subproc);
174    // if the operation doesn't die properly, then this will timeout
175    member.closeAndWait(TIMEOUT);
176
177    // make sure everything ran in order
178    InOrder order = inOrder(mockMemberComms, spySub);
179    order.verify(spySub).acquireBarrier();
180    // Later phases not run
181    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
182    order.verify(spySub, never()).insideBarrier();
183    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
184    // error recovery path exercised
185    order.verify(spySub).cancel(anyString(), any());
186    order.verify(spySub).cleanup(any());
187  }
188
189  /**
190   * Make sure we call cleanup etc, when we have an exception during prepare.
191   */
192  @Test
193  public void testSendMemberAcquiredCommsFailure() throws Exception {
194    buildCohortMemberPair();
195
196    // mock an exception on Subprocedure's prepare
197    doAnswer(new Answer<Void>() {
198      @Override
199      public Void answer(InvocationOnMock invocation) throws Throwable {
200        throw new IOException("Forced IOException in member prepare");
201      }
202    }).when(mockMemberComms).sendMemberAcquired(any());
203
204    // run the operation
205    // build a new operation
206    Subprocedure subproc = member.createSubprocedure(op, data);
207    member.submitSubprocedure(subproc);
208    // if the operation doesn't die properly, then this will timeout
209    member.closeAndWait(TIMEOUT);
210
211    // make sure everything ran in order
212    InOrder order = inOrder(mockMemberComms, spySub);
213    order.verify(spySub).acquireBarrier();
214    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
215
216    // Later phases not run
217    order.verify(spySub, never()).insideBarrier();
218    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
219    // error recovery path exercised
220    order.verify(spySub).cancel(anyString(), any());
221    order.verify(spySub).cleanup(any());
222  }
223
224  /**
225   * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
226   * running {@link Subprocedure#acquireBarrier()} -- prepare needs to finish first, and the the
227   * abort is checked. Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get
228   * rolled back via {@link Subprocedure#cleanup}.
229   */
230  @Test
231  public void testCoordinatorAbort() throws Exception {
232    buildCohortMemberPair();
233
234    // mock that another node timed out or failed to prepare
235    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
236    doAnswer(new Answer<Void>() {
237      @Override
238      public Void answer(InvocationOnMock invocation) throws Throwable {
239        // inject a remote error (this would have come from an external thread)
240        spySub.cancel("bogus message", oate);
241        // sleep the wake frequency since that is what we promised
242        Thread.sleep(WAKE_FREQUENCY);
243        return null;
244      }
245    }).when(spySub).waitForReachedGlobalBarrier();
246
247    // run the operation
248    // build a new operation
249    Subprocedure subproc = member.createSubprocedure(op, data);
250    member.submitSubprocedure(subproc);
251    // if the operation doesn't die properly, then this will timeout
252    member.closeAndWait(TIMEOUT);
253
254    // make sure everything ran in order
255    InOrder order = inOrder(mockMemberComms, spySub);
256    order.verify(spySub).acquireBarrier();
257    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
258    // Later phases not run
259    order.verify(spySub, never()).insideBarrier();
260    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
261    // error recovery path exercised
262    order.verify(spySub).cancel(anyString(), any());
263    order.verify(spySub).cleanup(any());
264  }
265
266  /**
267   * Handle failures if a member's commit phase fails.
268   * <p/>
269   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
270   * the transaction is committed just before the coordinator sends commit messages to the member.
271   * Members are then responsible for reading its TX log. This implementation actually rolls back,
272   * and thus breaks the normal TX guarantees.
273   */
274  @Test
275  public void testMemberCommitException() throws Exception {
276    buildCohortMemberPair();
277
278    // mock an exception on Subprocedure's prepare
279    doAnswer(new Answer<Void>() {
280      @Override
281      public Void answer(InvocationOnMock invocation) throws Throwable {
282        throw new IOException("Forced IOException in member prepare");
283      }
284    }).when(spySub).insideBarrier();
285
286    // run the operation
287    // build a new operation
288    Subprocedure subproc = member.createSubprocedure(op, data);
289    member.submitSubprocedure(subproc);
290    // if the operation doesn't die properly, then this will timeout
291    member.closeAndWait(TIMEOUT);
292
293    // make sure everything ran in order
294    InOrder order = inOrder(mockMemberComms, spySub);
295    order.verify(spySub).acquireBarrier();
296    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
297    order.verify(spySub).insideBarrier();
298
299    // Later phases not run
300    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
301    // error recovery path exercised
302    order.verify(spySub).cancel(anyString(), any());
303    order.verify(spySub).cleanup(any());
304  }
305
306  /**
307   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
308   * <p/>
309   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
310   * the transaction is committed just before the coordinator sends commit messages to the member.
311   * Members are then responsible for reading its TX log. This implementation actually rolls back,
312   * and thus breaks the normal TX guarantees.
313   */
314  @Test
315  public void testMemberCommitCommsFailure() throws Exception {
316    buildCohortMemberPair();
317    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
318    doAnswer(new Answer<Void>() {
319      @Override
320      public Void answer(InvocationOnMock invocation) throws Throwable {
321        // inject a remote error (this would have come from an external thread)
322        spySub.cancel("commit comms fail", oate);
323        // sleep the wake frequency since that is what we promised
324        Thread.sleep(WAKE_FREQUENCY);
325        return null;
326      }
327    }).when(mockMemberComms).sendMemberCompleted(any(), eq(data));
328
329    // run the operation
330    // build a new operation
331    Subprocedure subproc = member.createSubprocedure(op, data);
332    member.submitSubprocedure(subproc);
333    // if the operation doesn't die properly, then this will timeout
334    member.closeAndWait(TIMEOUT);
335
336    // make sure everything ran in order
337    InOrder order = inOrder(mockMemberComms, spySub);
338    order.verify(spySub).acquireBarrier();
339    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
340    order.verify(spySub).insideBarrier();
341    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
342    // error recovery path exercised
343    order.verify(spySub).cancel(anyString(), any());
344    order.verify(spySub).cleanup(any());
345  }
346
347  /**
348   * Fail correctly on getting an external error while waiting for the prepared latch
349   * @throws Exception on failure
350   */
351  @Test
352  public void testPropagateConnectionErrorBackToManager() throws Exception {
353    // setup the operation
354    member = buildCohortMember();
355    ProcedureMember memberSpy = spy(member);
356
357    // setup the commit and the spy
358    final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
359    ForeignExceptionDispatcher dispSpy = spy(dispatcher);
360    Subprocedure commit = new EmptySubprocedure(member, dispatcher);
361    Subprocedure spy = spy(commit);
362    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
363
364    // fail during the prepare phase
365    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
366    // and throw a connection error when we try to tell the controller about it
367    doThrow(new IOException("Controller is down!")).when(mockMemberComms).sendMemberAborted(eq(spy),
368      any());
369
370    // run the operation
371    // build a new operation
372    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
373    memberSpy.submitSubprocedure(subproc);
374    // if the operation doesn't die properly, then this will timeout
375    memberSpy.closeAndWait(TIMEOUT);
376
377    // make sure everything ran in order
378    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
379    // make sure we acquire.
380    order.verify(spy).acquireBarrier();
381    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
382
383    // TODO Need to do another refactor to get this to propagate to the coordinator.
384    // make sure we pass a remote exception back the controller
385    // order.verify(mockMemberComms).sendMemberAborted(eq(spy),
386    // any());
387    // order.verify(dispSpy).receiveError(anyString(),
388    // any(), any());
389  }
390
391  /**
392   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
393   * correctly build a new task for the requested operation
394   * @throws Exception on failure
395   */
396  @Test
397  public void testNoTaskToBeRunFromRequest() throws Exception {
398    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
399    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null).thenThrow(
400      new IllegalStateException("Wrong state!"),
401      new IllegalArgumentException("can't understand the args"));
402    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
403    // builder returns null
404    // build a new operation
405    Subprocedure subproc = member.createSubprocedure(op, data);
406    member.submitSubprocedure(subproc);
407    // throws an illegal state exception
408    try {
409      // build a new operation
410      Subprocedure subproc2 = member.createSubprocedure(op, data);
411      member.submitSubprocedure(subproc2);
412    } catch (IllegalStateException ise) {
413    }
414    // throws an illegal argument exception
415    try {
416      // build a new operation
417      Subprocedure subproc3 = member.createSubprocedure(op, data);
418      member.submitSubprocedure(subproc3);
419    } catch (IllegalArgumentException iae) {
420    }
421
422    // no request should reach the pool
423    verifyZeroInteractions(pool);
424    // get two abort requests
425    // TODO Need to do another refactor to get this to propagate to the coordinator.
426    // verify(mockMemberComms, times(2)).sendMemberAborted(any(), any());
427  }
428
429  /**
430   * Helper {@link Procedure} who's phase for each step is just empty
431   */
432  public class EmptySubprocedure extends SubprocedureImpl {
433    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
434      super(member, op, dispatcher,
435        // TODO 1000000 is an arbitrary number that I picked.
436        WAKE_FREQUENCY, TIMEOUT);
437    }
438  }
439}