001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.mockito.Matchers.any;
021import static org.mockito.Matchers.anyString;
022import static org.mockito.Matchers.eq;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.doThrow;
025import static org.mockito.Mockito.inOrder;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.never;
028import static org.mockito.Mockito.reset;
029import static org.mockito.Mockito.spy;
030import static org.mockito.Mockito.verifyZeroInteractions;
031import static org.mockito.Mockito.when;
032
033import java.io.IOException;
034import java.util.concurrent.ThreadPoolExecutor;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.errorhandling.ForeignException;
037import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
038import org.apache.hadoop.hbase.errorhandling.TimeoutException;
039import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.junit.After;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.mockito.InOrder;
047import org.mockito.Mockito;
048import org.mockito.invocation.InvocationOnMock;
049import org.mockito.stubbing.Answer;
050
051/**
052 * Test the procedure member, and it's error handling mechanisms.
053 */
054@Category({MasterTests.class, SmallTests.class})
055public class TestProcedureMember {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestProcedureMember.class);
060
061  private static final long WAKE_FREQUENCY = 100;
062  private static final long TIMEOUT = 100000;
063  private static final long POOL_KEEP_ALIVE = 1;
064
065  private final String op = "some op";
066  private final byte[] data = new byte[0];
067  private final ForeignExceptionDispatcher mockListener = Mockito
068      .spy(new ForeignExceptionDispatcher());
069  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
070  private final ProcedureMemberRpcs mockMemberComms = Mockito
071      .mock(ProcedureMemberRpcs.class);
072  private ProcedureMember member;
073  private ForeignExceptionDispatcher dispatcher;
074  Subprocedure spySub;
075
076  /**
077   * Reset all the mock objects
078   */
079  @After
080  public void resetTest() {
081    reset(mockListener, mockBuilder, mockMemberComms);
082    if (member != null)
083      try {
084        member.close();
085      } catch (IOException e) {
086        e.printStackTrace();
087      }
088  }
089
090  /**
091   * Build a member using the class level mocks
092   * @return member to use for tests
093   */
094  private ProcedureMember buildCohortMember() {
095    String name = "node";
096    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
097    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
098  }
099
100  /**
101   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
102   */
103  private void buildCohortMemberPair() throws IOException {
104    dispatcher = new ForeignExceptionDispatcher();
105    String name = "node";
106    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
107    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
108    when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
109    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
110    spySub = spy(subproc);
111    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
112    addCommitAnswer();
113  }
114
115
116  /**
117   * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
118   */
119  private void addCommitAnswer() throws IOException {
120    doAnswer(new Answer<Void>() {
121      @Override
122      public Void answer(InvocationOnMock invocation) throws Throwable {
123        member.receivedReachedGlobalBarrier(op);
124        return null;
125      }
126    }).when(mockMemberComms).sendMemberAcquired(any());
127  }
128
129  /**
130   * Test the normal sub procedure execution case.
131   */
132  @Test
133  public void testSimpleRun() throws Exception {
134    member = buildCohortMember();
135    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
136    EmptySubprocedure spy = spy(subproc);
137    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
138
139    // when we get a prepare, then start the commit phase
140    addCommitAnswer();
141
142    // run the operation
143    // build a new operation
144    Subprocedure subproc1 = member.createSubprocedure(op, data);
145    member.submitSubprocedure(subproc1);
146    // and wait for it to finish
147    subproc.waitForLocallyCompleted();
148
149    // make sure everything ran in order
150    InOrder order = inOrder(mockMemberComms, spy);
151    order.verify(spy).acquireBarrier();
152    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
153    order.verify(spy).insideBarrier();
154    order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
155    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
156        any());
157  }
158
159  /**
160   * Make sure we call cleanup etc, when we have an exception during
161   * {@link Subprocedure#acquireBarrier()}.
162   */
163  @Test
164  public void testMemberPrepareException() throws Exception {
165    buildCohortMemberPair();
166
167    // mock an exception on Subprocedure's prepare
168    doAnswer(
169        new Answer<Void>() {
170          @Override
171          public Void answer(InvocationOnMock invocation) throws Throwable {
172            throw new IOException("Forced IOException in member acquireBarrier");
173          }
174        }).when(spySub).acquireBarrier();
175
176    // run the operation
177    // build a new operation
178    Subprocedure subproc = member.createSubprocedure(op, data);
179    member.submitSubprocedure(subproc);
180    // if the operation doesn't die properly, then this will timeout
181    member.closeAndWait(TIMEOUT);
182
183    // make sure everything ran in order
184    InOrder order = inOrder(mockMemberComms, spySub);
185    order.verify(spySub).acquireBarrier();
186    // Later phases not run
187    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
188    order.verify(spySub, never()).insideBarrier();
189    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
190    // error recovery path exercised
191    order.verify(spySub).cancel(anyString(), any());
192    order.verify(spySub).cleanup(any());
193  }
194
195  /**
196   * Make sure we call cleanup etc, when we have an exception during prepare.
197   */
198  @Test
199  public void testSendMemberAcquiredCommsFailure() throws Exception {
200    buildCohortMemberPair();
201
202    // mock an exception on Subprocedure's prepare
203    doAnswer(
204        new Answer<Void>() {
205          @Override
206          public Void answer(InvocationOnMock invocation) throws Throwable {
207            throw new IOException("Forced IOException in memeber prepare");
208          }
209        }).when(mockMemberComms).sendMemberAcquired(any());
210
211    // run the operation
212    // build a new operation
213    Subprocedure subproc = member.createSubprocedure(op, data);
214    member.submitSubprocedure(subproc);
215    // if the operation doesn't die properly, then this will timeout
216    member.closeAndWait(TIMEOUT);
217
218    // make sure everything ran in order
219    InOrder order = inOrder(mockMemberComms, spySub);
220    order.verify(spySub).acquireBarrier();
221    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
222
223    // Later phases not run
224    order.verify(spySub, never()).insideBarrier();
225    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
226    // error recovery path exercised
227    order.verify(spySub).cancel(anyString(), any());
228    order.verify(spySub).cleanup(any());
229  }
230
231  /**
232   * Fail correctly if coordinator aborts the procedure.  The subprocedure will not interrupt a
233   * running {@link Subprocedure#acquireBarrier()} -- prepare needs to finish first, and the the abort
234   * is checked.  Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get rolled back
235   * via {@link Subprocedure#cleanup}.
236   */
237  @Test
238  public void testCoordinatorAbort() throws Exception {
239    buildCohortMemberPair();
240
241    // mock that another node timed out or failed to prepare
242    final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
243    doAnswer(
244        new Answer<Void>() {
245          @Override
246          public Void answer(InvocationOnMock invocation) throws Throwable {
247            // inject a remote error (this would have come from an external thread)
248            spySub.cancel("bogus message", oate);
249            // sleep the wake frequency since that is what we promised
250            Thread.sleep(WAKE_FREQUENCY);
251            return null;
252          }
253        }).when(spySub).waitForReachedGlobalBarrier();
254
255    // run the operation
256    // build a new operation
257    Subprocedure subproc = member.createSubprocedure(op, data);
258    member.submitSubprocedure(subproc);
259    // if the operation doesn't die properly, then this will timeout
260    member.closeAndWait(TIMEOUT);
261
262    // make sure everything ran in order
263    InOrder order = inOrder(mockMemberComms, spySub);
264    order.verify(spySub).acquireBarrier();
265    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
266    // Later phases not run
267    order.verify(spySub, never()).insideBarrier();
268    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
269    // error recovery path exercised
270    order.verify(spySub).cancel(anyString(), any());
271    order.verify(spySub).cleanup(any());
272  }
273
274  /**
275   * Handle failures if a member's commit phase fails.
276   *
277   * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
278   * 2PC the transaction is committed just before the coordinator sends commit messages to the
279   * member.  Members are then responsible for reading its TX log.  This implementation actually
280   * rolls back, and thus breaks the normal TX guarantees.
281  */
282  @Test
283  public void testMemberCommitException() throws Exception {
284    buildCohortMemberPair();
285
286    // mock an exception on Subprocedure's prepare
287    doAnswer(
288        new Answer<Void>() {
289          @Override
290          public Void answer(InvocationOnMock invocation) throws Throwable {
291            throw new IOException("Forced IOException in memeber prepare");
292          }
293        }).when(spySub).insideBarrier();
294
295    // run the operation
296    // build a new operation
297    Subprocedure subproc = member.createSubprocedure(op, data);
298    member.submitSubprocedure(subproc);
299    // if the operation doesn't die properly, then this will timeout
300    member.closeAndWait(TIMEOUT);
301
302    // make sure everything ran in order
303    InOrder order = inOrder(mockMemberComms, spySub);
304    order.verify(spySub).acquireBarrier();
305    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
306    order.verify(spySub).insideBarrier();
307
308    // Later phases not run
309    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
310    // error recovery path exercised
311    order.verify(spySub).cancel(anyString(), any());
312    order.verify(spySub).cleanup(any());
313  }
314
315  /**
316   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
317   *
318   * NOTE: This is the core difference that makes this different from traditional 2PC.  In true
319   * 2PC the transaction is committed just before the coordinator sends commit messages to the
320   * member.  Members are then responsible for reading its TX log.  This implementation actually
321   * rolls back, and thus breaks the normal TX guarantees.
322  */
323  @Test
324  public void testMemberCommitCommsFailure() throws Exception {
325    buildCohortMemberPair();
326    final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
327    doAnswer(
328        new Answer<Void>() {
329          @Override
330          public Void answer(InvocationOnMock invocation) throws Throwable {
331            // inject a remote error (this would have come from an external thread)
332            spySub.cancel("commit comms fail", oate);
333            // sleep the wake frequency since that is what we promised
334            Thread.sleep(WAKE_FREQUENCY);
335            return null;
336          }
337        }).when(mockMemberComms).sendMemberCompleted(any(), eq(data));
338
339    // run the operation
340    // build a new operation
341    Subprocedure subproc = member.createSubprocedure(op, data);
342    member.submitSubprocedure(subproc);
343    // if the operation doesn't die properly, then this will timeout
344    member.closeAndWait(TIMEOUT);
345
346    // make sure everything ran in order
347    InOrder order = inOrder(mockMemberComms, spySub);
348    order.verify(spySub).acquireBarrier();
349    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
350    order.verify(spySub).insideBarrier();
351    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
352    // error recovery path exercised
353    order.verify(spySub).cancel(anyString(), any());
354    order.verify(spySub).cleanup(any());
355  }
356
357  /**
358   * Fail correctly on getting an external error while waiting for the prepared latch
359   * @throws Exception on failure
360   */
361  @Test
362  public void testPropagateConnectionErrorBackToManager() throws Exception {
363    // setup the operation
364    member = buildCohortMember();
365    ProcedureMember memberSpy = spy(member);
366
367    // setup the commit and the spy
368    final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
369    ForeignExceptionDispatcher dispSpy = spy(dispatcher);
370    Subprocedure commit = new EmptySubprocedure(member, dispatcher);
371    Subprocedure spy = spy(commit);
372    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
373
374    // fail during the prepare phase
375    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
376    // and throw a connection error when we try to tell the controller about it
377    doThrow(new IOException("Controller is down!")).when(mockMemberComms)
378        .sendMemberAborted(eq(spy), any());
379
380
381    // run the operation
382    // build a new operation
383    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
384    memberSpy.submitSubprocedure(subproc);
385    // if the operation doesn't die properly, then this will timeout
386    memberSpy.closeAndWait(TIMEOUT);
387
388    // make sure everything ran in order
389    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
390    // make sure we acquire.
391    order.verify(spy).acquireBarrier();
392    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
393
394    // TODO Need to do another refactor to get this to propagate to the coordinator.
395    // make sure we pass a remote exception back the controller
396//    order.verify(mockMemberComms).sendMemberAborted(eq(spy),
397//      any());
398//    order.verify(dispSpy).receiveError(anyString(),
399//        any(), any());
400  }
401
402  /**
403   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
404   * correctly build a new task for the requested operation
405   * @throws Exception on failure
406   */
407  @Test
408  public void testNoTaskToBeRunFromRequest() throws Exception {
409    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
410    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
411      .thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
412    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
413    // builder returns null
414    // build a new operation
415    Subprocedure subproc = member.createSubprocedure(op, data);
416    member.submitSubprocedure(subproc);
417    // throws an illegal state exception
418    try {
419      // build a new operation
420      Subprocedure subproc2 = member.createSubprocedure(op, data);
421      member.submitSubprocedure(subproc2);
422    } catch (IllegalStateException ise) {
423    }
424    // throws an illegal argument exception
425    try {
426      // build a new operation
427      Subprocedure subproc3 = member.createSubprocedure(op, data);
428      member.submitSubprocedure(subproc3);
429    } catch (IllegalArgumentException iae) {
430    }
431
432    // no request should reach the pool
433    verifyZeroInteractions(pool);
434    // get two abort requests
435    // TODO Need to do another refactor to get this to propagate to the coordinator.
436    // verify(mockMemberComms, times(2)).sendMemberAborted(any(), any());
437  }
438
439  /**
440   * Helper {@link Procedure} who's phase for each step is just empty
441   */
442  public class EmptySubprocedure extends SubprocedureImpl {
443    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
444      super( member, op, dispatcher,
445      // TODO 1000000 is an arbitrary number that I picked.
446          WAKE_FREQUENCY, TIMEOUT);
447    }
448  }
449}