/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Demonstrates how {@link Procedure} behaves with a single member, with multiple members, and
 * when an error is delivered to it.
 */
@Tag(MasterTests.TAG)
@Tag(SmallTests.TAG)
public class TestProcedure {

  /** Mocked coordinator handed to every procedure under test; rebuilt before each test. */
  ProcedureCoordinator coord;

  /** Creates a mock coordinator whose {@code getRpcs()} returns a (mock) non-null value. */
  @BeforeEach
  public void setup() {
    ProcedureCoordinator mockedCoordinator = mock(ProcedureCoordinator.class);
    final ProcedureCoordinatorRpcs mockedRpcs = mock(ProcedureCoordinatorRpcs.class);
    // getRpcs() must not hand back null
    when(mockedCoordinator.getRpcs()).thenReturn(mockedRpcs);
    coord = mockedCoordinator;
  }

  /**
   * A {@link Procedure} whose three global-barrier callbacks do nothing except count down a
   * dedicated latch, letting a test block until a given callback has been invoked.
   */
  static class LatchedProcedure extends Procedure {
    /** Counted down by {@link #sendGlobalBarrierStart()}. */
    CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
    /** Counted down by {@link #sendGlobalBarrierReached()}. */
    CountDownLatch startedDuringBarrier = new CountDownLatch(1);
    /** Counted down by {@link #sendGlobalBarrierComplete()}. */
    CountDownLatch completedProcedure = new CountDownLatch(1);

    public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
      long wakeFreq, long timeout, String opName, byte[] data, List<String> expectedMembers) {
      super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
    }

    @Override
    public void sendGlobalBarrierStart() {
      startedAcquireBarrier.countDown();
    }

    @Override
    public void sendGlobalBarrierReached() {
      startedDuringBarrier.countDown();
    }

