001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.CountDownLatch;
022
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HBaseTestingUtility;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
027import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
030import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
032import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
033import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.junit.AfterClass;
037import org.junit.BeforeClass;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044
045@Category({MasterTests.class, SmallTests.class})
046public class TestUrgentProcedureWorker {
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049      HBaseClassTestRule.forClass(TestUrgentProcedureWorker.class);
050
051  private static final Logger LOG = LoggerFactory
052      .getLogger(TestUrgentProcedureWorker.class);
053  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
054  private static final CountDownLatch metaFinished = new CountDownLatch(1);
055
056  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
057
058  private static final TableName TABLE_NAME = TableName.valueOf("TestUrgentProcedureWorker");
059
060  private static WALProcedureStore procStore;
061
062  private static ProcedureExecutor<TestEnv> procExec;
063
064  private static final class TestEnv {
065    private final MasterProcedureScheduler scheduler;
066
067    public TestEnv(MasterProcedureScheduler scheduler) {
068      this.scheduler = scheduler;
069    }
070
071    public MasterProcedureScheduler getScheduler() {
072      return scheduler;
073    }
074  }
075
076  public static class WaitingMetaProcedure extends ProcedureTestingUtility.NoopProcedure<TestEnv>
077      implements TableProcedureInterface {
078
079
080    @Override
081    protected Procedure<TestEnv>[] execute(TestEnv env)
082        throws ProcedureYieldException, ProcedureSuspendedException,
083        InterruptedException {
084      metaFinished.await();
085      return null;
086    }
087
088    @Override
089    protected Procedure.LockState acquireLock(TestEnv env) {
090      if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
091        return LockState.LOCK_EVENT_WAIT;
092      }
093      return LockState.LOCK_ACQUIRED;
094    }
095
096    @Override
097    protected void releaseLock(TestEnv env) {
098      env.getScheduler().wakeTableExclusiveLock(this, getTableName());
099    }
100
101    @Override
102    protected boolean holdLock(TestEnv env) {
103      return true;
104    }
105
106    @Override
107    public TableName getTableName() {
108      return TABLE_NAME;
109    }
110
111    @Override
112    public TableOperationType getTableOperationType() {
113      return TableOperationType.EDIT;
114    }
115  }
116
117  public static class MetaProcedure extends ProcedureTestingUtility.NoopProcedure<TestEnv>
118      implements TableProcedureInterface {
119
120
121    @Override
122    protected Procedure<TestEnv>[] execute(TestEnv env)
123        throws ProcedureYieldException, ProcedureSuspendedException,
124        InterruptedException {
125      metaFinished.countDown();
126      return null;
127    }
128
129    @Override
130    protected Procedure.LockState acquireLock(TestEnv env) {
131      if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
132        return LockState.LOCK_EVENT_WAIT;
133      }
134      return LockState.LOCK_ACQUIRED;
135    }
136
137    @Override
138    protected void releaseLock(TestEnv env) {
139      env.getScheduler().wakeTableExclusiveLock(this, getTableName());
140    }
141
142    @Override
143    protected boolean holdLock(TestEnv env) {
144      return true;
145    }
146
147    @Override
148    public TableName getTableName() {
149      return TableName.META_TABLE_NAME;
150    }
151
152    @Override
153    public TableOperationType getTableOperationType() {
154      return TableOperationType.EDIT;
155    }
156  }
157
158  @AfterClass
159  public static void tearDownAfterClass() throws IOException {
160    UTIL.cleanupTestDir();
161  }
162
163  @BeforeClass
164  public static void setUp() throws IOException {
165    UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000);
166    procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(),
167        UTIL.getDataTestDir("TestUrgentProcedureWorker"));
168    procStore.start(1);
169    MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
170    procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
171        scheduler);
172    procExec.init(1, 1, false);
173    procExec.startWorkers();
174  }
175
176  @Test
177  public void test() throws Exception {
178    WaitingMetaProcedure waitingMetaProcedure = new WaitingMetaProcedure();
179    long waitProc = procExec.submitProcedure(waitingMetaProcedure);
180    MetaProcedure metaProcedure = new MetaProcedure();
181    long metaProc = procExec.submitProcedure(metaProcedure);
182    UTIL.waitFor(5000, () -> procExec.isFinished(waitProc));
183
184  }
185
186
187
188}