001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Optional;
024import java.util.concurrent.Future;
025import java.util.concurrent.Semaphore;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
032import org.apache.hadoop.hbase.client.AsyncAdmin;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Durability;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
039import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
040import org.apache.hadoop.hbase.coprocessor.ObserverContext;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
043import org.apache.hadoop.hbase.coprocessor.RegionObserver;
044import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
045import org.apache.hadoop.hbase.regionserver.HRegionServer;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
050import org.apache.hadoop.hbase.wal.WALEdit;
051import org.junit.AfterClass;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056
057/**
058 * Test to ensure that the priority for procedures and stuck checker can partially solve the problem
059 * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
060 * period of time.
061 */
062@Category({ MasterTests.class, LargeTests.class })
063public class TestProcedurePriority {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestProcedurePriority.class);
068
069  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
070
071  private static String TABLE_NAME_PREFIX = "TestProcedurePriority-";
072
073  private static byte[] CF = Bytes.toBytes("cf");
074
075  private static byte[] CQ = Bytes.toBytes("cq");
076
077  private static int CORE_POOL_SIZE;
078
079  private static int TABLE_COUNT;
080
081  private static volatile boolean FAIL = false;
082
083  public static final class MyCP implements RegionObserver, RegionCoprocessor {
084
085    @Override
086    public Optional<RegionObserver> getRegionObserver() {
087      return Optional.of(this);
088    }
089
090    @Override
091    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
092        List<Cell> result) throws IOException {
093      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
094        throw new IOException("Inject error");
095      }
096    }
097
098    @Override
099    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
100        Durability durability) throws IOException {
101      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
102        throw new IOException("Inject error");
103      }
104    }
105  }
106
107  @BeforeClass
108  public static void setUp() throws Exception {
109    UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
110    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
111    UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
112    UTIL.startMiniCluster(3);
113    CORE_POOL_SIZE =
114      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
115    TABLE_COUNT = 50 * CORE_POOL_SIZE;
116    List<Future<?>> futures = new ArrayList<>();
117    AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin();
118    Semaphore concurrency = new Semaphore(10);
119    for (int i = 0; i < TABLE_COUNT; i++) {
120      concurrency.acquire();
121      futures.add(admin
122        .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i))
123          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build())
124        .whenComplete((r, e) -> concurrency.release()));
125    }
126    for (Future<?> future : futures) {
127      future.get(3, TimeUnit.MINUTES);
128    }
129    UTIL.getAdmin().balance(true);
130    UTIL.waitUntilNoRegionsInTransition();
131  }
132
133  @AfterClass
134  public static void tearDown() throws Exception {
135    UTIL.shutdownMiniCluster();
136  }
137
138  @Test
139  public void test() throws Exception {
140    RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads()
141      .stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
142      .findAny().get();
143    HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
144    FAIL = true;
145    UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
146    // wait until all the worker thread are stuck, which means that the stuck checker will start to
147    // add new worker thread.
148    ProcedureExecutor<?> executor =
149      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
150    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
151
152      @Override
153      public boolean evaluate() throws Exception {
154        return executor.getWorkerThreadCount() > CORE_POOL_SIZE;
155      }
156
157      @Override
158      public String explainFailure() throws Exception {
159        return "Stuck checker does not add new worker thread";
160      }
161    });
162    UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
163    rsWithMetaThread.join();
164    FAIL = false;
165    // verify that the cluster is back
166    UTIL.waitUntilNoRegionsInTransition(480000);
167    for (int i = 0; i < TABLE_COUNT; i++) {
168      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
169        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
170      }
171    }
172    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
173
174      @Override
175      public boolean evaluate() throws Exception {
176        return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
177      }
178
179      @Override
180      public String explainFailure() throws Exception {
181        return "The new workers do not timeout";
182      }
183    });
184  }
185}