001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.concurrent.Semaphore;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.procedure2.Procedure;
026import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
027import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
029import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.After;
035import org.junit.AfterClass;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Rule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.junit.rules.TestName;
042
043@Category({ MasterTests.class, SmallTests.class })
044public class TestSchedulerQueueDeadLock {
045
046  @ClassRule
047  public static final HBaseClassTestRule CLASS_RULE =
048    HBaseClassTestRule.forClass(TestSchedulerQueueDeadLock.class);
049
050  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
051
052  private static final TableName TABLE_NAME = TableName.valueOf("deadlock");
053
054  private static final class TestEnv {
055    private final MasterProcedureScheduler scheduler;
056
057    public TestEnv(MasterProcedureScheduler scheduler) {
058      this.scheduler = scheduler;
059    }
060
061    public MasterProcedureScheduler getScheduler() {
062      return scheduler;
063    }
064  }
065
066  public static class TableSharedProcedure extends NoopProcedure<TestEnv>
067      implements TableProcedureInterface {
068
069    private final Semaphore latch = new Semaphore(0);
070
071    @Override
072    protected Procedure<TestEnv>[] execute(TestEnv env)
073        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
074      latch.acquire();
075      return null;
076    }
077
078    @Override
079    protected LockState acquireLock(TestEnv env) {
080      if (env.getScheduler().waitTableSharedLock(this, getTableName())) {
081        return LockState.LOCK_EVENT_WAIT;
082      }
083      return LockState.LOCK_ACQUIRED;
084    }
085
086    @Override
087    protected void releaseLock(TestEnv env) {
088      env.getScheduler().wakeTableSharedLock(this, getTableName());
089    }
090
091    @Override
092    protected boolean holdLock(TestEnv env) {
093      return true;
094    }
095
096    @Override
097    public TableName getTableName() {
098      return TABLE_NAME;
099    }
100
101    @Override
102    public TableOperationType getTableOperationType() {
103      return TableOperationType.READ;
104    }
105  }
106
107  public static class TableExclusiveProcedure extends NoopProcedure<TestEnv>
108      implements TableProcedureInterface {
109
110    private final Semaphore latch = new Semaphore(0);
111
112    @Override
113    protected Procedure<TestEnv>[] execute(TestEnv env)
114        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
115      latch.acquire();
116      return null;
117    }
118
119    @Override
120    protected LockState acquireLock(TestEnv env) {
121      if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
122        return LockState.LOCK_EVENT_WAIT;
123      }
124      return LockState.LOCK_ACQUIRED;
125    }
126
127    @Override
128    protected void releaseLock(TestEnv env) {
129      env.getScheduler().wakeTableExclusiveLock(this, getTableName());
130    }
131
132    @Override
133    protected boolean holdLock(TestEnv env) {
134      return true;
135    }
136
137    @Override
138    public TableName getTableName() {
139      return TABLE_NAME;
140    }
141
142    @Override
143    public TableOperationType getTableOperationType() {
144      return TableOperationType.EDIT;
145    }
146  }
147
148  @AfterClass
149  public static void tearDownAfterClass() throws IOException {
150    UTIL.cleanupTestDir();
151  }
152
153  private WALProcedureStore procStore;
154
155  private ProcedureExecutor<TestEnv> procExec;
156
157  @Rule
158  public final TestName name = new TestName();
159
160  @Before
161  public void setUp() throws IOException {
162    UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000);
163    procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(),
164      UTIL.getDataTestDir(name.getMethodName()));
165    procStore.start(1);
166    MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
167    procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
168      scheduler);
169    procExec.init(1, false);
170  }
171
172  @After
173  public void tearDown() {
174    procExec.stop();
175    procStore.stop(false);
176  }
177
178  public static final class TableSharedProcedureWithId extends TableSharedProcedure {
179
180    @Override
181    protected void setProcId(long procId) {
182      // this is a hack to make this procedure be loaded after the procedure below as we will sort
183      // the procedures by id when loading.
184      super.setProcId(2L);
185    }
186  }
187
188  public static final class TableExclusiveProcedureWithId extends TableExclusiveProcedure {
189
190    @Override
191    protected void setProcId(long procId) {
192      // this is a hack to make this procedure be loaded before the procedure above as we will
193      // sort the procedures by id when loading.
194      super.setProcId(1L);
195    }
196  }
197
198  @Test
199  public void testTableProcedureDeadLockAfterRestarting() throws Exception {
200    // let the shared procedure run first, but let it have a greater procId so when loading it will
201    // be loaded at last.
202    long procId1 = procExec.submitProcedure(new TableSharedProcedureWithId());
203    long procId2 = procExec.submitProcedure(new TableExclusiveProcedureWithId());
204    procExec.startWorkers();
205    UTIL.waitFor(10000,
206      () -> ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.hasQueuedThreads());
207
208    ProcedureTestingUtility.restart(procExec);
209
210    ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.release();
211    ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release();
212
213    UTIL.waitFor(10000, () -> procExec.isFinished(procId1));
214    UTIL.waitFor(10000, () -> procExec.isFinished(procId2));
215  }
216
217  public static final class TableShardParentProcedure extends NoopProcedure<TestEnv>
218      implements TableProcedureInterface {
219
220    private boolean scheduled;
221
222    @Override
223    protected Procedure<TestEnv>[] execute(TestEnv env)
224        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
225      if (!scheduled) {
226        scheduled = true;
227        return new Procedure[] { new TableSharedProcedure() };
228      }
229      return null;
230    }
231
232    @Override
233    protected LockState acquireLock(TestEnv env) {
234      if (env.getScheduler().waitTableSharedLock(this, getTableName())) {
235        return LockState.LOCK_EVENT_WAIT;
236      }
237      return LockState.LOCK_ACQUIRED;
238    }
239
240    @Override
241    protected void releaseLock(TestEnv env) {
242      env.getScheduler().wakeTableSharedLock(this, getTableName());
243    }
244
245    @Override
246    protected boolean holdLock(TestEnv env) {
247      return true;
248    }
249
250    @Override
251    public TableName getTableName() {
252      return TABLE_NAME;
253    }
254
255    @Override
256    public TableOperationType getTableOperationType() {
257      return TableOperationType.READ;
258    }
259  }
260
261  @Test
262  public void testTableProcedureSubProcedureDeadLock() throws Exception {
263    // the shared procedure will also schedule a shared procedure, but after the exclusive procedure
264    long procId1 = procExec.submitProcedure(new TableShardParentProcedure());
265    long procId2 = procExec.submitProcedure(new TableExclusiveProcedure());
266    procExec.startWorkers();
267    UTIL.waitFor(10000,
268      () -> procExec.getProcedures().stream().anyMatch(p -> p instanceof TableSharedProcedure));
269    procExec.getProcedures().stream().filter(p -> p instanceof TableSharedProcedure)
270      .map(p -> (TableSharedProcedure) p).forEach(p -> p.latch.release());
271    ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release();
272
273    UTIL.waitFor(10000, () -> procExec.isFinished(procId1));
274    UTIL.waitFor(10000, () -> procExec.isFinished(procId2));
275  }
276}