001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022import static org.mockito.Matchers.any;
023import static org.mockito.Matchers.anyListOf;
024import static org.mockito.Matchers.anyString;
025import static org.mockito.Matchers.eq;
026import static org.mockito.Mockito.atLeastOnce;
027import static org.mockito.Mockito.doAnswer;
028import static org.mockito.Mockito.doThrow;
029import static org.mockito.Mockito.inOrder;
030import static org.mockito.Mockito.mock;
031import static org.mockito.Mockito.never;
032import static org.mockito.Mockito.reset;
033import static org.mockito.Mockito.spy;
034import static org.mockito.Mockito.times;
035import static org.mockito.Mockito.verify;
036import static org.mockito.Mockito.when;
037
038import java.io.IOException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.ThreadPoolExecutor;
042import java.util.concurrent.TimeUnit;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.junit.After;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.mockito.InOrder;
052import org.mockito.invocation.InvocationOnMock;
053import org.mockito.stubbing.Answer;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056
057/**
058 * Test Procedure coordinator operation.
059 * <p>
060 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
061 * level serialization this class will likely throw all kinds of errors.
062 */
063@Category({MasterTests.class, SmallTests.class})
064public class TestProcedureCoordinator {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestProcedureCoordinator.class);
069
070  // general test constants
071  private static final long WAKE_FREQUENCY = 1000;
072  private static final long TIMEOUT = 100000;
073  private static final long POOL_KEEP_ALIVE = 1;
074  private static final String nodeName = "node";
075  private static final String procName = "some op";
076  private static final byte[] procData = new byte[0];
077  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
078
079  // setup the mocks
080  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
081  private final Procedure task = mock(Procedure.class);
082  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
083
084  // handle to the coordinator for each test
085  private ProcedureCoordinator coordinator;
086
087  @After
088  public void resetTest() throws IOException {
089    // reset all the mocks used for the tests
090    reset(controller, task, monitor);
091    // close the open coordinator, if it was used
092    if (coordinator != null) coordinator.close();
093  }
094
095  private ProcedureCoordinator buildNewCoordinator() {
096    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
097    return spy(new ProcedureCoordinator(controller, pool));
098  }
099
100  /**
101   * Currently we can only handle one procedure at a time.  This makes sure we handle that and
102   * reject submitting more.
103   */
104  @Test
105  public void testThreadPoolSize() throws Exception {
106    ProcedureCoordinator coordinator = buildNewCoordinator();
107    Procedure proc = new Procedure(coordinator,  monitor,
108        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
109    Procedure procSpy = spy(proc);
110
111    Procedure proc2 = new Procedure(coordinator,  monitor,
112        WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
113    Procedure procSpy2 = spy(proc2);
114    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
115    .thenReturn(procSpy, procSpy2);
116
117    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
118    // null here means second procedure failed to start.
119    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
120      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
121  }
122
123  /**
124   * Check handling a connection failure correctly if we get it during the acquiring phase
125   */
126  @Test
127  public void testUnreachableControllerDuringPrepare() throws Exception {
128    coordinator = buildNewCoordinator();
129    // setup the proc
130    List<String> expected = Arrays.asList("cohort");
131    Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
132        TIMEOUT, procName, procData, expected);
133    final Procedure procSpy = spy(proc);
134
135    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
136        .thenReturn(procSpy);
137
138    // use the passed controller responses
139    IOException cause = new IOException("Failed to reach comms during acquire");
140    doThrow(cause).when(controller)
141        .sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
142
143    // run the operation
144    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
145    // and wait for it to finish
146    while(!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS));
147    verify(procSpy, atLeastOnce()).receive(any());
148    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
149    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
150    verify(controller, never()).sendGlobalBarrierReached(any(),
151        anyListOf(String.class));
152  }
153
154  /**
155   * Check handling a connection failure correctly if we get it during the barrier phase
156   */
157  @Test
158  public void testUnreachableControllerDuringCommit() throws Exception {
159    coordinator = buildNewCoordinator();
160
161    // setup the task and spy on it
162    List<String> expected = Arrays.asList("cohort");
163    final Procedure spy = spy(new Procedure(coordinator,
164        WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
165
166    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
167    .thenReturn(spy);
168
169    // use the passed controller responses
170    IOException cause = new IOException("Failed to reach controller during prepare");
171    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
172        .when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
173    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
174
175    // run the operation
176    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
177    // and wait for it to finish
178    while(!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS));
179    verify(spy, atLeastOnce()).receive(any());
180    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
181    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
182        eq(procData), anyListOf(String.class));
183    verify(controller, times(1)).sendGlobalBarrierReached(any(),
184        anyListOf(String.class));
185  }
186
187  @Test
188  public void testNoCohort() throws Exception {
189    runSimpleProcedure();
190  }
191
192  @Test
193  public void testSingleCohortOrchestration() throws Exception {
194    runSimpleProcedure("one");
195  }
196
197  @Test
198  public void testMultipleCohortOrchestration() throws Exception {
199    runSimpleProcedure("one", "two", "three", "four");
200  }
201
202  public void runSimpleProcedure(String... members) throws Exception {
203    coordinator = buildNewCoordinator();
204    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
205        TIMEOUT, procName, procData, Arrays.asList(members));
206    final Procedure spy = spy(task);
207    runCoordinatedProcedure(spy, members);
208  }
209
210  /**
211   * Test that if nodes join the barrier early we still correctly handle the progress
212   */
213  @Test
214  public void testEarlyJoiningBarrier() throws Exception {
215    final String[] cohort = new String[] { "one", "two", "three", "four" };
216    coordinator = buildNewCoordinator();
217    final ProcedureCoordinator ref = coordinator;
218    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
219        TIMEOUT, procName, procData, Arrays.asList(cohort));
220    final Procedure spy = spy(task);
221
222    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
223      @Override
224      public void doWork() {
225        // then do some fun where we commit before all nodes have prepared
226        // "one" commits before anyone else is done
227        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
228        ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]);
229        // but "two" takes a while
230        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
231        // "three"jumps ahead
232        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
233        ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]);
234        // and "four" takes a while
235        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
236      }
237    };
238
239    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
240      @Override
241      public void doWork() {
242        ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]);
243        ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]);
244      }
245    };
246    runCoordinatedOperation(spy, prepare, commit, cohort);
247  }
248
249  /**
250   * Just run a procedure with the standard name and data, with not special task for the mock
251   * coordinator (it works just like a regular coordinator). For custom behavior see
252   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
253   * .
254   * @param spy Spy on a real {@link Procedure}
255   * @param cohort expected cohort members
256   * @throws Exception on failure
257   */
258  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
259    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
260      new BarrierAnswer(procName, cohort), cohort);
261  }
262
263  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
264      String... cohort) throws Exception {
265    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
266  }
267
268  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
269      String... cohort) throws Exception {
270    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
271  }
272
273  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
274      BarrierAnswer commitOperation, String... cohort) throws Exception {
275    List<String> expected = Arrays.asList(cohort);
276    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyListOf(String.class)))
277      .thenReturn(spy);
278
279    // use the passed controller responses
280    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
281    doAnswer(commitOperation).when(controller)
282        .sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
283
284    // run the operation
285    Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
286    // and wait for it to finish
287    task.waitForCompleted();
288
289    // make sure we mocked correctly
290    prepareOperation.ensureRan();
291    // we never got an exception
292    InOrder inorder = inOrder(spy, controller);
293    inorder.verify(spy).sendGlobalBarrierStart();
294    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
295    inorder.verify(spy).sendGlobalBarrierReached();
296    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
297  }
298
299  private static abstract class OperationAnswer implements Answer<Void> {
300    private boolean ran = false;
301
302    public void ensureRan() {
303      assertTrue("Prepare mocking didn't actually run!", ran);
304    }
305
306    @Override
307    public final Void answer(InvocationOnMock invocation) throws Throwable {
308      this.ran = true;
309      doWork();
310      return null;
311    }
312
313    protected abstract void doWork() throws Throwable;
314  }
315
316  /**
317   * Just tell the current coordinator that each of the nodes has prepared
318   */
319  private class AcquireBarrierAnswer extends OperationAnswer {
320    protected final String[] cohort;
321    protected final String opName;
322
323    public AcquireBarrierAnswer(String opName, String... cohort) {
324      this.cohort = cohort;
325      this.opName = opName;
326    }
327
328    @Override
329    public void doWork() {
330      if (cohort == null) return;
331      for (String member : cohort) {
332        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
333      }
334    }
335  }
336
337  /**
338   * Just tell the current coordinator that each of the nodes has committed
339   */
340  private class BarrierAnswer extends OperationAnswer {
341    protected final String[] cohort;
342    protected final String opName;
343
344    public BarrierAnswer(String opName, String... cohort) {
345      this.cohort = cohort;
346      this.opName = opName;
347    }
348
349    @Override
350    public void doWork() {
351      if (cohort == null) return;
352      for (String member : cohort) {
353        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member,
354          new byte[0]);
355      }
356    }
357  }
358}