001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.util.concurrent.BrokenBarrierException; 021import java.util.concurrent.CyclicBarrier; 022import org.apache.hadoop.hbase.HBaseClassTestRule; 023import org.apache.hadoop.hbase.HBaseTestingUtil; 024import org.apache.hadoop.hbase.master.replication.AbstractPeerNoLockProcedure; 025import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure; 026import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 027import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 028import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 029import org.apache.hadoop.hbase.testclassification.LargeTests; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.junit.AfterClass; 032import org.junit.BeforeClass; 033import org.junit.ClassRule; 034import org.junit.Test; 035import org.junit.experimental.categories.Category; 036 037/** 038 * Testcase for HBASE-29380 039 */ 040@Category({ MasterTests.class, LargeTests.class }) 041public class TestProcedureWaitAndWake { 042 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestProcedureWaitAndWake.class); 046 047 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 048 049 public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> { 050 051 private final CyclicBarrier barrier; 052 053 private boolean passedBarrier; 054 055 public MyPeerProcedure() { 056 this(null); 057 } 058 059 public MyPeerProcedure(CyclicBarrier barrier) { 060 super("1"); 061 this.barrier = barrier; 062 } 063 064 @Override 065 public PeerOperationType getPeerOperationType() { 066 return PeerOperationType.REMOVE; 067 } 068 069 @Override 070 protected LockState acquireLock(MasterProcedureEnv env) { 071 // make sure we have two procedure arrive here at the same time, so one of them will enter the 072 // lock wait state 073 if (!passedBarrier) { 074 try { 075 barrier.await(); 076 } catch (InterruptedException | BrokenBarrierException e) { 077 throw new RuntimeException(e); 078 } 079 passedBarrier = true; 080 } 081 return super.acquireLock(env); 082 } 083 084 @Override 085 protected Flow executeFromState(MasterProcedureEnv env, Integer state) 086 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 087 if (state.intValue() == 0) { 088 setNextState(1); 089 addChildProcedure(new MySubPeerProcedure()); 090 return Flow.HAS_MORE_STATE; 091 } else { 092 Thread.sleep(200); 093 return Flow.NO_MORE_STATE; 094 } 095 } 096 097 @Override 098 protected Integer getState(int stateId) { 099 return Integer.valueOf(stateId); 100 } 101 102 @Override 103 protected int getStateId(Integer state) { 104 return state.intValue(); 105 } 106 107 @Override 108 protected Integer getInitialState() { 109 return 0; 110 } 111 112 public static final class MySubPeerProcedure extends AbstractPeerNoLockProcedure<Integer> { 113 114 public MySubPeerProcedure() { 115 super("1"); 116 } 117 118 @Override 119 public PeerOperationType getPeerOperationType() { 120 return PeerOperationType.REFRESH; 121 } 122 123 @Override 124 protected Flow executeFromState(MasterProcedureEnv env, Integer state) 125 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 126 return Flow.NO_MORE_STATE; 127 } 128 129 @Override 130 protected Integer getState(int stateId) { 131 return Integer.valueOf(stateId); 132 } 133 134 @Override 135 protected int getStateId(Integer state) { 136 return state.intValue(); 137 } 138 139 @Override 140 protected Integer getInitialState() { 141 return 0; 142 } 143 } 144 } 145 146 @BeforeClass 147 public static void setUp() throws Exception { 148 UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8); 149 UTIL.startMiniCluster(3); 150 } 151 152 @AfterClass 153 public static void tearDown() throws Exception { 154 UTIL.shutdownMiniCluster(); 155 } 156 157 @Test 158 public void testPeerProcedure() { 159 ProcedureExecutor<MasterProcedureEnv> procExec = 160 UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); 161 CyclicBarrier barrier = new CyclicBarrier(2); 162 MyPeerProcedure p1 = new MyPeerProcedure(barrier); 163 MyPeerProcedure p2 = new MyPeerProcedure(barrier); 164 long id1 = procExec.submitProcedure(p1); 165 long id2 = procExec.submitProcedure(p2); 166 UTIL.waitFor(10000, () -> procExec.isFinished(id1)); 167 UTIL.waitFor(10000, () -> procExec.isFinished(id2)); 168 } 169}