001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Matchers.any;
023import static org.mockito.Matchers.anyListOf;
024import static org.mockito.Matchers.anyString;
025import static org.mockito.Matchers.eq;
026import static org.mockito.Mockito.atLeastOnce;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.doThrow;
029import static org.mockito.Mockito.inOrder;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.never;
032import static org.mockito.Mockito.reset;
033import static org.mockito.Mockito.spy;
034import static org.mockito.Mockito.times;
035import static org.mockito.Mockito.verify;
036import static org.mockito.Mockito.when;
037
038import java.io.IOException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.ThreadPoolExecutor;
042import java.util.concurrent.TimeUnit;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.junit.After;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.mockito.InOrder;
052import org.mockito.invocation.InvocationOnMock;
053import org.mockito.stubbing.Answer;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057/**
058 * Test Procedure coordinator operation.
059 * <p>
060 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
061 * level serialization this class will likely throw all kinds of errors.
062 */
063@Category({ MasterTests.class, SmallTests.class })
064public class TestProcedureCoordinator {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestProcedureCoordinator.class);
069
070  // general test constants
071  private static final long WAKE_FREQUENCY = 1000;
072  private static final long TIMEOUT = 100000;
073  private static final long POOL_KEEP_ALIVE = 1;
074  private static final String nodeName = "node";
075  private static final String procName = "some op";
076  private static final byte[] procData = new byte[0];
077  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
078
079  // setup the mocks
080  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
081  private final Procedure task = mock(Procedure.class);
082  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
083
084  // handle to the coordinator for each test
085  private ProcedureCoordinator coordinator;
086
087  @After
088  public void resetTest() throws IOException {
089    // reset all the mocks used for the tests
090    reset(controller, task, monitor);
091    // close the open coordinator, if it was used
092    if (coordinator != null) coordinator.close();
093  }
094
095  private ProcedureCoordinator buildNewCoordinator() {
096    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
097    return spy(new ProcedureCoordinator(controller, pool));
098  }
099
100  /**
101   * Currently we can only handle one procedure at a time. This makes sure we handle that and reject
102   * submitting more.
103   */
104  @Test
105  public void testThreadPoolSize() throws Exception {
106    ProcedureCoordinator coordinator = buildNewCoordinator();
107    Procedure proc =
108      new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
109    Procedure procSpy = spy(proc);
110
111    Procedure proc2 = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName + "2",
112      procData, expected);
113    Procedure procSpy2 = spy(proc2);
114    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
115      .thenReturn(procSpy, procSpy2);
116
117    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
118    // null here means second procedure failed to start.
119    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
120      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
121  }
122
123  /**
124   * Check handling a connection failure correctly if we get it during the acquiring phase
125   */
126  @Test
127  public void testUnreachableControllerDuringPrepare() throws Exception {
128    coordinator = buildNewCoordinator();
129    // setup the proc
130    List<String> expected = Arrays.asList("cohort");
131    Procedure proc =
132      new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
133    final Procedure procSpy = spy(proc);
134
135    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
136      .thenReturn(procSpy);
137
138    // use the passed controller responses
139    IOException cause = new IOException("Failed to reach comms during acquire");
140    doThrow(cause).when(controller).sendGlobalBarrierAcquire(eq(procSpy), eq(procData),
141      anyListOf(String.class));
142
143    // run the operation
144    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
145    // and wait for it to finish
146    while (!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS))
147      ;
148    verify(procSpy, atLeastOnce()).receive(any());
149    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
150    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
151    verify(controller, never()).sendGlobalBarrierReached(any(), anyListOf(String.class));
152  }
153
154  /**
155   * Check handling a connection failure correctly if we get it during the barrier phase
156   */
157  @Test
158  public void testUnreachableControllerDuringCommit() throws Exception {
159    coordinator = buildNewCoordinator();
160
161    // setup the task and spy on it
162    List<String> expected = Arrays.asList("cohort");
163    final Procedure spy =
164      spy(new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
165
166    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
167      .thenReturn(spy);
168
169    // use the passed controller responses
170    IOException cause = new IOException("Failed to reach controller during prepare");
171    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })).when(controller)
172      .sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
173    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
174
175    // run the operation
176    Procedure task =
177      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
178    // and wait for it to finish
179    while (!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS))
180      ;
181    verify(spy, atLeastOnce()).receive(any());
182    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
183    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), eq(procData),
184      anyListOf(String.class));
185    verify(controller, times(1)).sendGlobalBarrierReached(any(), anyListOf(String.class));
186  }
187
188  @Test
189  public void testNoCohort() throws Exception {
190    runSimpleProcedure();
191  }
192
193  @Test
194  public void testSingleCohortOrchestration() throws Exception {
195    runSimpleProcedure("one");
196  }
197
198  @Test
199  public void testMultipleCohortOrchestration() throws Exception {
200    runSimpleProcedure("one", "two", "three", "four");
201  }
202
203  public void runSimpleProcedure(String... members) throws Exception {
204    coordinator = buildNewCoordinator();
205    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
206      procData, Arrays.asList(members));
207    final Procedure spy = spy(task);
208    runCoordinatedProcedure(spy, members);
209  }
210
211  /**
212   * Test that if nodes join the barrier early we still correctly handle the progress
213   */
214  @Test
215  public void testEarlyJoiningBarrier() throws Exception {
216    final String[] cohort = new String[] { "one", "two", "three", "four" };
217    coordinator = buildNewCoordinator();
218    final ProcedureCoordinator ref = coordinator;
219    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
220      procData, Arrays.asList(cohort));
221    final Procedure spy = spy(task);
222
223    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
224      @Override
225      public void doWork() {
226        // then do some fun where we commit before all nodes have prepared
227        // "one" commits before anyone else is done
228        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
229        ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]);
230        // but "two" takes a while
231        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
232        // "three"jumps ahead
233        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
234        ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]);
235        // and "four" takes a while
236        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
237      }
238    };
239
240    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
241      @Override
242      public void doWork() {
243        ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]);
244        ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]);
245      }
246    };
247    runCoordinatedOperation(spy, prepare, commit, cohort);
248  }
249
250  /**
251   * Just run a procedure with the standard name and data, with not special task for the mock
252   * coordinator (it works just like a regular coordinator). For custom behavior see
253   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} .
254   * @param spy    Spy on a real {@link Procedure}
255   * @param cohort expected cohort members
256   * @throws Exception on failure
257   */
258  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
259    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
260      new BarrierAnswer(procName, cohort), cohort);
261  }
262
263  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, String... cohort)
264    throws Exception {
265    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
266  }
267
268  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, String... cohort)
269    throws Exception {
270    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
271  }
272
273  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
274    BarrierAnswer commitOperation, String... cohort) throws Exception {
275    List<String> expected = Arrays.asList(cohort);
276    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
277      .thenReturn(spy);
278
279    // use the passed controller responses
280    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
281    doAnswer(commitOperation).when(controller).sendGlobalBarrierReached(eq(spy),
282      anyListOf(String.class));
283
284    // run the operation
285    Procedure task =
286      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
287    // and wait for it to finish
288    task.waitForCompleted();
289
290    // make sure we mocked correctly
291    prepareOperation.ensureRan();
292    // we never got an exception
293    InOrder inorder = inOrder(spy, controller);
294    inorder.verify(spy).sendGlobalBarrierStart();
295    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
296    inorder.verify(spy).sendGlobalBarrierReached();
297    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
298  }
299
300  private static abstract class OperationAnswer implements Answer<Void> {
301    private boolean ran = false;
302
303    public void ensureRan() {
304      assertTrue("Prepare mocking didn't actually run!", ran);
305    }
306
307    @Override
308    public final Void answer(InvocationOnMock invocation) throws Throwable {
309      this.ran = true;
310      doWork();
311      return null;
312    }
313
314    protected abstract void doWork() throws Throwable;
315  }
316
317  /**
318   * Just tell the current coordinator that each of the nodes has prepared
319   */
320  private class AcquireBarrierAnswer extends OperationAnswer {
321    protected final String[] cohort;
322    protected final String opName;
323
324    public AcquireBarrierAnswer(String opName, String... cohort) {
325      this.cohort = cohort;
326      this.opName = opName;
327    }
328
329    @Override
330    public void doWork() {
331      if (cohort == null) return;
332      for (String member : cohort) {
333        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
334      }
335    }
336  }
337
338  /**
339   * Just tell the current coordinator that each of the nodes has committed
340   */
341  private class BarrierAnswer extends OperationAnswer {
342    protected final String[] cohort;
343    protected final String opName;
344
345    public BarrierAnswer(String opName, String... cohort) {
346      this.cohort = cohort;
347      this.opName = opName;
348    }
349
350    @Override
351    public void doWork() {
352      if (cohort == null) return;
353      for (String member : cohort) {
354        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member,
355          new byte[0]);
356      }
357    }
358  }
359}