001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import static org.mockito.ArgumentMatchers.any;
021import static org.mockito.ArgumentMatchers.anyString;
022import static org.mockito.Mockito.mock;
023import static org.mockito.Mockito.never;
024import static org.mockito.Mockito.spy;
025import static org.mockito.Mockito.verify;
026import static org.mockito.Mockito.when;
027
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.CountDownLatch;
031import org.apache.hadoop.hbase.errorhandling.ForeignException;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.jupiter.api.BeforeEach;
036import org.junit.jupiter.api.Tag;
037import org.junit.jupiter.api.Test;
038
039/**
040 * Demonstrate how Procedure handles single members, multiple members, and errors semantics
041 */
042@Tag(MasterTests.TAG)
043@Tag(SmallTests.TAG)
044public class TestProcedure {
045
046  ProcedureCoordinator coord;
047
048  @BeforeEach
049  public void setup() {
050    coord = mock(ProcedureCoordinator.class);
051    final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
052    when(coord.getRpcs()).thenReturn(comms); // make it not null
053  }
054
055  static class LatchedProcedure extends Procedure {
056    CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
057    CountDownLatch startedDuringBarrier = new CountDownLatch(1);
058    CountDownLatch completedProcedure = new CountDownLatch(1);
059
060    public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
061      long wakeFreq, long timeout, String opName, byte[] data, List<String> expectedMembers) {
062      super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
063    }
064
065    @Override
066    public void sendGlobalBarrierStart() {
067      startedAcquireBarrier.countDown();
068    }
069
070    @Override
071    public void sendGlobalBarrierReached() {
072      startedDuringBarrier.countDown();
073    }
074
075    @Override
076    public void sendGlobalBarrierComplete() {
077      completedProcedure.countDown();
078    }
079  }
080
081  /**
082   * With a single member, verify ordered execution. The Coordinator side is run in a separate
083   * thread so we can only trigger from members and wait for particular state latches.
084   */
085  @Test
086  public void testSingleMember() throws Exception {
087    // The member
088    List<String> members = new ArrayList<>();
089    members.add("member");
090    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
091      Integer.MAX_VALUE, "op", null, members);
092    final LatchedProcedure procspy = spy(proc);
093    // coordinator: start the barrier procedure
094    new Thread() {
095      @Override
096      public void run() {
097        procspy.call();
098      }
099    }.start();
100
101    // coordinator: wait for the barrier to be acquired, then send start barrier
102    proc.startedAcquireBarrier.await();
103
104    // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
105    verify(procspy).sendGlobalBarrierStart();
106    verify(procspy, never()).sendGlobalBarrierReached();
107    verify(procspy, never()).sendGlobalBarrierComplete();
108    verify(procspy, never()).barrierAcquiredByMember(anyString());
109
110    // member: trigger global barrier acquisition
111    proc.barrierAcquiredByMember(members.get(0));
112
113    // coordinator: wait for global barrier to be acquired.
114    proc.acquiredBarrierLatch.await();
115    verify(procspy).sendGlobalBarrierStart(); // old news
116
117    // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
118    // or was not called here.
119
120    // member: trigger global barrier release
121    proc.barrierReleasedByMember(members.get(0), new byte[0]);
122
123    // coordinator: wait for procedure to be completed
124    proc.completedProcedure.await();
125    verify(procspy).sendGlobalBarrierReached();
126    verify(procspy).sendGlobalBarrierComplete();
127    verify(procspy, never()).receive(any());
128  }
129
130  @Test
131  public void testMultipleMember() throws Exception {
132    // 2 members
133    List<String> members = new ArrayList<>();
134    members.add("member1");
135    members.add("member2");
136
137    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
138      Integer.MAX_VALUE, "op", null, members);
139    final LatchedProcedure procspy = spy(proc);
140    // start the barrier procedure
141    new Thread() {
142      @Override
143      public void run() {
144        procspy.call();
145      }
146    }.start();
147
148    // coordinator: wait for the barrier to be acquired, then send start barrier
149    procspy.startedAcquireBarrier.await();
150
151    // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
152    verify(procspy).sendGlobalBarrierStart();
153    verify(procspy, never()).sendGlobalBarrierReached();
154    verify(procspy, never()).sendGlobalBarrierComplete();
155    verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
156
157    // member0: [1/2] trigger global barrier acquisition.
158    procspy.barrierAcquiredByMember(members.get(0));
159
160    // coordinator not satisified.
161    verify(procspy).sendGlobalBarrierStart();
162    verify(procspy, never()).sendGlobalBarrierReached();
163    verify(procspy, never()).sendGlobalBarrierComplete();
164
165    // member 1: [2/2] trigger global barrier acquisition.
166    procspy.barrierAcquiredByMember(members.get(1));
167
168    // coordinator: wait for global barrier to be acquired.
169    procspy.startedDuringBarrier.await();
170    verify(procspy).sendGlobalBarrierStart(); // old news
171
172    // member 1, 2: trigger global barrier release
173    procspy.barrierReleasedByMember(members.get(0), new byte[0]);
174    procspy.barrierReleasedByMember(members.get(1), new byte[0]);
175
176    // coordinator wait for procedure to be completed
177    procspy.completedProcedure.await();
178    verify(procspy).sendGlobalBarrierReached();
179    verify(procspy).sendGlobalBarrierComplete();
180    verify(procspy, never()).receive(any());
181  }
182
183  @Test
184  public void testErrorPropagation() throws Exception {
185    List<String> members = new ArrayList<>();
186    members.add("member");
187    Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100, Integer.MAX_VALUE,
188      "op", null, members);
189    final Procedure procspy = spy(proc);
190
191    ForeignException cause = new ForeignException("SRC", "External Exception");
192    proc.receive(cause);
193
194    // start the barrier procedure
195    Thread t = new Thread() {
196      @Override
197      public void run() {
198        procspy.call();
199      }
200    };
201    t.start();
202    t.join();
203
204    verify(procspy, never()).sendGlobalBarrierStart();
205    verify(procspy, never()).sendGlobalBarrierReached();
206    verify(procspy).sendGlobalBarrierComplete();
207  }
208
209  @Test
210  public void testBarrieredErrorPropagation() throws Exception {
211    List<String> members = new ArrayList<>();
212    members.add("member");
213    LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
214      Integer.MAX_VALUE, "op", null, members);
215    final LatchedProcedure procspy = spy(proc);
216
217    // start the barrier procedure
218    Thread t = new Thread() {
219      @Override
220      public void run() {
221        procspy.call();
222      }
223    };
224    t.start();
225
226    // now test that we can put an error in before the commit phase runs
227    procspy.startedAcquireBarrier.await();
228    ForeignException cause = new ForeignException("SRC", "External Exception");
229    procspy.receive(cause);
230    procspy.barrierAcquiredByMember(members.get(0));
231    t.join();
232
233    // verify state of all the object
234    verify(procspy).sendGlobalBarrierStart();
235    verify(procspy).sendGlobalBarrierComplete();
236    verify(procspy, never()).sendGlobalBarrierReached();
237  }
238}