001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.util.concurrent.BrokenBarrierException;
021import java.util.concurrent.CyclicBarrier;
022import org.apache.hadoop.hbase.HBaseTestingUtil;
023import org.apache.hadoop.hbase.master.replication.AbstractPeerNoLockProcedure;
024import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
025import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
026import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
027import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
028import org.apache.hadoop.hbase.testclassification.LargeTests;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.junit.jupiter.api.AfterAll;
031import org.junit.jupiter.api.BeforeAll;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035/**
036 * Testcase for HBASE-29380
037 */
038@Tag(MasterTests.TAG)
039@Tag(LargeTests.TAG)
040public class TestProcedureWaitAndWake {
041
042  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
043
044  public static final class MyPeerProcedure extends AbstractPeerProcedure<Integer> {
045
046    private final CyclicBarrier barrier;
047
048    private boolean passedBarrier;
049
050    public MyPeerProcedure() {
051      this(null);
052    }
053
054    public MyPeerProcedure(CyclicBarrier barrier) {
055      super("1");
056      this.barrier = barrier;
057    }
058
059    @Override
060    public PeerOperationType getPeerOperationType() {
061      return PeerOperationType.REMOVE;
062    }
063
064    @Override
065    protected LockState acquireLock(MasterProcedureEnv env) {
066      // make sure we have two procedure arrive here at the same time, so one of them will enter the
067      // lock wait state
068      if (!passedBarrier) {
069        try {
070          barrier.await();
071        } catch (InterruptedException | BrokenBarrierException e) {
072          throw new RuntimeException(e);
073        }
074        passedBarrier = true;
075      }
076      return super.acquireLock(env);
077    }
078
079    @Override
080    protected Flow executeFromState(MasterProcedureEnv env, Integer state)
081      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
082      if (state.intValue() == 0) {
083        setNextState(1);
084        addChildProcedure(new MySubPeerProcedure());
085        return Flow.HAS_MORE_STATE;
086      } else {
087        Thread.sleep(200);
088        return Flow.NO_MORE_STATE;
089      }
090    }
091
092    @Override
093    protected Integer getState(int stateId) {
094      return Integer.valueOf(stateId);
095    }
096
097    @Override
098    protected int getStateId(Integer state) {
099      return state.intValue();
100    }
101
102    @Override
103    protected Integer getInitialState() {
104      return 0;
105    }
106
107    public static final class MySubPeerProcedure extends AbstractPeerNoLockProcedure<Integer> {
108
109      public MySubPeerProcedure() {
110        super("1");
111      }
112
113      @Override
114      public PeerOperationType getPeerOperationType() {
115        return PeerOperationType.REFRESH;
116      }
117
118      @Override
119      protected Flow executeFromState(MasterProcedureEnv env, Integer state)
120        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
121        return Flow.NO_MORE_STATE;
122      }
123
124      @Override
125      protected Integer getState(int stateId) {
126        return Integer.valueOf(stateId);
127      }
128
129      @Override
130      protected int getStateId(Integer state) {
131        return state.intValue();
132      }
133
134      @Override
135      protected Integer getInitialState() {
136        return 0;
137      }
138    }
139  }
140
141  @BeforeAll
142  public static void setUp() throws Exception {
143    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);
144    UTIL.startMiniCluster(3);
145  }
146
147  @AfterAll
148  public static void tearDown() throws Exception {
149    UTIL.shutdownMiniCluster();
150  }
151
152  @Test
153  public void testPeerProcedure() {
154    ProcedureExecutor<MasterProcedureEnv> procExec =
155      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
156    CyclicBarrier barrier = new CyclicBarrier(2);
157    MyPeerProcedure p1 = new MyPeerProcedure(barrier);
158    MyPeerProcedure p2 = new MyPeerProcedure(barrier);
159    long id1 = procExec.submitProcedure(p1);
160    long id2 = procExec.submitProcedure(p2);
161    UTIL.waitFor(10000, () -> procExec.isFinished(id1));
162    UTIL.waitFor(10000, () -> procExec.isFinished(id2));
163  }
164}