001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
053
054/**
055 * Used for reopening the regions for a table.
056 */
057@InterfaceAudience.Private
058public class ReopenTableRegionsProcedure
059  extends AbstractStateMachineTableProcedure<ReopenTableRegionsState> {
060
061  private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);
062
063  public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
064    ConfigKey.LONG("hbase.reopen.table.regions.progressive.batch.backoff.ms");
065  public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
066  public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
067    ConfigKey.INT("hbase.reopen.table.regions.progressive.batch.size.max");
068  public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
069
070  // this minimum prevents a max which would break this procedure
071  private static final int MINIMUM_BATCH_SIZE_MAX = 1;
072
073  private TableName tableName;
074
075  // Specify specific regions of a table to reopen.
076  // if specified null, all regions of the table will be reopened.
077  private List<byte[]> regionNames;
078
079  private List<HRegionLocation> regions = Collections.emptyList();
080
081  private List<HRegionLocation> currentRegionBatch = Collections.emptyList();
082
083  private RetryCounter retryCounter;
084
085  private long reopenBatchBackoffMillis;
086  private int reopenBatchSize;
087  private int reopenBatchSizeMax;
088  private long regionsReopened = 0;
089  private long batchesProcessed = 0;
090
091  /**
092   * Create a new ReopenTableRegionsProcedure respecting the throttling configuration for the table.
093   * First check the table descriptor, then fall back to the global configuration. Only used in
094   * ModifyTableProcedure and in {@link HMaster#reopenRegionsThrottled}.
095   */
096  public static ReopenTableRegionsProcedure throttled(final Configuration conf,
097    final TableDescriptor desc) {
098    long backoffMillis = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY))
099      .map(Long::parseLong).orElseGet(() -> conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
100        PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT));
101    int batchSizeMax = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_SIZE_MAX_KEY))
102      .map(Integer::parseInt).orElseGet(
103        () -> conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED));
104
105    return new ReopenTableRegionsProcedure(desc.getTableName(), backoffMillis, batchSizeMax);
106  }
107
108  /**
109   * Create a new ReopenTableRegionsProcedure for specific regions, respecting the throttling
110   * configuration for the table. First check the table descriptor, then fall back to the global
111   * configuration. Only used in {@link HMaster#reopenRegionsThrottled}.
112   */
113  public static ReopenTableRegionsProcedure throttled(final Configuration conf,
114    final TableDescriptor desc, final List<byte[]> regionNames) {
115    long backoffMillis = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY))
116      .map(Long::parseLong).orElseGet(() -> conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
117        PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT));
118    int batchSizeMax = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_SIZE_MAX_KEY))
119      .map(Integer::parseInt).orElseGet(
120        () -> conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED));
121
122    return new ReopenTableRegionsProcedure(desc.getTableName(), regionNames, backoffMillis,
123      batchSizeMax);
124  }
125
126  public ReopenTableRegionsProcedure() {
127    this(null);
128  }
129
130  public ReopenTableRegionsProcedure(TableName tableName) {
131    this(tableName, Collections.emptyList());
132  }
133
134  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
135    this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
136      PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
137  }
138
139  /**
140   * Visible for testing purposes - prefer the above methods to construct
141   */
142  public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
143    int reopenBatchSizeMax) {
144    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
145  }
146
147  /**
148   * Visible for testing purposes - prefer the above methods to construct
149   */
150  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
151    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
152    this.tableName = tableName;
153    this.regionNames = regionNames;
154    this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
155    if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
156      this.reopenBatchSize = Integer.MAX_VALUE;
157      this.reopenBatchSizeMax = Integer.MAX_VALUE;
158    } else {
159      this.reopenBatchSize = 1;
160      this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
161    }
162  }
163
164  @Override
165  public TableName getTableName() {
166    return tableName;
167  }
168
169  @Override
170  public TableOperationType getTableOperationType() {
171    return TableOperationType.REGION_EDIT;
172  }
173
174  @RestrictedApi(explanation = "Should only be called in tests", link = "",
175      allowedOnPath = ".*/src/test/.*")
176  public long getRegionsReopened() {
177    return regionsReopened;
178  }
179
180  @RestrictedApi(explanation = "Should only be called in tests", link = "",
181      allowedOnPath = ".*/src/test/.*")
182  public long getBatchesProcessed() {
183    return batchesProcessed;
184  }
185
186  @RestrictedApi(explanation = "Should only be called in tests", link = "",
187      allowedOnPath = ".*/src/test/.*")
188  long getReopenBatchBackoffMillis() {
189    return reopenBatchBackoffMillis;
190  }
191
192  @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
193      allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
194  protected int progressBatchSize() {
195    int previousBatchSize = reopenBatchSize;
196    reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
197    if (reopenBatchSize < previousBatchSize) {
198      // the batch size should never decrease. this must be overflow, so just use max
199      reopenBatchSize = reopenBatchSizeMax;
200    }
201    return reopenBatchSize;
202  }
203
204  private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
205    if (loc.getSeqNum() < 0) {
206      return false;
207    }
208    RegionStateNode regionNode =
209      env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
210    // If the region node is null, then at least in the next round we can remove this region to make
211    // progress. And the second condition is a normal one, if there are no TRSP with it then we can
212    // schedule one to make progress.
213    return regionNode == null || !regionNode.isTransitionScheduled();
214  }
215
216  @Override
217  protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state)
218    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
219    try {
220      switch (state) {
221        case REOPEN_TABLE_REGIONS_GET_REGIONS:
222          if (!isTableEnabled(env)) {
223            LOG.info("Table {} is disabled, give up reopening its regions", tableName);
224            return Flow.NO_MORE_STATE;
225          }
226          List<HRegionLocation> tableRegions =
227            env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
228          regions = getRegionLocationsForReopen(tableRegions);
229          setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
230          return Flow.HAS_MORE_STATE;
231        case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
232          // if we didn't finish reopening the last batch yet, let's keep trying until we do.
233          // at that point, the batch will be empty and we can generate a new batch
234          if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
235            currentRegionBatch =
236              regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
237            batchesProcessed++;
238          }
239          for (HRegionLocation loc : currentRegionBatch) {
240            RegionStateNode regionNode =
241              env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
242            // this possible, maybe the region has already been merged or split, see HBASE-20921
243            if (regionNode == null) {
244              continue;
245            }
246            TransitRegionStateProcedure proc;
247            regionNode.lock();
248            try {
249              if (regionNode.getProcedure() != null) {
250                continue;
251              }
252              proc = TransitRegionStateProcedure.reopen(env, regionNode.getRegionInfo());
253              regionNode.setProcedure(proc);
254            } finally {
255              regionNode.unlock();
256            }
257            addChildProcedure(proc);
258            regionsReopened++;
259          }
260          setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
261          return Flow.HAS_MORE_STATE;
262        case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
263          // update region lists based on what's been reopened
264          regions = filterReopened(env, regions);
265          currentRegionBatch = filterReopened(env, currentRegionBatch);
266
267          // existing batch didn't fully reopen, so try to resolve that first.
268          // since this is a retry, don't do the batch backoff
269          if (!currentRegionBatch.isEmpty()) {
270            return reopenIfSchedulable(env, currentRegionBatch, false);
271          }
272
273          if (regions.isEmpty()) {
274            return Flow.NO_MORE_STATE;
275          }
276
277          // current batch is finished, schedule more regions
278          return reopenIfSchedulable(env, regions, true);
279        default:
280          throw new UnsupportedOperationException("unhandled state=" + state);
281      }
282    } catch (IOException e) {
283      if (isRollbackSupported(state) || e instanceof DoNotRetryIOException) {
284        setFailure("master-reopen-table-regions", e);
285      } else {
286        LOG.warn("Retriable error trying to reopen regions for table={} (in state={})", tableName,
287          state, e);
288      }
289    }
290    return Flow.HAS_MORE_STATE;
291  }
292
293  private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
294    List<HRegionLocation> regionsToCheck) {
295    return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
296      .filter(l -> l != null).collect(Collectors.toList());
297  }
298
299  private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
300    boolean shouldBatchBackoff) throws ProcedureSuspendedException {
301    if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
302      retryCounter = null;
303      setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
304      if (shouldBatchBackoff) {
305        progressBatchSize();
306      }
307      if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
308        setBackoffState(reopenBatchBackoffMillis);
309        throw new ProcedureSuspendedException();
310      } else {
311        return Flow.HAS_MORE_STATE;
312      }
313    }
314
315    // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
316    // again.
317    if (retryCounter == null) {
318      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
319    }
320    long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
321    LOG.info(
322      "There are still {} region(s) which need to be reopened for table {}. {} are in "
323        + "OPENING state, suspend {}secs and try again later",
324      regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
325    setBackoffState(backoffMillis);
326    throw new ProcedureSuspendedException();
327  }
328
329  private void setBackoffState(long millis) {
330    setTimeout(Math.toIntExact(millis));
331    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
332    skipPersistence();
333  }
334
335  private List<HRegionLocation>
336    getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) throws IOException {
337
338    List<HRegionLocation> regionsToReopen = new ArrayList<>();
339    if (
340      CollectionUtils.isNotEmpty(regionNames) && CollectionUtils.isNotEmpty(tableRegionsForReopen)
341    ) {
342      List<byte[]> notFoundRegions = new ArrayList<>();
343
344      for (byte[] regionName : regionNames) {
345        boolean found = false;
346        for (HRegionLocation hRegionLocation : tableRegionsForReopen) {
347          if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) {
348            regionsToReopen.add(hRegionLocation);
349            found = true;
350            break;
351          }
352        }
353        if (!found) {
354          notFoundRegions.add(regionName);
355        }
356      }
357
358      if (!notFoundRegions.isEmpty()) {
359        String regionNamesStr =
360          notFoundRegions.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", "));
361        throw new UnknownRegionException(
362          "The following regions do not belong to table " + tableName + ": " + regionNamesStr);
363      }
364    } else {
365      regionsToReopen = tableRegionsForReopen;
366    }
367    return regionsToReopen;
368  }
369
370  /**
371   * At end of timeout, wake ourselves up so we run again.
372   */
373  @Override
374  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
375    setState(ProcedureProtos.ProcedureState.RUNNABLE);
376    env.getProcedureScheduler().addFront(this);
377    return false; // 'false' means that this procedure handled the timeout
378  }
379
380  @Override
381  protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
382    throws IOException, InterruptedException {
383    throw new UnsupportedOperationException("unhandled state=" + state);
384  }
385
386  @Override
387  protected ReopenTableRegionsState getState(int stateId) {
388    return ReopenTableRegionsState.forNumber(stateId);
389  }
390
391  @Override
392  protected int getStateId(ReopenTableRegionsState state) {
393    return state.getNumber();
394  }
395
396  @Override
397  protected ReopenTableRegionsState getInitialState() {
398    return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS;
399  }
400
401  @Override
402  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
403    super.serializeStateData(serializer);
404    ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder()
405      .setTableName(ProtobufUtil.toProtoTableName(tableName));
406    regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion);
407    if (CollectionUtils.isNotEmpty(regionNames)) {
408      // As of this writing, wrapping this statement withing if condition is only required
409      // for backward compatibility as we used to have 'regionNames' as null for cases
410      // where all regions of given table should be reopened. Now, we have kept emptyList()
411      // for 'regionNames' to indicate all regions of given table should be reopened unless
412      // 'regionNames' contains at least one specific region, in which case only list of regions
413      // that 'regionNames' contain should be reopened, not all regions of given table.
414      // Now, we don't need this check since we are not dealing with null 'regionNames' and hence,
415      // guarding by this if condition can be removed in HBase 4.0.0.
416      regionNames.stream().map(ByteString::copyFrom).forEachOrdered(builder::addRegionNames);
417    }
418    serializer.serialize(builder.build());
419  }
420
421  @Override
422  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
423    super.deserializeStateData(serializer);
424    ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class);
425    tableName = ProtobufUtil.toTableName(data.getTableName());
426    regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation)
427      .collect(Collectors.toList());
428    if (CollectionUtils.isNotEmpty(data.getRegionNamesList())) {
429      regionNames = data.getRegionNamesList().stream().map(ByteString::toByteArray)
430        .collect(Collectors.toList());
431    } else {
432      regionNames = Collections.emptyList();
433    }
434  }
435}