001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.Optional; 026import java.util.stream.Collectors; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HRegionLocation; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.TableDescriptor; 031import org.apache.hadoop.hbase.conf.ConfigKey; 032import org.apache.hadoop.hbase.master.assignment.RegionStateNode; 033import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 036import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.RetryCounter; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 045import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 046 047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 051 052/** 053 * Used for reopening the regions for a table. 054 */ 055@InterfaceAudience.Private 056public class ReopenTableRegionsProcedure 057 extends AbstractStateMachineTableProcedure<ReopenTableRegionsState> { 058 059 private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class); 060 061 public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY = 062 ConfigKey.LONG("hbase.reopen.table.regions.progressive.batch.backoff.ms"); 063 public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L; 064 public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY = 065 ConfigKey.INT("hbase.reopen.table.regions.progressive.batch.size.max"); 066 public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1; 067 068 // this minimum prevents a max which would break this procedure 069 private static final int MINIMUM_BATCH_SIZE_MAX = 1; 070 071 private TableName tableName; 072 073 // Specify specific regions of a table to reopen. 074 // if specified null, all regions of the table will be reopened. 075 private List<byte[]> regionNames; 076 077 private List<HRegionLocation> regions = Collections.emptyList(); 078 079 private List<HRegionLocation> currentRegionBatch = Collections.emptyList(); 080 081 private RetryCounter retryCounter; 082 083 private long reopenBatchBackoffMillis; 084 private int reopenBatchSize; 085 private int reopenBatchSizeMax; 086 private long regionsReopened = 0; 087 private long batchesProcessed = 0; 088 089 /** 090 * Create a new ReopenTableRegionsProcedure respecting the throttling configuration for the table. 091 * First check the table descriptor, then fall back to the global configuration. Only used in 092 * ModifyTableProcedure. 093 */ 094 public static ReopenTableRegionsProcedure throttled(final Configuration conf, 095 final TableDescriptor desc) { 096 long backoffMillis = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY)) 097 .map(Long::parseLong).orElseGet(() -> conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY, 098 PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT)); 099 int batchSizeMax = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_SIZE_MAX_KEY)) 100 .map(Integer::parseInt).orElseGet( 101 () -> conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED)); 102 103 return new ReopenTableRegionsProcedure(desc.getTableName(), backoffMillis, batchSizeMax); 104 } 105 106 public ReopenTableRegionsProcedure() { 107 this(null); 108 } 109 110 public ReopenTableRegionsProcedure(TableName tableName) { 111 this(tableName, Collections.emptyList()); 112 } 113 114 public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) { 115 this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT, 116 PROGRESSIVE_BATCH_SIZE_MAX_DISABLED); 117 } 118 119 ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis, 120 int reopenBatchSizeMax) { 121 this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax); 122 } 123 124 private ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames, 125 long reopenBatchBackoffMillis, int reopenBatchSizeMax) { 126 this.tableName = tableName; 127 this.regionNames = regionNames; 128 this.reopenBatchBackoffMillis = reopenBatchBackoffMillis; 129 if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) { 130 this.reopenBatchSize = Integer.MAX_VALUE; 131 this.reopenBatchSizeMax = Integer.MAX_VALUE; 132 } else { 133 this.reopenBatchSize = 1; 134 this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX); 135 } 136 } 137 138 @Override 139 public TableName getTableName() { 140 return tableName; 141 } 142 143 @Override 144 public TableOperationType getTableOperationType() { 145 return TableOperationType.REGION_EDIT; 146 } 147 148 @RestrictedApi(explanation = "Should only be called in tests", link = "", 149 allowedOnPath = ".*/src/test/.*") 150 public long getRegionsReopened() { 151 return regionsReopened; 152 } 153 154 @RestrictedApi(explanation = "Should only be called in tests", link = "", 155 allowedOnPath = ".*/src/test/.*") 156 public long getBatchesProcessed() { 157 return batchesProcessed; 158 } 159 160 @RestrictedApi(explanation = "Should only be called in tests", link = "", 161 allowedOnPath = ".*/src/test/.*") 162 long getReopenBatchBackoffMillis() { 163 return reopenBatchBackoffMillis; 164 } 165 166 @RestrictedApi(explanation = "Should only be called internally or in tests", link = "", 167 allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java") 168 protected int progressBatchSize() { 169 int previousBatchSize = reopenBatchSize; 170 reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize); 171 if (reopenBatchSize < previousBatchSize) { 172 // the batch size should never decrease. this must be overflow, so just use max 173 reopenBatchSize = reopenBatchSizeMax; 174 } 175 return reopenBatchSize; 176 } 177 178 private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) { 179 if (loc.getSeqNum() < 0) { 180 return false; 181 } 182 RegionStateNode regionNode = 183 env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion()); 184 // If the region node is null, then at least in the next round we can remove this region to make 185 // progress. And the second condition is a normal one, if there are no TRSP with it then we can 186 // schedule one to make progress. 187 return regionNode == null || !regionNode.isInTransition(); 188 } 189 190 @Override 191 protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state) 192 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 193 switch (state) { 194 case REOPEN_TABLE_REGIONS_GET_REGIONS: 195 if (!isTableEnabled(env)) { 196 LOG.info("Table {} is disabled, give up reopening its regions", tableName); 197 return Flow.NO_MORE_STATE; 198 } 199 List<HRegionLocation> tableRegions = 200 env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName); 201 regions = getRegionLocationsForReopen(tableRegions); 202 setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); 203 return Flow.HAS_MORE_STATE; 204 case REOPEN_TABLE_REGIONS_REOPEN_REGIONS: 205 // if we didn't finish reopening the last batch yet, let's keep trying until we do. 206 // at that point, the batch will be empty and we can generate a new batch 207 if (!regions.isEmpty() && currentRegionBatch.isEmpty()) { 208 currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList()); 209 batchesProcessed++; 210 } 211 for (HRegionLocation loc : currentRegionBatch) { 212 RegionStateNode regionNode = 213 env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion()); 214 // this possible, maybe the region has already been merged or split, see HBASE-20921 215 if (regionNode == null) { 216 continue; 217 } 218 TransitRegionStateProcedure proc; 219 regionNode.lock(); 220 try { 221 if (regionNode.getProcedure() != null) { 222 continue; 223 } 224 proc = TransitRegionStateProcedure.reopen(env, regionNode.getRegionInfo()); 225 regionNode.setProcedure(proc); 226 } finally { 227 regionNode.unlock(); 228 } 229 addChildProcedure(proc); 230 regionsReopened++; 231 } 232 setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED); 233 return Flow.HAS_MORE_STATE; 234 case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED: 235 // update region lists based on what's been reopened 236 regions = filterReopened(env, regions); 237 currentRegionBatch = filterReopened(env, currentRegionBatch); 238 239 // existing batch didn't fully reopen, so try to resolve that first. 240 // since this is a retry, don't do the batch backoff 241 if (!currentRegionBatch.isEmpty()) { 242 return reopenIfSchedulable(env, currentRegionBatch, false); 243 } 244 245 if (regions.isEmpty()) { 246 return Flow.NO_MORE_STATE; 247 } 248 249 // current batch is finished, schedule more regions 250 return reopenIfSchedulable(env, regions, true); 251 default: 252 throw new UnsupportedOperationException("unhandled state=" + state); 253 } 254 } 255 256 private List<HRegionLocation> filterReopened(MasterProcedureEnv env, 257 List<HRegionLocation> regionsToCheck) { 258 return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened) 259 .filter(l -> l != null).collect(Collectors.toList()); 260 } 261 262 private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen, 263 boolean shouldBatchBackoff) throws ProcedureSuspendedException { 264 if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) { 265 retryCounter = null; 266 setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); 267 if (shouldBatchBackoff) { 268 progressBatchSize(); 269 } 270 if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) { 271 setBackoffState(reopenBatchBackoffMillis); 272 throw new ProcedureSuspendedException(); 273 } else { 274 return Flow.HAS_MORE_STATE; 275 } 276 } 277 278 // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry 279 // again. 280 if (retryCounter == null) { 281 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 282 } 283 long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts(); 284 LOG.info( 285 "There are still {} region(s) which need to be reopened for table {}. {} are in " 286 + "OPENING state, suspend {}secs and try again later", 287 regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000); 288 setBackoffState(backoffMillis); 289 throw new ProcedureSuspendedException(); 290 } 291 292 private void setBackoffState(long millis) { 293 setTimeout(Math.toIntExact(millis)); 294 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 295 skipPersistence(); 296 } 297 298 private List<HRegionLocation> 299 getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) { 300 301 List<HRegionLocation> regionsToReopen = new ArrayList<>(); 302 if ( 303 CollectionUtils.isNotEmpty(regionNames) && CollectionUtils.isNotEmpty(tableRegionsForReopen) 304 ) { 305 for (byte[] regionName : regionNames) { 306 for (HRegionLocation hRegionLocation : tableRegionsForReopen) { 307 if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) { 308 regionsToReopen.add(hRegionLocation); 309 break; 310 } 311 } 312 } 313 } else { 314 regionsToReopen = tableRegionsForReopen; 315 } 316 return regionsToReopen; 317 } 318 319 /** 320 * At end of timeout, wake ourselves up so we run again. 321 */ 322 @Override 323 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 324 setState(ProcedureProtos.ProcedureState.RUNNABLE); 325 env.getProcedureScheduler().addFront(this); 326 return false; // 'false' means that this procedure handled the timeout 327 } 328 329 @Override 330 protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state) 331 throws IOException, InterruptedException { 332 throw new UnsupportedOperationException("unhandled state=" + state); 333 } 334 335 @Override 336 protected ReopenTableRegionsState getState(int stateId) { 337 return ReopenTableRegionsState.forNumber(stateId); 338 } 339 340 @Override 341 protected int getStateId(ReopenTableRegionsState state) { 342 return state.getNumber(); 343 } 344 345 @Override 346 protected ReopenTableRegionsState getInitialState() { 347 return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS; 348 } 349 350 @Override 351 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 352 super.serializeStateData(serializer); 353 ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder() 354 .setTableName(ProtobufUtil.toProtoTableName(tableName)); 355 regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion); 356 if (CollectionUtils.isNotEmpty(regionNames)) { 357 // As of this writing, wrapping this statement withing if condition is only required 358 // for backward compatibility as we used to have 'regionNames' as null for cases 359 // where all regions of given table should be reopened. Now, we have kept emptyList() 360 // for 'regionNames' to indicate all regions of given table should be reopened unless 361 // 'regionNames' contains at least one specific region, in which case only list of regions 362 // that 'regionNames' contain should be reopened, not all regions of given table. 363 // Now, we don't need this check since we are not dealing with null 'regionNames' and hence, 364 // guarding by this if condition can be removed in HBase 4.0.0. 365 regionNames.stream().map(ByteString::copyFrom).forEachOrdered(builder::addRegionNames); 366 } 367 serializer.serialize(builder.build()); 368 } 369 370 @Override 371 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 372 super.deserializeStateData(serializer); 373 ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class); 374 tableName = ProtobufUtil.toTableName(data.getTableName()); 375 regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation) 376 .collect(Collectors.toList()); 377 if (CollectionUtils.isNotEmpty(data.getRegionNamesList())) { 378 regionNames = data.getRegionNamesList().stream().map(ByteString::toByteArray) 379 .collect(Collectors.toList()); 380 } else { 381 regionNames = Collections.emptyList(); 382 } 383 } 384}