001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.jupiter.api.Assertions.assertNull;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.mockito.ArgumentMatchers.any;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.anyString;
025import static org.mockito.ArgumentMatchers.eq;
026import static org.mockito.Mockito.atLeastOnce;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.doThrow;
029import static org.mockito.Mockito.inOrder;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.never;
032import static org.mockito.Mockito.reset;
033import static org.mockito.Mockito.spy;
034import static org.mockito.Mockito.times;
035import static org.mockito.Mockito.verify;
036import static org.mockito.Mockito.when;
037
038import java.io.IOException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.ThreadPoolExecutor;
042import java.util.concurrent.TimeUnit;
043import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.junit.jupiter.api.AfterEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.mockito.InOrder;
050import org.mockito.invocation.InvocationOnMock;
051import org.mockito.stubbing.Answer;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
054
055/**
056 * Test Procedure coordinator operation.
057 * <p>
058 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
059 * level serialization this class will likely throw all kinds of errors.
060 */
061@Tag(MasterTests.TAG)
062@Tag(SmallTests.TAG)
063public class TestProcedureCoordinator {
064
065  // general test constants
066  private static final long WAKE_FREQUENCY = 1000;
067  private static final long TIMEOUT = 100000;
068  private static final long POOL_KEEP_ALIVE = 1;
069  private static final String nodeName = "node";
070  private static final String procName = "some op";
071  private static final byte[] procData = new byte[0];
072  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
073
074  // setup the mocks
075  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
076  private final Procedure task = mock(Procedure.class);
077  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
078
079  // handle to the coordinator for each test
080  private ProcedureCoordinator coordinator;
081
082  @AfterEach
083  public void resetTest() throws IOException {
084    // reset all the mocks used for the tests
085    reset(controller, task, monitor);
086    // close the open coordinator, if it was used
087    if (coordinator != null) coordinator.close();
088  }
089
090  private ProcedureCoordinator buildNewCoordinator() {
091    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
092    return spy(new ProcedureCoordinator(controller, pool));
093  }
094
095  /**
096   * Currently we can only handle one procedure at a time. This makes sure we handle that and reject
097   * submitting more.
098   */
099  @Test
100  public void testThreadPoolSize() throws Exception {
101    ProcedureCoordinator coordinator = buildNewCoordinator();
102    Procedure proc =
103      new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
104    Procedure procSpy = spy(proc);
105
106    Procedure proc2 = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName + "2",
107      procData, expected);
108    Procedure procSpy2 = spy(proc2);
109    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList()))
110      .thenReturn(procSpy, procSpy2);
111
112    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
113    // null here means second procedure failed to start.
114    assertNull(
115      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected),
116      "Coordinator successfully ran two tasks at once with a single thread pool.");
117  }
118
119  /**
120   * Check handling a connection failure correctly if we get it during the acquiring phase
121   */
122  @Test
123  public void testUnreachableControllerDuringPrepare() throws Exception {
124    coordinator = buildNewCoordinator();
125    // setup the proc
126    List<String> expected = Arrays.asList("cohort");
127    Procedure proc =
128      new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
129    final Procedure procSpy = spy(proc);
130
131    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList()))
132      .thenReturn(procSpy);
133
134    // use the passed controller responses
135    IOException cause = new IOException("Failed to reach comms during acquire");
136    doThrow(cause).when(controller).sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyList());
137
138    // run the operation
139    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
140    // and wait for it to finish
141    while (!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS))
142      ;
143    verify(procSpy, atLeastOnce()).receive(any());
144    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
145    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
146    verify(controller, never()).sendGlobalBarrierReached(any(), anyList());
147  }
148
149  /**
150   * Check handling a connection failure correctly if we get it during the barrier phase
151   */
152  @Test
153  public void testUnreachableControllerDuringCommit() throws Exception {
154    coordinator = buildNewCoordinator();
155
156    // setup the task and spy on it
157    List<String> expected = Arrays.asList("cohort");
158    final Procedure spy =
159      spy(new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
160
161    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy);
162
163    // use the passed controller responses
164    IOException cause = new IOException("Failed to reach controller during prepare");
165    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })).when(controller)
166      .sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList());
167    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyList());
168
169    // run the operation
170    Procedure task =
171      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
172    // and wait for it to finish
173    while (!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS))
174      ;
175    verify(spy, atLeastOnce()).receive(any());
176    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
177    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList());
178    verify(controller, times(1)).sendGlobalBarrierReached(any(), anyList());
179  }
180
181  @Test
182  public void testNoCohort() throws Exception {
183    runSimpleProcedure();
184  }
185
186  @Test
187  public void testSingleCohortOrchestration() throws Exception {
188    runSimpleProcedure("one");
189  }
190
191  @Test
192  public void testMultipleCohortOrchestration() throws Exception {
193    runSimpleProcedure("one", "two", "three", "four");
194  }
195
196  public void runSimpleProcedure(String... members) throws Exception {
197    coordinator = buildNewCoordinator();
198    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
199      procData, Arrays.asList(members));
200    final Procedure spy = spy(task);
201    runCoordinatedProcedure(spy, members);
202  }
203
204  /**
205   * Test that if nodes join the barrier early we still correctly handle the progress
206   */
207  @Test
208  public void testEarlyJoiningBarrier() throws Exception {
209    final String[] cohort = new String[] { "one", "two", "three", "four" };
210    coordinator = buildNewCoordinator();
211    final ProcedureCoordinator ref = coordinator;
212    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
213      procData, Arrays.asList(cohort));
214    final Procedure spy = spy(task);
215
216    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
217      @Override
218      public void doWork() {
219        // then do some fun where we commit before all nodes have prepared
220        // "one" commits before anyone else is done
221        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
222        ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]);
223        // but "two" takes a while
224        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
225        // "three"jumps ahead
226        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
227        ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]);
228        // and "four" takes a while
229        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
230      }
231    };
232
233    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
234      @Override
235      public void doWork() {
236        ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]);
237        ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]);
238      }
239    };
240    runCoordinatedOperation(spy, prepare, commit, cohort);
241  }
242
243  /**
244   * Just run a procedure with the standard name and data, with not special task for the mock
245   * coordinator (it works just like a regular coordinator). For custom behavior see
246   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} .
247   * @param spy    Spy on a real {@link Procedure}
248   * @param cohort expected cohort members
249   * @throws Exception on failure
250   */
251  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
252    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
253      new BarrierAnswer(procName, cohort), cohort);
254  }
255
256  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, String... cohort)
257    throws Exception {
258    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
259  }
260
261  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, String... cohort)
262    throws Exception {
263    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
264  }
265
266  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
267    BarrierAnswer commitOperation, String... cohort) throws Exception {
268    List<String> expected = Arrays.asList(cohort);
269    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy);
270
271    // use the passed controller responses
272    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
273    doAnswer(commitOperation).when(controller).sendGlobalBarrierReached(eq(spy), anyList());
274
275    // run the operation
276    Procedure task =
277      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
278    // and wait for it to finish
279    task.waitForCompleted();
280
281    // make sure we mocked correctly
282    prepareOperation.ensureRan();
283    // we never got an exception
284    InOrder inorder = inOrder(spy, controller);
285    inorder.verify(spy).sendGlobalBarrierStart();
286    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
287    inorder.verify(spy).sendGlobalBarrierReached();
288    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyList());
289  }
290
291  private static abstract class OperationAnswer implements Answer<Void> {
292    private boolean ran = false;
293
294    public void ensureRan() {
295      assertTrue(ran, "Prepare mocking didn't actually run!");
296    }
297
298    @Override
299    public final Void answer(InvocationOnMock invocation) throws Throwable {
300      this.ran = true;
301      doWork();
302      return null;
303    }
304
305    protected abstract void doWork() throws Throwable;
306  }
307
308  /**
309   * Just tell the current coordinator that each of the nodes has prepared
310   */
311  private class AcquireBarrierAnswer extends OperationAnswer {
312    protected final String[] cohort;
313    protected final String opName;
314
315    public AcquireBarrierAnswer(String opName, String... cohort) {
316      this.cohort = cohort;
317      this.opName = opName;
318    }
319
320    @Override
321    public void doWork() {
322      if (cohort == null) return;
323      for (String member : cohort) {
324        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
325      }
326    }
327  }
328
329  /**
330   * Just tell the current coordinator that each of the nodes has committed
331   */
332  private class BarrierAnswer extends OperationAnswer {
333    protected final String[] cohort;
334    protected final String opName;
335
336    public BarrierAnswer(String opName, String... cohort) {
337      this.cohort = cohort;
338      this.opName = opName;
339    }
340
341    @Override
342    public void doWork() {
343      if (cohort == null) return;
344      for (String member : cohort) {
345        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member,
346          new byte[0]);
347      }
348    }
349  }
350}