001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.mockito.Matchers.any; 021import static org.mockito.Matchers.anyString; 022import static org.mockito.Mockito.mock; 023import static org.mockito.Mockito.never; 024import static org.mockito.Mockito.spy; 025import static org.mockito.Mockito.verify; 026import static org.mockito.Mockito.when; 027 028import java.util.ArrayList; 029import java.util.List; 030import java.util.concurrent.CountDownLatch; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.errorhandling.ForeignException; 033import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040 041/** 042 * Demonstrate how Procedure handles single members, multiple members, and errors semantics 043 */ 044@Category({MasterTests.class, SmallTests.class}) 045public class TestProcedure { 046 047 @ClassRule 048 public static final HBaseClassTestRule CLASS_RULE = 049 HBaseClassTestRule.forClass(TestProcedure.class); 050 051 ProcedureCoordinator coord; 052 053 @Before 054 public void setup() { 055 coord = mock(ProcedureCoordinator.class); 056 final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class); 057 when(coord.getRpcs()).thenReturn(comms); // make it not null 058 } 059 060 static class LatchedProcedure extends Procedure { 061 CountDownLatch startedAcquireBarrier = new CountDownLatch(1); 062 CountDownLatch startedDuringBarrier = new CountDownLatch(1); 063 CountDownLatch completedProcedure = new CountDownLatch(1); 064 065 public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, 066 long wakeFreq, long timeout, String opName, byte[] data, 067 List<String> expectedMembers) { 068 super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers); 069 } 070 071 @Override 072 public void sendGlobalBarrierStart() { 073 startedAcquireBarrier.countDown(); 074 } 075 076 @Override 077 public void sendGlobalBarrierReached() { 078 startedDuringBarrier.countDown(); 079 } 080 081 @Override 082 public void sendGlobalBarrierComplete() { 083 completedProcedure.countDown(); 084 } 085 }; 086 087 /** 088 * With a single member, verify ordered execution. The Coordinator side is run in a separate 089 * thread so we can only trigger from members and wait for particular state latches. 090 */ 091 @Test 092 public void testSingleMember() throws Exception { 093 // The member 094 List<String> members = new ArrayList<>(); 095 members.add("member"); 096 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, 097 Integer.MAX_VALUE, "op", null, members); 098 final LatchedProcedure procspy = spy(proc); 099 // coordinator: start the barrier procedure 100 new Thread() { 101 @Override 102 public void run() { 103 procspy.call(); 104 } 105 }.start(); 106 107 // coordinator: wait for the barrier to be acquired, then send start barrier 108 proc.startedAcquireBarrier.await(); 109 110 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked. 111 verify(procspy).sendGlobalBarrierStart(); 112 verify(procspy, never()).sendGlobalBarrierReached(); 113 verify(procspy, never()).sendGlobalBarrierComplete(); 114 verify(procspy, never()).barrierAcquiredByMember(anyString()); 115 116 // member: trigger global barrier acquisition 117 proc.barrierAcquiredByMember(members.get(0)); 118 119 // coordinator: wait for global barrier to be acquired. 120 proc.acquiredBarrierLatch.await(); 121 verify(procspy).sendGlobalBarrierStart(); // old news 122 123 // since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was 124 // or was not called here. 125 126 // member: trigger global barrier release 127 proc.barrierReleasedByMember(members.get(0), new byte[0]); 128 129 // coordinator: wait for procedure to be completed 130 proc.completedProcedure.await(); 131 verify(procspy).sendGlobalBarrierReached(); 132 verify(procspy).sendGlobalBarrierComplete(); 133 verify(procspy, never()).receive(any()); 134 } 135 136 @Test 137 public void testMultipleMember() throws Exception { 138 // 2 members 139 List<String> members = new ArrayList<>(); 140 members.add("member1"); 141 members.add("member2"); 142 143 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, 144 Integer.MAX_VALUE, "op", null, members); 145 final LatchedProcedure procspy = spy(proc); 146 // start the barrier procedure 147 new Thread() { 148 @Override 149 public void run() { 150 procspy.call(); 151 } 152 }.start(); 153 154 // coordinator: wait for the barrier to be acquired, then send start barrier 155 procspy.startedAcquireBarrier.await(); 156 157 // we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked. 158 verify(procspy).sendGlobalBarrierStart(); 159 verify(procspy, never()).sendGlobalBarrierReached(); 160 verify(procspy, never()).sendGlobalBarrierComplete(); 161 verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals 162 163 // member0: [1/2] trigger global barrier acquisition. 164 procspy.barrierAcquiredByMember(members.get(0)); 165 166 // coordinator not satisified. 167 verify(procspy).sendGlobalBarrierStart(); 168 verify(procspy, never()).sendGlobalBarrierReached(); 169 verify(procspy, never()).sendGlobalBarrierComplete(); 170 171 // member 1: [2/2] trigger global barrier acquisition. 172 procspy.barrierAcquiredByMember(members.get(1)); 173 174 // coordinator: wait for global barrier to be acquired. 175 procspy.startedDuringBarrier.await(); 176 verify(procspy).sendGlobalBarrierStart(); // old news 177 178 // member 1, 2: trigger global barrier release 179 procspy.barrierReleasedByMember(members.get(0), new byte[0]); 180 procspy.barrierReleasedByMember(members.get(1), new byte[0]); 181 182 // coordinator wait for procedure to be completed 183 procspy.completedProcedure.await(); 184 verify(procspy).sendGlobalBarrierReached(); 185 verify(procspy).sendGlobalBarrierComplete(); 186 verify(procspy, never()).receive(any()); 187 } 188 189 @Test 190 public void testErrorPropagation() throws Exception { 191 List<String> members = new ArrayList<>(); 192 members.add("member"); 193 Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100, 194 Integer.MAX_VALUE, "op", null, members); 195 final Procedure procspy = spy(proc); 196 197 ForeignException cause = new ForeignException("SRC", "External Exception"); 198 proc.receive(cause); 199 200 // start the barrier procedure 201 Thread t = new Thread() { 202 @Override 203 public void run() { 204 procspy.call(); 205 } 206 }; 207 t.start(); 208 t.join(); 209 210 verify(procspy, never()).sendGlobalBarrierStart(); 211 verify(procspy, never()).sendGlobalBarrierReached(); 212 verify(procspy).sendGlobalBarrierComplete(); 213 } 214 215 @Test 216 public void testBarrieredErrorPropagation() throws Exception { 217 List<String> members = new ArrayList<>(); 218 members.add("member"); 219 LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, 220 Integer.MAX_VALUE, "op", null, members); 221 final LatchedProcedure procspy = spy(proc); 222 223 // start the barrier procedure 224 Thread t = new Thread() { 225 @Override 226 public void run() { 227 procspy.call(); 228 } 229 }; 230 t.start(); 231 232 // now test that we can put an error in before the commit phase runs 233 procspy.startedAcquireBarrier.await(); 234 ForeignException cause = new ForeignException("SRC", "External Exception"); 235 procspy.receive(cause); 236 procspy.barrierAcquiredByMember(members.get(0)); 237 t.join(); 238 239 // verify state of all the object 240 verify(procspy).sendGlobalBarrierStart(); 241 verify(procspy).sendGlobalBarrierComplete(); 242 verify(procspy, never()).sendGlobalBarrierReached(); 243 } 244}