001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Optional;
024import java.util.concurrent.Future;
025import java.util.concurrent.Semaphore;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
031import org.apache.hadoop.hbase.client.AsyncAdmin;
032import org.apache.hadoop.hbase.client.BalanceRequest;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Durability;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
040import org.apache.hadoop.hbase.coprocessor.ObserverContext;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
043import org.apache.hadoop.hbase.coprocessor.RegionObserver;
044import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
045import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
046import org.apache.hadoop.hbase.regionserver.HRegionServer;
047import org.apache.hadoop.hbase.testclassification.LargeTests;
048import org.apache.hadoop.hbase.testclassification.MasterTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
051import org.apache.hadoop.hbase.wal.WALEdit;
052import org.junit.jupiter.api.AfterAll;
053import org.junit.jupiter.api.BeforeAll;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.Test;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
058
059/**
060 * Test to ensure that the priority for procedures and stuck checker can partially solve the problem
061 * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
062 * period of time.
063 * <p>
064 * As of HBASE-28199, we no longer block a worker when updating meta now, so this test can not test
065 * adding procedure worker now, but it could still be used to make sure that we could make progress
066 * when meta is gone and we have a lot of pending TRSPs.
067 */
068@Tag(MasterTests.TAG)
069@Tag(LargeTests.TAG)
070public class TestProcedurePriority {
071
072  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
073
074  private static String TABLE_NAME_PREFIX = "TestProcedurePriority-";
075
076  private static byte[] CF = Bytes.toBytes("cf");
077
078  private static byte[] CQ = Bytes.toBytes("cq");
079
080  private static int CORE_POOL_SIZE;
081
082  private static int TABLE_COUNT;
083
084  private static volatile boolean FAIL = false;
085
086  public static final class MyCP implements RegionObserver, RegionCoprocessor {
087
088    @Override
089    public Optional<RegionObserver> getRegionObserver() {
090      return Optional.of(this);
091    }
092
093    @Override
094    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
095      List<Cell> result) throws IOException {
096      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
097        throw new IOException("Inject error");
098      }
099    }
100
101    @Override
102    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
103      WALEdit edit, Durability durability) throws IOException {
104      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
105        throw new IOException("Inject error");
106      }
107    }
108  }
109
110  @BeforeAll
111  public static void setUp() throws Exception {
112    UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
113    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
114    UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
115    UTIL.startMiniCluster(3);
116    CORE_POOL_SIZE =
117      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
118    TABLE_COUNT = 50 * CORE_POOL_SIZE;
119    List<Future<?>> futures = new ArrayList<>();
120    AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
121    Semaphore concurrency = new Semaphore(10);
122    for (int i = 0; i < TABLE_COUNT; i++) {
123      concurrency.acquire();
124      futures.add(admin
125        .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i))
126          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build())
127        .whenComplete((r, e) -> concurrency.release()));
128    }
129    for (Future<?> future : futures) {
130      future.get(3, TimeUnit.MINUTES);
131    }
132    UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
133    UTIL.waitUntilNoRegionsInTransition();
134    UTIL.getAdmin().balancerSwitch(false, true);
135  }
136
137  @AfterAll
138  public static void tearDown() throws Exception {
139    UTIL.shutdownMiniCluster();
140  }
141
142  @Test
143  public void test() throws Exception {
144    RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads()
145      .stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
146      .findAny().get();
147    HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
148    FAIL = true;
149    UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
150    ProcedureExecutor<?> executor =
151      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
152    // wait until we have way more TRSPs than the core pool size, and then make sure we can recover
153    // normally
154    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
155
156      @Override
157      public boolean evaluate() throws Exception {
158        return executor.getProcedures().stream().filter(p -> !p.isFinished())
159          .filter(p -> p.getState() != ProcedureState.INITIALIZING)
160          .filter(p -> p instanceof TransitRegionStateProcedure).count() > 5 * CORE_POOL_SIZE;
161      }
162
163      @Override
164      public String explainFailure() throws Exception {
165        return "Not enough TRSPs scheduled";
166      }
167    });
168    // sleep more time to make sure the TRSPs have been executed
169    Thread.sleep(10000);
170    UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
171    rsWithMetaThread.join();
172    FAIL = false;
173    // verify that the cluster is back
174    UTIL.waitUntilNoRegionsInTransition(480000);
175    for (int i = 0; i < TABLE_COUNT; i++) {
176      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
177        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
178      }
179    }
180    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
181
182      @Override
183      public boolean evaluate() throws Exception {
184        return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
185      }
186
187      @Override
188      public String explainFailure() throws Exception {
189        return "The new workers do not timeout";
190      }
191    });
192  }
193}