001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.stream.Collectors;
026import org.apache.hadoop.hbase.HRegionLocation;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
029import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
032import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
033import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.RetryCounter;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
041import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
042
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
047
048/**
049 * Used for reopening the regions for a table.
050 */
051@InterfaceAudience.Private
052public class ReopenTableRegionsProcedure
053  extends AbstractStateMachineTableProcedure<ReopenTableRegionsState> {
054
055  private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);
056
057  public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
058    "hbase.reopen.table.regions.progressive.batch.backoff.ms";
059  public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
060  public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
061    "hbase.reopen.table.regions.progressive.batch.size.max";
062  public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
063  private static final int PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE = Integer.MAX_VALUE;
064
065  // this minimum prevents a max which would break this procedure
066  private static final int MINIMUM_BATCH_SIZE_MAX = 1;
067
068  private TableName tableName;
069
070  // Specify specific regions of a table to reopen.
071  // if specified null, all regions of the table will be reopened.
072  private List<byte[]> regionNames;
073
074  private List<HRegionLocation> regions = Collections.emptyList();
075
076  private List<HRegionLocation> currentRegionBatch = Collections.emptyList();
077
078  private RetryCounter retryCounter;
079
080  private long reopenBatchBackoffMillis;
081  private int reopenBatchSize;
082  private int reopenBatchSizeMax;
083  private long regionsReopened = 0;
084  private long batchesProcessed = 0;
085
086  public ReopenTableRegionsProcedure() {
087    this(null);
088  }
089
090  public ReopenTableRegionsProcedure(TableName tableName) {
091    this(tableName, Collections.emptyList());
092  }
093
094  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
095    this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
096      PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
097  }
098
099  public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
100    int reopenBatchSizeMax) {
101    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
102  }
103
104  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
105    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
106    this.tableName = tableName;
107    this.regionNames = regionNames;
108    this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
109    if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
110      this.reopenBatchSize = Integer.MAX_VALUE;
111      this.reopenBatchSizeMax = Integer.MAX_VALUE;
112    } else {
113      this.reopenBatchSize = 1;
114      this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
115    }
116  }
117
118  @Override
119  public TableName getTableName() {
120    return tableName;
121  }
122
123  @Override
124  public TableOperationType getTableOperationType() {
125    return TableOperationType.REGION_EDIT;
126  }
127
128  @RestrictedApi(explanation = "Should only be called in tests", link = "",
129      allowedOnPath = ".*/src/test/.*")
130  public long getRegionsReopened() {
131    return regionsReopened;
132  }
133
134  @RestrictedApi(explanation = "Should only be called in tests", link = "",
135      allowedOnPath = ".*/src/test/.*")
136  public long getBatchesProcessed() {
137    return batchesProcessed;
138  }
139
140  @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
141      allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
142  protected int progressBatchSize() {
143    int previousBatchSize = reopenBatchSize;
144    reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
145    if (reopenBatchSize < previousBatchSize) {
146      // the batch size should never decrease. this must be overflow, so just use max
147      reopenBatchSize = reopenBatchSizeMax;
148    }
149    return reopenBatchSize;
150  }
151
152  private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
153    if (loc.getSeqNum() < 0) {
154      return false;
155    }
156    RegionStateNode regionNode =
157      env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
158    // If the region node is null, then at least in the next round we can remove this region to make
159    // progress. And the second condition is a normal one, if there are no TRSP with it then we can
160    // schedule one to make progress.
161    return regionNode == null || !regionNode.isInTransition();
162  }
163
164  @Override
165  protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state)
166    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
167    switch (state) {
168      case REOPEN_TABLE_REGIONS_GET_REGIONS:
169        if (!isTableEnabled(env)) {
170          LOG.info("Table {} is disabled, give up reopening its regions", tableName);
171          return Flow.NO_MORE_STATE;
172        }
173        List<HRegionLocation> tableRegions =
174          env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
175        regions = getRegionLocationsForReopen(tableRegions);
176        setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
177        return Flow.HAS_MORE_STATE;
178      case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
179        // if we didn't finish reopening the last batch yet, let's keep trying until we do.
180        // at that point, the batch will be empty and we can generate a new batch
181        if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
182          currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
183          batchesProcessed++;
184        }
185        for (HRegionLocation loc : currentRegionBatch) {
186          RegionStateNode regionNode =
187            env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
188          // this possible, maybe the region has already been merged or split, see HBASE-20921
189          if (regionNode == null) {
190            continue;
191          }
192          TransitRegionStateProcedure proc;
193          regionNode.lock();
194          try {
195            if (regionNode.getProcedure() != null) {
196              continue;
197            }
198            proc = TransitRegionStateProcedure.reopen(env, regionNode.getRegionInfo());
199            regionNode.setProcedure(proc);
200          } finally {
201            regionNode.unlock();
202          }
203          addChildProcedure(proc);
204          regionsReopened++;
205        }
206        setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
207        return Flow.HAS_MORE_STATE;
208      case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
209        // update region lists based on what's been reopened
210        regions = filterReopened(env, regions);
211        currentRegionBatch = filterReopened(env, currentRegionBatch);
212
213        // existing batch didn't fully reopen, so try to resolve that first.
214        // since this is a retry, don't do the batch backoff
215        if (!currentRegionBatch.isEmpty()) {
216          return reopenIfSchedulable(env, currentRegionBatch, false);
217        }
218
219        if (regions.isEmpty()) {
220          return Flow.NO_MORE_STATE;
221        }
222
223        // current batch is finished, schedule more regions
224        return reopenIfSchedulable(env, regions, true);
225      default:
226        throw new UnsupportedOperationException("unhandled state=" + state);
227    }
228  }
229
230  private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
231    List<HRegionLocation> regionsToCheck) {
232    return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
233      .filter(l -> l != null).collect(Collectors.toList());
234  }
235
236  private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
237    boolean shouldBatchBackoff) throws ProcedureSuspendedException {
238    if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
239      retryCounter = null;
240      setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
241      if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
242        progressBatchSize();
243        setBackoffState(reopenBatchBackoffMillis);
244        throw new ProcedureSuspendedException();
245      } else {
246        return Flow.HAS_MORE_STATE;
247      }
248    }
249
250    // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
251    // again.
252    if (retryCounter == null) {
253      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
254    }
255    long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
256    LOG.info(
257      "There are still {} region(s) which need to be reopened for table {}. {} are in "
258        + "OPENING state, suspend {}secs and try again later",
259      regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
260    setBackoffState(backoffMillis);
261    throw new ProcedureSuspendedException();
262  }
263
264  private void setBackoffState(long millis) {
265    setTimeout(Math.toIntExact(millis));
266    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
267    skipPersistence();
268  }
269
270  private List<HRegionLocation>
271    getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) {
272
273    List<HRegionLocation> regionsToReopen = new ArrayList<>();
274    if (
275      CollectionUtils.isNotEmpty(regionNames) && CollectionUtils.isNotEmpty(tableRegionsForReopen)
276    ) {
277      for (byte[] regionName : regionNames) {
278        for (HRegionLocation hRegionLocation : tableRegionsForReopen) {
279          if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) {
280            regionsToReopen.add(hRegionLocation);
281            break;
282          }
283        }
284      }
285    } else {
286      regionsToReopen = tableRegionsForReopen;
287    }
288    return regionsToReopen;
289  }
290
291  /**
292   * At end of timeout, wake ourselves up so we run again.
293   */
294  @Override
295  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
296    setState(ProcedureProtos.ProcedureState.RUNNABLE);
297    env.getProcedureScheduler().addFront(this);
298    return false; // 'false' means that this procedure handled the timeout
299  }
300
301  @Override
302  protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
303    throws IOException, InterruptedException {
304    throw new UnsupportedOperationException("unhandled state=" + state);
305  }
306
307  @Override
308  protected ReopenTableRegionsState getState(int stateId) {
309    return ReopenTableRegionsState.forNumber(stateId);
310  }
311
312  @Override
313  protected int getStateId(ReopenTableRegionsState state) {
314    return state.getNumber();
315  }
316
317  @Override
318  protected ReopenTableRegionsState getInitialState() {
319    return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS;
320  }
321
322  @Override
323  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
324    super.serializeStateData(serializer);
325    ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder()
326      .setTableName(ProtobufUtil.toProtoTableName(tableName));
327    regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion);
328    if (CollectionUtils.isNotEmpty(regionNames)) {
329      // As of this writing, wrapping this statement withing if condition is only required
330      // for backward compatibility as we used to have 'regionNames' as null for cases
331      // where all regions of given table should be reopened. Now, we have kept emptyList()
332      // for 'regionNames' to indicate all regions of given table should be reopened unless
333      // 'regionNames' contains at least one specific region, in which case only list of regions
334      // that 'regionNames' contain should be reopened, not all regions of given table.
335      // Now, we don't need this check since we are not dealing with null 'regionNames' and hence,
336      // guarding by this if condition can be removed in HBase 4.0.0.
337      regionNames.stream().map(ByteString::copyFrom).forEachOrdered(builder::addRegionNames);
338    }
339    serializer.serialize(builder.build());
340  }
341
342  @Override
343  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
344    super.deserializeStateData(serializer);
345    ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class);
346    tableName = ProtobufUtil.toTableName(data.getTableName());
347    regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation)
348      .collect(Collectors.toList());
349    if (CollectionUtils.isNotEmpty(data.getRegionNamesList())) {
350      regionNames = data.getRegionNamesList().stream().map(ByteString::toByteArray)
351        .collect(Collectors.toList());
352    } else {
353      regionNames = Collections.emptyList();
354    }
355  }
356}