001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.mockito.Matchers.any;
021import static org.mockito.Matchers.anyString;
022import static org.mockito.Mockito.mock;
023import static org.mockito.Mockito.never;
024import static org.mockito.Mockito.spy;
025import static org.mockito.Mockito.verify;
026import static org.mockito.Mockito.when;
027
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.CountDownLatch;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.errorhandling.ForeignException;
033import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040
041/**
042 * Demonstrate how Procedure handles single members, multiple members, and errors semantics
043 */
044@Category({MasterTests.class, SmallTests.class})
045public class TestProcedure {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049      HBaseClassTestRule.forClass(TestProcedure.class);
050
051  ProcedureCoordinator coord;
052
053  @Before
054  public void setup() {
055    coord = mock(ProcedureCoordinator.class);
056    final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
057    when(coord.getRpcs()).thenReturn(comms); // make it not null
058  }
059
060  static class LatchedProcedure extends Procedure {
061    CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
062    CountDownLatch startedDuringBarrier = new CountDownLatch(1);
063    CountDownLatch completedProcedure = new CountDownLatch(1);
064
065    public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
066        long wakeFreq, long timeout, String opName, byte[] data,
067        List<String> expectedMembers) {
068      super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
069    }
070
071    @Override
072    public void sendGlobalBarrierStart() {
073      startedAcquireBarrier.countDown();
074    }
075
076    @Override
077    public void sendGlobalBarrierReached() {
078      startedDuringBarrier.countDown();
079    }
080
081    @Override
082    public void sendGlobalBarrierComplete() {
083      completedProcedure.countDown();
084    }
085  };
086
087  /**
088   * With a single member, verify ordered execution.  The Coordinator side is run in a separate
089   * thread so we can only trigger from members and wait for particular state latches.
090   */
091  @Test
092  public void testSingleMember() throws Exception {
093    // The member
094    List<String> members =  new ArrayList<>();
095    members.add("member");
096    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
097        Integer.MAX_VALUE, "op", null, members);
098    final LatchedProcedure procspy = spy(proc);
099    // coordinator: start the barrier procedure
100    new Thread() {
101      @Override
102      public void run() {
103        procspy.call();
104      }
105    }.start();
106
107    // coordinator: wait for the barrier to be acquired, then send start barrier
108    proc.startedAcquireBarrier.await();
109
110    // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
111    verify(procspy).sendGlobalBarrierStart();
112    verify(procspy, never()).sendGlobalBarrierReached();
113    verify(procspy, never()).sendGlobalBarrierComplete();
114    verify(procspy, never()).barrierAcquiredByMember(anyString());
115
116    // member: trigger global barrier acquisition
117    proc.barrierAcquiredByMember(members.get(0));
118
119    // coordinator: wait for global barrier to be acquired.
120    proc.acquiredBarrierLatch.await();
121    verify(procspy).sendGlobalBarrierStart(); // old news
122
123    // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
124    // or was not called here.
125
126    // member: trigger global barrier release
127    proc.barrierReleasedByMember(members.get(0), new byte[0]);
128
129    // coordinator: wait for procedure to be completed
130    proc.completedProcedure.await();
131    verify(procspy).sendGlobalBarrierReached();
132    verify(procspy).sendGlobalBarrierComplete();
133    verify(procspy, never()).receive(any());
134  }
135
136  @Test
137  public void testMultipleMember() throws Exception {
138    // 2 members
139    List<String> members =  new ArrayList<>();
140    members.add("member1");
141    members.add("member2");
142
143    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
144        Integer.MAX_VALUE, "op", null, members);
145    final LatchedProcedure procspy = spy(proc);
146    // start the barrier procedure
147    new Thread() {
148      @Override
149      public void run() {
150        procspy.call();
151      }
152    }.start();
153
154    // coordinator: wait for the barrier to be acquired, then send start barrier
155    procspy.startedAcquireBarrier.await();
156
157    // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
158    verify(procspy).sendGlobalBarrierStart();
159    verify(procspy, never()).sendGlobalBarrierReached();
160    verify(procspy, never()).sendGlobalBarrierComplete();
161    verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
162
163    // member0: [1/2] trigger global barrier acquisition.
164    procspy.barrierAcquiredByMember(members.get(0));
165
166    // coordinator not satisified.
167    verify(procspy).sendGlobalBarrierStart();
168    verify(procspy, never()).sendGlobalBarrierReached();
169    verify(procspy, never()).sendGlobalBarrierComplete();
170
171    // member 1: [2/2] trigger global barrier acquisition.
172    procspy.barrierAcquiredByMember(members.get(1));
173
174    // coordinator: wait for global barrier to be acquired.
175    procspy.startedDuringBarrier.await();
176    verify(procspy).sendGlobalBarrierStart(); // old news
177
178    // member 1, 2: trigger global barrier release
179    procspy.barrierReleasedByMember(members.get(0), new byte[0]);
180    procspy.barrierReleasedByMember(members.get(1), new byte[0]);
181
182    // coordinator wait for procedure to be completed
183    procspy.completedProcedure.await();
184    verify(procspy).sendGlobalBarrierReached();
185    verify(procspy).sendGlobalBarrierComplete();
186    verify(procspy, never()).receive(any());
187  }
188
189  @Test
190  public void testErrorPropagation() throws Exception {
191    List<String> members =  new ArrayList<>();
192    members.add("member");
193    Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
194        Integer.MAX_VALUE, "op", null, members);
195    final Procedure procspy = spy(proc);
196
197    ForeignException cause = new ForeignException("SRC", "External Exception");
198    proc.receive(cause);
199
200    // start the barrier procedure
201    Thread t = new Thread() {
202      @Override
203      public void run() {
204        procspy.call();
205      }
206    };
207    t.start();
208    t.join();
209
210    verify(procspy, never()).sendGlobalBarrierStart();
211    verify(procspy, never()).sendGlobalBarrierReached();
212    verify(procspy).sendGlobalBarrierComplete();
213  }
214
215  @Test
216  public void testBarrieredErrorPropagation() throws Exception {
217    List<String> members =  new ArrayList<>();
218    members.add("member");
219    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
220        Integer.MAX_VALUE, "op", null, members);
221    final LatchedProcedure procspy = spy(proc);
222
223    // start the barrier procedure
224    Thread t = new Thread() {
225      @Override
226      public void run() {
227        procspy.call();
228      }
229    };
230    t.start();
231
232    // now test that we can put an error in before the commit phase runs
233    procspy.startedAcquireBarrier.await();
234    ForeignException cause = new ForeignException("SRC", "External Exception");
235    procspy.receive(cause);
236    procspy.barrierAcquiredByMember(members.get(0));
237    t.join();
238
239    // verify state of all the object
240    verify(procspy).sendGlobalBarrierStart();
241    verify(procspy).sendGlobalBarrierComplete();
242    verify(procspy, never()).sendGlobalBarrierReached();
243  }
244}