001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.anyString;
025import static org.mockito.ArgumentMatchers.eq;
026import static org.mockito.Mockito.atLeastOnce;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.doThrow;
029import static org.mockito.Mockito.inOrder;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.never;
032import static org.mockito.Mockito.reset;
033import static org.mockito.Mockito.spy;
034import static org.mockito.Mockito.times;
035import static org.mockito.Mockito.verify;
036import static org.mockito.Mockito.when;
037
038import java.io.IOException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.ThreadPoolExecutor;
042import java.util.concurrent.TimeUnit;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.junit.After;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.mockito.InOrder;
052import org.mockito.invocation.InvocationOnMock;
053import org.mockito.stubbing.Answer;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057/**
058 * Test Procedure coordinator operation.
059 * <p>
 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
 * level parallelization this class will likely throw all kinds of errors.
062 */
063@Category({ MasterTests.class, SmallTests.class })
064public class TestProcedureCoordinator {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestProcedureCoordinator.class);
069
070  // general test constants
071  private static final long WAKE_FREQUENCY = 1000;
072  private static final long TIMEOUT = 100000;
073  private static final long POOL_KEEP_ALIVE = 1;
074  private static final String nodeName = "node";
075  private static final String procName = "some op";
076  private static final byte[] procData = new byte[0];
077  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
078
079  // setup the mocks
080  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
081  private final Procedure task = mock(Procedure.class);
082  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
083
084  // handle to the coordinator for each test
085  private ProcedureCoordinator coordinator;
086
087  @After
088  public void resetTest() throws IOException {
089    // reset all the mocks used for the tests
090    reset(controller, task, monitor);
091    // close the open coordinator, if it was used
092    if (coordinator != null) coordinator.close();
093  }
094
095  private ProcedureCoordinator buildNewCoordinator() {
096    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
097    return spy(new ProcedureCoordinator(controller, pool));
098  }
099
100  /**
101   * Currently we can only handle one procedure at a time. This makes sure we handle that and reject
102   * submitting more.
103   */
104  @Test
105  public void testThreadPoolSize() throws Exception {
106    ProcedureCoordinator coordinator = buildNewCoordinator();
107    Procedure proc =
108      new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
109    Procedure procSpy = spy(proc);
110
111    Procedure proc2 = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName + "2",
112      procData, expected);
113    Procedure procSpy2 = spy(proc2);
114    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList()))
115      .thenReturn(procSpy, procSpy2);
116
117    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
118    // null here means second procedure failed to start.
119    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
120      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
121  }
122
123  /**
124   * Check handling a connection failure correctly if we get it during the acquiring phase
125   */
126  @Test
127  public void testUnreachableControllerDuringPrepare() throws Exception {
128    coordinator = buildNewCoordinator();
129    // setup the proc
130    List<String> expected = Arrays.asList("cohort");
131    Procedure proc =
132      new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
133    final Procedure procSpy = spy(proc);
134
135    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList()))
136      .thenReturn(procSpy);
137
138    // use the passed controller responses
139    IOException cause = new IOException("Failed to reach comms during acquire");
140    doThrow(cause).when(controller).sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyList());
141
142    // run the operation
143    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
144    // and wait for it to finish
145    while (!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS))
146      ;
147    verify(procSpy, atLeastOnce()).receive(any());
148    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
149    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
150    verify(controller, never()).sendGlobalBarrierReached(any(), anyList());
151  }
152
153  /**
154   * Check handling a connection failure correctly if we get it during the barrier phase
155   */
156  @Test
157  public void testUnreachableControllerDuringCommit() throws Exception {
158    coordinator = buildNewCoordinator();
159
160    // setup the task and spy on it
161    List<String> expected = Arrays.asList("cohort");
162    final Procedure spy =
163      spy(new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
164
165    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy);
166
167    // use the passed controller responses
168    IOException cause = new IOException("Failed to reach controller during prepare");
169    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })).when(controller)
170      .sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList());
171    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyList());
172
173    // run the operation
174    Procedure task =
175      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
176    // and wait for it to finish
177    while (!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS))
178      ;
179    verify(spy, atLeastOnce()).receive(any());
180    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
181    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList());
182    verify(controller, times(1)).sendGlobalBarrierReached(any(), anyList());
183  }
184
185  @Test
186  public void testNoCohort() throws Exception {
187    runSimpleProcedure();
188  }
189
190  @Test
191  public void testSingleCohortOrchestration() throws Exception {
192    runSimpleProcedure("one");
193  }
194
195  @Test
196  public void testMultipleCohortOrchestration() throws Exception {
197    runSimpleProcedure("one", "two", "three", "four");
198  }
199
200  public void runSimpleProcedure(String... members) throws Exception {
201    coordinator = buildNewCoordinator();
202    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
203      procData, Arrays.asList(members));
204    final Procedure spy = spy(task);
205    runCoordinatedProcedure(spy, members);
206  }
207
208  /**
209   * Test that if nodes join the barrier early we still correctly handle the progress
210   */
211  @Test
212  public void testEarlyJoiningBarrier() throws Exception {
213    final String[] cohort = new String[] { "one", "two", "three", "four" };
214    coordinator = buildNewCoordinator();
215    final ProcedureCoordinator ref = coordinator;
216    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
217      procData, Arrays.asList(cohort));
218    final Procedure spy = spy(task);
219
220    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
221      @Override
222      public void doWork() {
223        // then do some fun where we commit before all nodes have prepared
224        // "one" commits before anyone else is done
225        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
226        ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]);
227        // but "two" takes a while
228        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
229        // "three"jumps ahead
230        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
231        ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]);
232        // and "four" takes a while
233        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
234      }
235    };
236
237    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
238      @Override
239      public void doWork() {
240        ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]);
241        ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]);
242      }
243    };
244    runCoordinatedOperation(spy, prepare, commit, cohort);
245  }
246
247  /**
248   * Just run a procedure with the standard name and data, with not special task for the mock
249   * coordinator (it works just like a regular coordinator). For custom behavior see
250   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} .
251   * @param spy    Spy on a real {@link Procedure}
252   * @param cohort expected cohort members
253   * @throws Exception on failure
254   */
255  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
256    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
257      new BarrierAnswer(procName, cohort), cohort);
258  }
259
260  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, String... cohort)
261    throws Exception {
262    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
263  }
264
265  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, String... cohort)
266    throws Exception {
267    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
268  }
269
270  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
271    BarrierAnswer commitOperation, String... cohort) throws Exception {
272    List<String> expected = Arrays.asList(cohort);
273    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy);
274
275    // use the passed controller responses
276    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
277    doAnswer(commitOperation).when(controller).sendGlobalBarrierReached(eq(spy), anyList());
278
279    // run the operation
280    Procedure task =
281      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
282    // and wait for it to finish
283    task.waitForCompleted();
284
285    // make sure we mocked correctly
286    prepareOperation.ensureRan();
287    // we never got an exception
288    InOrder inorder = inOrder(spy, controller);
289    inorder.verify(spy).sendGlobalBarrierStart();
290    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
291    inorder.verify(spy).sendGlobalBarrierReached();
292    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyList());
293  }
294
295  private static abstract class OperationAnswer implements Answer<Void> {
296    private boolean ran = false;
297
298    public void ensureRan() {
299      assertTrue("Prepare mocking didn't actually run!", ran);
300    }
301
302    @Override
303    public final Void answer(InvocationOnMock invocation) throws Throwable {
304      this.ran = true;
305      doWork();
306      return null;
307    }
308
309    protected abstract void doWork() throws Throwable;
310  }
311
312  /**
313   * Just tell the current coordinator that each of the nodes has prepared
314   */
315  private class AcquireBarrierAnswer extends OperationAnswer {
316    protected final String[] cohort;
317    protected final String opName;
318
319    public AcquireBarrierAnswer(String opName, String... cohort) {
320      this.cohort = cohort;
321      this.opName = opName;
322    }
323
324    @Override
325    public void doWork() {
326      if (cohort == null) return;
327      for (String member : cohort) {
328        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
329      }
330    }
331  }
332
333  /**
334   * Just tell the current coordinator that each of the nodes has committed
335   */
336  private class BarrierAnswer extends OperationAnswer {
337    protected final String[] cohort;
338    protected final String opName;
339
340    public BarrierAnswer(String opName, String... cohort) {
341      this.cohort = cohort;
342      this.opName = opName;
343    }
344
345    @Override
346    public void doWork() {
347      if (cohort == null) return;
348      for (String member : cohort) {
349        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member,
350          new byte[0]);
351      }
352    }
353  }
354}