001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.TimeoutException;
import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050
/**
 * Test the procedure member, and its error handling mechanisms.
 */
054@Tag(MasterTests.TAG)
055@Tag(SmallTests.TAG)
056public class TestProcedureMember {
057
058  private static final long WAKE_FREQUENCY = 100;
059  private static final long TIMEOUT = 100000;
060  private static final long POOL_KEEP_ALIVE = 1;
061
062  private final String op = "some op";
063  private final byte[] data = new byte[0];
064  private final ForeignExceptionDispatcher mockListener =
065    Mockito.spy(new ForeignExceptionDispatcher());
066  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
067  private final ProcedureMemberRpcs mockMemberComms = Mockito.mock(ProcedureMemberRpcs.class);
068  private ProcedureMember member;
069  private ForeignExceptionDispatcher dispatcher;
070  Subprocedure spySub;
071
072  /**
073   * Reset all the mock objects
074   */
075  @AfterEach
076  public void resetTest() throws IOException {
077    reset(mockListener, mockBuilder, mockMemberComms);
078    Closeables.close(member, true);
079  }
080
081  /**
082   * Build a member using the class level mocks
083   * @return member to use for tests
084   */
085  private ProcedureMember buildCohortMember() {
086    String name = "node";
087    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
088    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
089  }
090
091  /**
092   * Setup a procedure member that returns the spied-upon {@link Subprocedure}.
093   */
094  private void buildCohortMemberPair() throws IOException {
095    dispatcher = new ForeignExceptionDispatcher();
096    String name = "node";
097    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
098    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
099    when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating
100                                                                    // exception
101    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
102    spySub = spy(subproc);
103    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
104    addCommitAnswer();
105  }
106
107  /**
108   * Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
109   */
110  private void addCommitAnswer() throws IOException {
111    doAnswer(new Answer<Void>() {
112      @Override
113      public Void answer(InvocationOnMock invocation) throws Throwable {
114        member.receivedReachedGlobalBarrier(op);
115        return null;
116      }
117    }).when(mockMemberComms).sendMemberAcquired(any());
118  }
119
120  /**
121   * Test the normal sub procedure execution case.
122   */
123  @Test
124  public void testSimpleRun() throws Exception {
125    member = buildCohortMember();
126    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
127    EmptySubprocedure spy = spy(subproc);
128    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
129
130    // when we get a prepare, then start the commit phase
131    addCommitAnswer();
132
133    // run the operation
134    // build a new operation
135    Subprocedure subproc1 = member.createSubprocedure(op, data);
136    member.submitSubprocedure(subproc1);
137    // and wait for it to finish
138    subproc.waitForLocallyCompleted();
139
140    // make sure everything ran in order
141    InOrder order = inOrder(mockMemberComms, spy);
142    order.verify(spy).acquireBarrier();
143    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
144    order.verify(spy).insideBarrier();
145    order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
146    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), any());
147  }
148
149  /**
150   * Make sure we call cleanup etc, when we have an exception during
151   * {@link Subprocedure#acquireBarrier()}.
152   */
153  @Test
154  public void testMemberPrepareException() throws Exception {
155    buildCohortMemberPair();
156
157    // mock an exception on Subprocedure's prepare
158    doAnswer(new Answer<Void>() {
159      @Override
160      public Void answer(InvocationOnMock invocation) throws Throwable {
161        throw new IOException("Forced IOException in member acquireBarrier");
162      }
163    }).when(spySub).acquireBarrier();
164
165    // run the operation
166    // build a new operation
167    Subprocedure subproc = member.createSubprocedure(op, data);
168    member.submitSubprocedure(subproc);
169    // if the operation doesn't die properly, then this will timeout
170    member.closeAndWait(TIMEOUT);
171
172    // make sure everything ran in order
173    InOrder order = inOrder(mockMemberComms, spySub);
174    order.verify(spySub).acquireBarrier();
175    // Later phases not run
176    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
177    order.verify(spySub, never()).insideBarrier();
178    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
179    // error recovery path exercised
180    order.verify(spySub).cancel(anyString(), any());
181    order.verify(spySub).cleanup(any());
182  }
183
184  /**
185   * Make sure we call cleanup etc, when we have an exception during prepare.
186   */
187  @Test
188  public void testSendMemberAcquiredCommsFailure() throws Exception {
189    buildCohortMemberPair();
190
191    // mock an exception on Subprocedure's prepare
192    doAnswer(new Answer<Void>() {
193      @Override
194      public Void answer(InvocationOnMock invocation) throws Throwable {
195        throw new IOException("Forced IOException in member prepare");
196      }
197    }).when(mockMemberComms).sendMemberAcquired(any());
198
199    // run the operation
200    // build a new operation
201    Subprocedure subproc = member.createSubprocedure(op, data);
202    member.submitSubprocedure(subproc);
203    // if the operation doesn't die properly, then this will timeout
204    member.closeAndWait(TIMEOUT);
205
206    // make sure everything ran in order
207    InOrder order = inOrder(mockMemberComms, spySub);
208    order.verify(spySub).acquireBarrier();
209    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
210
211    // Later phases not run
212    order.verify(spySub, never()).insideBarrier();
213    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
214    // error recovery path exercised
215    order.verify(spySub).cancel(anyString(), any());
216    order.verify(spySub).cleanup(any());
217  }
218
219  /**
220   * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
221   * running {@link Subprocedure#acquireBarrier()} -- prepare needs to finish first, and the the
222   * abort is checked. Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get
223   * rolled back via {@link Subprocedure#cleanup}.
224   */
225  @Test
226  public void testCoordinatorAbort() throws Exception {
227    buildCohortMemberPair();
228
229    // mock that another node timed out or failed to prepare
230    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
231    doAnswer(new Answer<Void>() {
232      @Override
233      public Void answer(InvocationOnMock invocation) throws Throwable {
234        // inject a remote error (this would have come from an external thread)
235        spySub.cancel("bogus message", oate);
236        // sleep the wake frequency since that is what we promised
237        Thread.sleep(WAKE_FREQUENCY);
238        return null;
239      }
240    }).when(spySub).waitForReachedGlobalBarrier();
241
242    // run the operation
243    // build a new operation
244    Subprocedure subproc = member.createSubprocedure(op, data);
245    member.submitSubprocedure(subproc);
246    // if the operation doesn't die properly, then this will timeout
247    member.closeAndWait(TIMEOUT);
248
249    // make sure everything ran in order
250    InOrder order = inOrder(mockMemberComms, spySub);
251    order.verify(spySub).acquireBarrier();
252    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
253    // Later phases not run
254    order.verify(spySub, never()).insideBarrier();
255    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
256    // error recovery path exercised
257    order.verify(spySub).cancel(anyString(), any());
258    order.verify(spySub).cleanup(any());
259  }
260
261  /**
262   * Handle failures if a member's commit phase fails.
263   * <p/>
264   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
265   * the transaction is committed just before the coordinator sends commit messages to the member.
266   * Members are then responsible for reading its TX log. This implementation actually rolls back,
267   * and thus breaks the normal TX guarantees.
268   */
269  @Test
270  public void testMemberCommitException() throws Exception {
271    buildCohortMemberPair();
272
273    // mock an exception on Subprocedure's prepare
274    doAnswer(new Answer<Void>() {
275      @Override
276      public Void answer(InvocationOnMock invocation) throws Throwable {
277        throw new IOException("Forced IOException in member prepare");
278      }
279    }).when(spySub).insideBarrier();
280
281    // run the operation
282    // build a new operation
283    Subprocedure subproc = member.createSubprocedure(op, data);
284    member.submitSubprocedure(subproc);
285    // if the operation doesn't die properly, then this will timeout
286    member.closeAndWait(TIMEOUT);
287
288    // make sure everything ran in order
289    InOrder order = inOrder(mockMemberComms, spySub);
290    order.verify(spySub).acquireBarrier();
291    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
292    order.verify(spySub).insideBarrier();
293
294    // Later phases not run
295    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
296    // error recovery path exercised
297    order.verify(spySub).cancel(anyString(), any());
298    order.verify(spySub).cleanup(any());
299  }
300
301  /**
302   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails
303   * <p/>
304   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
305   * the transaction is committed just before the coordinator sends commit messages to the member.
306   * Members are then responsible for reading its TX log. This implementation actually rolls back,
307   * and thus breaks the normal TX guarantees.
308   */
309  @Test
310  public void testMemberCommitCommsFailure() throws Exception {
311    buildCohortMemberPair();
312    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
313    doAnswer(new Answer<Void>() {
314      @Override
315      public Void answer(InvocationOnMock invocation) throws Throwable {
316        // inject a remote error (this would have come from an external thread)
317        spySub.cancel("commit comms fail", oate);
318        // sleep the wake frequency since that is what we promised
319        Thread.sleep(WAKE_FREQUENCY);
320        return null;
321      }
322    }).when(mockMemberComms).sendMemberCompleted(any(), eq(data));
323
324    // run the operation
325    // build a new operation
326    Subprocedure subproc = member.createSubprocedure(op, data);
327    member.submitSubprocedure(subproc);
328    // if the operation doesn't die properly, then this will timeout
329    member.closeAndWait(TIMEOUT);
330
331    // make sure everything ran in order
332    InOrder order = inOrder(mockMemberComms, spySub);
333    order.verify(spySub).acquireBarrier();
334    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
335    order.verify(spySub).insideBarrier();
336    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
337    // error recovery path exercised
338    order.verify(spySub).cancel(anyString(), any());
339    order.verify(spySub).cleanup(any());
340  }
341
342  /**
343   * Fail correctly on getting an external error while waiting for the prepared latch
344   * @throws Exception on failure
345   */
346  @Test
347  public void testPropagateConnectionErrorBackToManager() throws Exception {
348    // setup the operation
349    member = buildCohortMember();
350    ProcedureMember memberSpy = spy(member);
351
352    // setup the commit and the spy
353    final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
354    ForeignExceptionDispatcher dispSpy = spy(dispatcher);
355    Subprocedure commit = new EmptySubprocedure(member, dispatcher);
356    Subprocedure spy = spy(commit);
357    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
358
359    // fail during the prepare phase
360    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
361    // and throw a connection error when we try to tell the controller about it
362    doThrow(new IOException("Controller is down!")).when(mockMemberComms).sendMemberAborted(eq(spy),
363      any());
364
365    // run the operation
366    // build a new operation
367    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
368    memberSpy.submitSubprocedure(subproc);
369    // if the operation doesn't die properly, then this will timeout
370    memberSpy.closeAndWait(TIMEOUT);
371
372    // make sure everything ran in order
373    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
374    // make sure we acquire.
375    order.verify(spy).acquireBarrier();
376    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
377
378    // TODO Need to do another refactor to get this to propagate to the coordinator.
379    // make sure we pass a remote exception back the controller
380    // order.verify(mockMemberComms).sendMemberAborted(eq(spy),
381    // any());
382    // order.verify(dispSpy).receiveError(anyString(),
383    // any(), any());
384  }
385
386  /**
387   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
388   * correctly build a new task for the requested operation
389   * @throws Exception on failure
390   */
391  @Test
392  public void testNoTaskToBeRunFromRequest() throws Exception {
393    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
394    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null).thenThrow(
395      new IllegalStateException("Wrong state!"),
396      new IllegalArgumentException("can't understand the args"));
397    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
398    // builder returns null
399    // build a new operation
400    Subprocedure subproc = member.createSubprocedure(op, data);
401    member.submitSubprocedure(subproc);
402    // throws an illegal state exception
403    try {
404      // build a new operation
405      Subprocedure subproc2 = member.createSubprocedure(op, data);
406      member.submitSubprocedure(subproc2);
407    } catch (IllegalStateException ise) {
408    }
409    // throws an illegal argument exception
410    try {
411      // build a new operation
412      Subprocedure subproc3 = member.createSubprocedure(op, data);
413      member.submitSubprocedure(subproc3);
414    } catch (IllegalArgumentException iae) {
415    }
416
417    // no request should reach the pool
418    verifyNoInteractions(pool);
419    // get two abort requests
420    // TODO Need to do another refactor to get this to propagate to the coordinator.
421    // verify(mockMemberComms, times(2)).sendMemberAborted(any(), any());
422  }
423
424  /**
425   * Helper {@link Procedure} who's phase for each step is just empty
426   */
427  public class EmptySubprocedure extends SubprocedureImpl {
428    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
429      super(member, op, dispatcher,
430        // TODO 1000000 is an arbitrary number that I picked.
431        WAKE_FREQUENCY, TIMEOUT);
432    }
433  }
434}