001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.Optional;
026import java.util.stream.Collectors;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HRegionLocation;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.TableDescriptor;
031import org.apache.hadoop.hbase.conf.ConfigKey;
032import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
033import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
036import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.RetryCounter;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
045import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
051
052/**
053 * Used for reopening the regions for a table.
054 */
055@InterfaceAudience.Private
056public class ReopenTableRegionsProcedure
057  extends AbstractStateMachineTableProcedure<ReopenTableRegionsState> {
058
059  private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);
060
061  public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
062    ConfigKey.LONG("hbase.reopen.table.regions.progressive.batch.backoff.ms");
063  public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
064  public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
065    ConfigKey.INT("hbase.reopen.table.regions.progressive.batch.size.max");
066  public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
067
068  // this minimum prevents a max which would break this procedure
069  private static final int MINIMUM_BATCH_SIZE_MAX = 1;
070
071  private TableName tableName;
072
073  // Specify specific regions of a table to reopen.
074  // if specified null, all regions of the table will be reopened.
075  private List<byte[]> regionNames;
076
077  private List<HRegionLocation> regions = Collections.emptyList();
078
079  private List<HRegionLocation> currentRegionBatch = Collections.emptyList();
080
081  private RetryCounter retryCounter;
082
083  private long reopenBatchBackoffMillis;
084  private int reopenBatchSize;
085  private int reopenBatchSizeMax;
086  private long regionsReopened = 0;
087  private long batchesProcessed = 0;
088
089  /**
090   * Create a new ReopenTableRegionsProcedure respecting the throttling configuration for the table.
091   * First check the table descriptor, then fall back to the global configuration. Only used in
092   * ModifyTableProcedure.
093   */
094  public static ReopenTableRegionsProcedure throttled(final Configuration conf,
095    final TableDescriptor desc) {
096    long backoffMillis = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY))
097      .map(Long::parseLong).orElseGet(() -> conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
098        PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT));
099    int batchSizeMax = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_SIZE_MAX_KEY))
100      .map(Integer::parseInt).orElseGet(
101        () -> conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED));
102
103    return new ReopenTableRegionsProcedure(desc.getTableName(), backoffMillis, batchSizeMax);
104  }
105
106  public ReopenTableRegionsProcedure() {
107    this(null);
108  }
109
110  public ReopenTableRegionsProcedure(TableName tableName) {
111    this(tableName, Collections.emptyList());
112  }
113
114  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
115    this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
116      PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
117  }
118
119  ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
120    int reopenBatchSizeMax) {
121    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
122  }
123
124  private ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
125    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
126    this.tableName = tableName;
127    this.regionNames = regionNames;
128    this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
129    if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
130      this.reopenBatchSize = Integer.MAX_VALUE;
131      this.reopenBatchSizeMax = Integer.MAX_VALUE;
132    } else {
133      this.reopenBatchSize = 1;
134      this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
135    }
136  }
137
138  @Override
139  public TableName getTableName() {
140    return tableName;
141  }
142
143  @Override
144  public TableOperationType getTableOperationType() {
145    return TableOperationType.REGION_EDIT;
146  }
147
148  @RestrictedApi(explanation = "Should only be called in tests", link = "",
149      allowedOnPath = ".*/src/test/.*")
150  public long getRegionsReopened() {
151    return regionsReopened;
152  }
153
154  @RestrictedApi(explanation = "Should only be called in tests", link = "",
155      allowedOnPath = ".*/src/test/.*")
156  public long getBatchesProcessed() {
157    return batchesProcessed;
158  }
159
160  @RestrictedApi(explanation = "Should only be called in tests", link = "",
161      allowedOnPath = ".*/src/test/.*")
162  long getReopenBatchBackoffMillis() {
163    return reopenBatchBackoffMillis;
164  }
165
166  @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
167      allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
168  protected int progressBatchSize() {
169    int previousBatchSize = reopenBatchSize;
170    reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
171    if (reopenBatchSize < previousBatchSize) {
172      // the batch size should never decrease. this must be overflow, so just use max
173      reopenBatchSize = reopenBatchSizeMax;
174    }
175    return reopenBatchSize;
176  }
177
178  private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
179    if (loc.getSeqNum() < 0) {
180      return false;
181    }
182    RegionStateNode regionNode =
183      env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
184    // If the region node is null, then at least in the next round we can remove this region to make
185    // progress. And the second condition is a normal one, if there are no TRSP with it then we can
186    // schedule one to make progress.
187    return regionNode == null || !regionNode.isInTransition();
188  }
189
190  @Override
191  protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state)
192    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
193    switch (state) {
194      case REOPEN_TABLE_REGIONS_GET_REGIONS:
195        if (!isTableEnabled(env)) {
196          LOG.info("Table {} is disabled, give up reopening its regions", tableName);
197          return Flow.NO_MORE_STATE;
198        }
199        List<HRegionLocation> tableRegions =
200          env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
201        regions = getRegionLocationsForReopen(tableRegions);
202        setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
203        return Flow.HAS_MORE_STATE;
204      case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
205        // if we didn't finish reopening the last batch yet, let's keep trying until we do.
206        // at that point, the batch will be empty and we can generate a new batch
207        if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
208          currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
209          batchesProcessed++;
210        }
211        for (HRegionLocation loc : currentRegionBatch) {
212          RegionStateNode regionNode =
213            env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
214          // this possible, maybe the region has already been merged or split, see HBASE-20921
215          if (regionNode == null) {
216            continue;
217          }
218          TransitRegionStateProcedure proc;
219          regionNode.lock();
220          try {
221            if (regionNode.getProcedure() != null) {
222              continue;
223            }
224            proc = TransitRegionStateProcedure.reopen(env, regionNode.getRegionInfo());
225            regionNode.setProcedure(proc);
226          } finally {
227            regionNode.unlock();
228          }
229          addChildProcedure(proc);
230          regionsReopened++;
231        }
232        setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
233        return Flow.HAS_MORE_STATE;
234      case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
235        // update region lists based on what's been reopened
236        regions = filterReopened(env, regions);
237        currentRegionBatch = filterReopened(env, currentRegionBatch);
238
239        // existing batch didn't fully reopen, so try to resolve that first.
240        // since this is a retry, don't do the batch backoff
241        if (!currentRegionBatch.isEmpty()) {
242          return reopenIfSchedulable(env, currentRegionBatch, false);
243        }
244
245        if (regions.isEmpty()) {
246          return Flow.NO_MORE_STATE;
247        }
248
249        // current batch is finished, schedule more regions
250        return reopenIfSchedulable(env, regions, true);
251      default:
252        throw new UnsupportedOperationException("unhandled state=" + state);
253    }
254  }
255
256  private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
257    List<HRegionLocation> regionsToCheck) {
258    return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
259      .filter(l -> l != null).collect(Collectors.toList());
260  }
261
262  private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
263    boolean shouldBatchBackoff) throws ProcedureSuspendedException {
264    if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
265      retryCounter = null;
266      setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
267      if (shouldBatchBackoff) {
268        progressBatchSize();
269      }
270      if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
271        setBackoffState(reopenBatchBackoffMillis);
272        throw new ProcedureSuspendedException();
273      } else {
274        return Flow.HAS_MORE_STATE;
275      }
276    }
277
278    // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
279    // again.
280    if (retryCounter == null) {
281      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
282    }
283    long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
284    LOG.info(
285      "There are still {} region(s) which need to be reopened for table {}. {} are in "
286        + "OPENING state, suspend {}secs and try again later",
287      regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
288    setBackoffState(backoffMillis);
289    throw new ProcedureSuspendedException();
290  }
291
292  private void setBackoffState(long millis) {
293    setTimeout(Math.toIntExact(millis));
294    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
295    skipPersistence();
296  }
297
298  private List<HRegionLocation>
299    getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) {
300
301    List<HRegionLocation> regionsToReopen = new ArrayList<>();
302    if (
303      CollectionUtils.isNotEmpty(regionNames) && CollectionUtils.isNotEmpty(tableRegionsForReopen)
304    ) {
305      for (byte[] regionName : regionNames) {
306        for (HRegionLocation hRegionLocation : tableRegionsForReopen) {
307          if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) {
308            regionsToReopen.add(hRegionLocation);
309            break;
310          }
311        }
312      }
313    } else {
314      regionsToReopen = tableRegionsForReopen;
315    }
316    return regionsToReopen;
317  }
318
319  /**
320   * At end of timeout, wake ourselves up so we run again.
321   */
322  @Override
323  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
324    setState(ProcedureProtos.ProcedureState.RUNNABLE);
325    env.getProcedureScheduler().addFront(this);
326    return false; // 'false' means that this procedure handled the timeout
327  }
328
329  @Override
330  protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
331    throws IOException, InterruptedException {
332    throw new UnsupportedOperationException("unhandled state=" + state);
333  }
334
335  @Override
336  protected ReopenTableRegionsState getState(int stateId) {
337    return ReopenTableRegionsState.forNumber(stateId);
338  }
339
340  @Override
341  protected int getStateId(ReopenTableRegionsState state) {
342    return state.getNumber();
343  }
344
345  @Override
346  protected ReopenTableRegionsState getInitialState() {
347    return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS;
348  }
349
350  @Override
351  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
352    super.serializeStateData(serializer);
353    ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder()
354      .setTableName(ProtobufUtil.toProtoTableName(tableName));
355    regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion);
356    if (CollectionUtils.isNotEmpty(regionNames)) {
357      // As of this writing, wrapping this statement withing if condition is only required
358      // for backward compatibility as we used to have 'regionNames' as null for cases
359      // where all regions of given table should be reopened. Now, we have kept emptyList()
360      // for 'regionNames' to indicate all regions of given table should be reopened unless
361      // 'regionNames' contains at least one specific region, in which case only list of regions
362      // that 'regionNames' contain should be reopened, not all regions of given table.
363      // Now, we don't need this check since we are not dealing with null 'regionNames' and hence,
364      // guarding by this if condition can be removed in HBase 4.0.0.
365      regionNames.stream().map(ByteString::copyFrom).forEachOrdered(builder::addRegionNames);
366    }
367    serializer.serialize(builder.build());
368  }
369
370  @Override
371  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
372    super.deserializeStateData(serializer);
373    ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class);
374    tableName = ProtobufUtil.toTableName(data.getTableName());
375    regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation)
376      .collect(Collectors.toList());
377    if (CollectionUtils.isNotEmpty(data.getRegionNamesList())) {
378      regionNames = data.getRegionNamesList().stream().map(ByteString::toByteArray)
379        .collect(Collectors.toList());
380    } else {
381      regionNames = Collections.emptyList();
382    }
383  }
384}