    @Override
    public void sendGlobalBarrierComplete() {
      completedProcedure.countDown();
    }
  }

  /** Builds a fresh, mutable list holding the given member names in order. */
  private static List<String> membersNamed(String... names) {
    List<String> members = new ArrayList<>();
    for (String name : names) {
      members.add(name);
    }
    return members;
  }

  /** Builds a {@link LatchedProcedure} for operation "op" using the shared mock coordinator. */
  private LatchedProcedure newLatchedProcedure(List<String> members) {
    return new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100, Integer.MAX_VALUE,
      "op", null, members);
  }

  /**
   * Starts a new thread that invokes {@code call()} on the given procedure.
   * @param procedure the procedure (typically a Mockito spy) to run
   * @return the already-started thread, so that callers may join it
   */
  private static Thread runInNewThread(final Procedure procedure) {
    Thread runner = new Thread(() -> procedure.call());
    runner.start();
    return runner;
  }

  /**
   * With a single member, verify ordered execution. The coordinator side runs in a separate
   * thread, so the test can only trigger events as a member and wait on particular state latches.
   * <p>
   * Note that latches are awaited on, and member events are fired on, the wrapped {@code proc}
   * while {@code call()} runs on its spy; the spy therefore records no member callbacks.
   */
  @Test
  public void testSingleMember() throws Exception {
    // the lone member
    List<String> members = membersNamed("member");
    LatchedProcedure proc = newLatchedProcedure(members);
    final LatchedProcedure procspy = spy(proc);

    // coordinator: start the barrier procedure
    runInNewThread(procspy);

    // coordinator: block until the start-barrier message has been sent
    proc.startedAcquireBarrier.await();

    // only {@link Procedure#sendGlobalBarrierStart()} is known to have run; the rest is blocked
    verify(procspy).sendGlobalBarrierStart();
    verify(procspy, never()).sendGlobalBarrierReached();
    verify(procspy, never()).sendGlobalBarrierComplete();
    verify(procspy, never()).barrierAcquiredByMember(anyString());

    // member: trigger global barrier acquisition
    proc.barrierAcquiredByMember(members.get(0));

    // coordinator: wait for the global barrier to be acquired
    proc.acquiredBarrierLatch.await();
    verify(procspy).sendGlobalBarrierStart(); // old news

    // With two threads in play we cannot tell here whether
    // {@link Procedure#sendGlobalBarrierReached()} has or has not been called yet.

    // member: trigger global barrier release
    proc.barrierReleasedByMember(members.get(0), new byte[0]);

    // coordinator: wait for the procedure to complete
    proc.completedProcedure.await();
    verify(procspy).sendGlobalBarrierReached();
    verify(procspy).sendGlobalBarrierComplete();
    verify(procspy, never()).receive(any());
  }

  /**
   * With two members, verify the coordinator does not move past the acquire phase until both
   * members have acquired, and completes once both have released.
   */
  @Test
  public void testMultipleMember() throws Exception {
    // two members
    List<String> members = membersNamed("member1", "member2");
    LatchedProcedure proc = newLatchedProcedure(members);
    final LatchedProcedure procspy = spy(proc);

    // start the barrier procedure
    runInNewThread(procspy);

    // coordinator: block until the start-barrier message has been sent
    procspy.startedAcquireBarrier.await();

    // only {@link Procedure#sendGlobalBarrierStart()} is known to have run; the rest is blocked
    verify(procspy).sendGlobalBarrierStart();
    verify(procspy, never()).sendGlobalBarrierReached();
    verify(procspy, never()).sendGlobalBarrierComplete();
    verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals

    // first member: [1/2] trigger global barrier acquisition
    procspy.barrierAcquiredByMember(members.get(0));

    // coordinator is not satisfied yet
    verify(procspy).sendGlobalBarrierStart();
    verify(procspy, never()).sendGlobalBarrierReached();
    verify(procspy, never()).sendGlobalBarrierComplete();

    // second member: [2/2] trigger global barrier acquisition
    procspy.barrierAcquiredByMember(members.get(1));

    // coordinator: wait for the global barrier to be acquired
    procspy.startedDuringBarrier.await();
    verify(procspy).sendGlobalBarrierStart(); // old news

    // both members: trigger global barrier release
    procspy.barrierReleasedByMember(members.get(0), new byte[0]);
    procspy.barrierReleasedByMember(members.get(1), new byte[0]);

    // coordinator: wait for the procedure to complete
    procspy.completedProcedure.await();
    verify(procspy).sendGlobalBarrierReached();
    verify(procspy).sendGlobalBarrierComplete();
    verify(procspy, never()).receive(any());
  }

  /**
   * Delivers an error before the procedure is run and verifies that neither the start nor the
   * reached message is sent, while the complete message still is.
   */
  @Test
  public void testErrorPropagation() throws Exception {
    List<String> members = membersNamed("member");
    Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100, Integer.MAX_VALUE,
      "op", null, members);
    final Procedure procspy = spy(proc);

    // the error arrives before the procedure is even started
    ForeignException cause = new ForeignException("SRC", "External Exception");
    proc.receive(cause);

    // start the barrier procedure and wait for it to finish
    Thread runner = runInNewThread(procspy);
    runner.join();

    verify(procspy, never()).sendGlobalBarrierStart();
    verify(procspy, never()).sendGlobalBarrierReached();
    verify(procspy).sendGlobalBarrierComplete();
  }

  /**
   * Delivers an error after the start message has been sent and verifies that the reached message
   * is never sent, while the start and complete messages are.
   */
  @Test
  public void testBarrieredErrorPropagation() throws Exception {
    List<String> members = membersNamed("member");
    LatchedProcedure proc = newLatchedProcedure(members);
    final LatchedProcedure procspy = spy(proc);

    // start the barrier procedure
    Thread runner = runInNewThread(procspy);

    // now test that we can put an error in before the commit phase runs
    procspy.startedAcquireBarrier.await();
    ForeignException cause = new ForeignException("SRC", "External Exception");
    procspy.receive(cause);
    procspy.barrierAcquiredByMember(members.get(0));
    runner.join();

    // verify the final state of the procedure
    verify(procspy).sendGlobalBarrierStart();
    verify(procspy).sendGlobalBarrierComplete();
    verify(procspy, never()).sendGlobalBarrierReached();
  }
}