001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Optional;
024import java.util.concurrent.Future;
025import java.util.concurrent.Semaphore;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
032import org.apache.hadoop.hbase.client.AsyncAdmin;
033import org.apache.hadoop.hbase.client.BalanceRequest;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Durability;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
046import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
047import org.apache.hadoop.hbase.regionserver.HRegionServer;
048import org.apache.hadoop.hbase.testclassification.LargeTests;
049import org.apache.hadoop.hbase.testclassification.MasterTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.junit.AfterClass;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
060
061/**
062 * Test to ensure that the priority for procedures and stuck checker can partially solve the problem
063 * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
064 * period of time.
065 * <p>
066 * As of HBASE-28199, we no longer block a worker when updating meta now, so this test can not test
067 * adding procedure worker now, but it could still be used to make sure that we could make progress
068 * when meta is gone and we have a lot of pending TRSPs.
069 */
070@Category({ MasterTests.class, LargeTests.class })
071public class TestProcedurePriority {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestProcedurePriority.class);
076
077  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
078
079  private static String TABLE_NAME_PREFIX = "TestProcedurePriority-";
080
081  private static byte[] CF = Bytes.toBytes("cf");
082
083  private static byte[] CQ = Bytes.toBytes("cq");
084
085  private static int CORE_POOL_SIZE;
086
087  private static int TABLE_COUNT;
088
089  private static volatile boolean FAIL = false;
090
091  public static final class MyCP implements RegionObserver, RegionCoprocessor {
092
093    @Override
094    public Optional<RegionObserver> getRegionObserver() {
095      return Optional.of(this);
096    }
097
098    @Override
099    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
100      List<Cell> result) throws IOException {
101      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
102        throw new IOException("Inject error");
103      }
104    }
105
106    @Override
107    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
108      Durability durability) throws IOException {
109      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
110        throw new IOException("Inject error");
111      }
112    }
113  }
114
115  @BeforeClass
116  public static void setUp() throws Exception {
117    UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
118    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
119    UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
120    UTIL.startMiniCluster(3);
121    CORE_POOL_SIZE =
122      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
123    TABLE_COUNT = 50 * CORE_POOL_SIZE;
124    List<Future<?>> futures = new ArrayList<>();
125    AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
126    Semaphore concurrency = new Semaphore(10);
127    for (int i = 0; i < TABLE_COUNT; i++) {
128      concurrency.acquire();
129      futures.add(admin
130        .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i))
131          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build())
132        .whenComplete((r, e) -> concurrency.release()));
133    }
134    for (Future<?> future : futures) {
135      future.get(3, TimeUnit.MINUTES);
136    }
137    UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
138    UTIL.waitUntilNoRegionsInTransition();
139    UTIL.getAdmin().balancerSwitch(false, true);
140  }
141
142  @AfterClass
143  public static void tearDown() throws Exception {
144    UTIL.shutdownMiniCluster();
145  }
146
147  @Test
148  public void test() throws Exception {
149    RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads()
150      .stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
151      .findAny().get();
152    HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
153    FAIL = true;
154    UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
155    ProcedureExecutor<?> executor =
156      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
157    // wait until we have way more TRSPs than the core pool size, and then make sure we can recover
158    // normally
159    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
160
161      @Override
162      public boolean evaluate() throws Exception {
163        return executor.getProcedures().stream().filter(p -> !p.isFinished())
164          .filter(p -> p.getState() != ProcedureState.INITIALIZING)
165          .filter(p -> p instanceof TransitRegionStateProcedure).count() > 5 * CORE_POOL_SIZE;
166      }
167
168      @Override
169      public String explainFailure() throws Exception {
170        return "Not enough TRSPs scheduled";
171      }
172    });
173    // sleep more time to make sure the TRSPs have been executed
174    Thread.sleep(10000);
175    UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
176    rsWithMetaThread.join();
177    FAIL = false;
178    // verify that the cluster is back
179    UTIL.waitUntilNoRegionsInTransition(480000);
180    for (int i = 0; i < TABLE_COUNT; i++) {
181      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
182        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
183      }
184    }
185    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
186
187      @Override
188      public boolean evaluate() throws Exception {
189        return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
190      }
191
192      @Override
193      public String explainFailure() throws Exception {
194        return "The new workers do not timeout";
195      }
196    });
197  }
198}