/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.master.replication.AbstractPeerNoLockProcedure;
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Testcase for HBASE-29380.
 * <p>
 * Two {@link MyPeerProcedure} instances that share the same peer id are submitted together and
 * are forced to reach {@code acquireLock} at the same time, so that one of them has to wait for
 * the lock. The test passes when both procedures are reported as finished.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
public class TestProcedureWaitAndWake {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /**
   * A two-state peer procedure on peer id {@code "1"}. Before its first lock attempt it
   * rendezvouses with the other procedure on a shared {@link CyclicBarrier}; in state 0 it
   * schedules a {@link MySubPeerProcedure} child, and in the following state it sleeps briefly
   * and completes.
   */
  public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {

    /** Rendezvous point shared by the procedures under test; {@code null} if none was given. */
    private final CyclicBarrier barrier;

    /** Set once this procedure has gone through the barrier, so it never waits on it twice. */
    private boolean passedBarrier;

    /**
     * Creates a procedure without a barrier.
     * <p>
     * NOTE(review): presumably needed so the procedure framework can instantiate the class
     * reflectively - confirm against the procedure store/loader.
     */
    public MyPeerProcedure() {
      this(null);
    }

    /**
     * @param barrier barrier to wait on before the first lock attempt, may be {@code null} to
     *                skip the rendezvous
     */
    public MyPeerProcedure(CyclicBarrier barrier) {
      super("1");
      this.barrier = barrier;
    }

    @Override
    public PeerOperationType getPeerOperationType() {
      return PeerOperationType.REMOVE;
    }

    /**
     * Waits on the barrier (first call only, and only when a barrier was supplied) and then
     * delegates to the parent implementation.
     * @throws RuntimeException if the wait is interrupted or the barrier is broken; the original
     *                          exception is kept as the cause
     */
    @Override
    protected LockState acquireLock(MasterProcedureEnv env) {
      // make sure we have two procedure arrive here at the same time, so one of them will enter the
      // lock wait state
      // The null check protects instances created through the no-arg constructor, which have no
      // barrier to wait on.
      if (!passedBarrier && barrier != null) {
        try {
          barrier.await();
        } catch (InterruptedException e) {
          // keep the interrupt visible to whoever owns this thread before bailing out
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (BrokenBarrierException e) {
          throw new RuntimeException(e);
        }
        passedBarrier = true;
      }
      return super.acquireLock(env);
    }

    /**
     * State 0 registers a {@link MySubPeerProcedure} child and moves on to state 1; any other
     * state sleeps for 200 ms and ends the procedure.
     */
    @Override
    protected Flow executeFromState(MasterProcedureEnv env, Integer state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
      if (state.intValue() == 0) {
        setNextState(1);
        addChildProcedure(new MySubPeerProcedure());
        return Flow.HAS_MORE_STATE;
      } else {
        Thread.sleep(200);
        return Flow.NO_MORE_STATE;
      }
    }

    /** States are plain integers, so the id maps to itself. */
    @Override
    protected Integer getState(int stateId) {
      return Integer.valueOf(stateId);
    }

    /** States are plain integers, so the state maps to itself. */
    @Override
    protected int getStateId(Integer state) {
      return state.intValue();
    }

    @Override
    protected Integer getInitialState() {
      return 0;
    }

    /**
     * Child procedure on the same peer id {@code "1"}; it finishes on its first execution step
     * without doing any work.
     */
    public static final class MySubPeerProcedure extends AbstractPeerNoLockProcedure<Integer> {

      public MySubPeerProcedure() {
        super("1");
      }

      @Override
      public PeerOperationType getPeerOperationType() {
        return PeerOperationType.REFRESH;
      }

      /** Completes immediately regardless of the state passed in. */
      @Override
      protected Flow executeFromState(MasterProcedureEnv env, Integer state)
        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
        return Flow.NO_MORE_STATE;
      }

      /** States are plain integers, so the id maps to itself. */
      @Override
      protected Integer getState(int stateId) {
        return Integer.valueOf(stateId);
      }

      /** States are plain integers, so the state maps to itself. */
      @Override
      protected int getStateId(Integer state) {
        return state.intValue();
      }

      @Override
      protected Integer getInitialState() {
        return 0;
      }
    }
  }

  /** Configures the master procedure thread count to 8 and starts the mini cluster. */
  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
    UTIL.startMiniCluster(3);
  }

  /** Shuts the mini cluster down after all tests have run. */
  @AfterAll
  public static void tearDown() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Submits two {@link MyPeerProcedure} instances sharing a two-party barrier and waits, up to
   * 10000 ms each, until the executor reports both of them as finished.
   */
  @Test
  public void testPeerProcedure() {
    ProcedureExecutor<MasterProcedureEnv> procExec =
      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
    // two parties: one per submitted procedure
    CyclicBarrier barrier = new CyclicBarrier(2);
    MyPeerProcedure p1 = new MyPeerProcedure(barrier);
    MyPeerProcedure p2 = new MyPeerProcedure(barrier);
    long id1 = procExec.submitProcedure(p1);
    long id2 = procExec.submitProcedure(p2);
    UTIL.waitFor(10000, () -> procExec.isFinished(id1));
    UTIL.waitFor(10000, () -> procExec.isFinished(id2));
  }
}