/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigKey;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Used for reopening the regions for a table.
 * <p>
 * The procedure cycles through three states:
 * <ol>
 * <li>{@code REOPEN_TABLE_REGIONS_GET_REGIONS}: collect the region locations to reopen (all
 * regions of the table, or only the ones named at construction time).</li>
 * <li>{@code REOPEN_TABLE_REGIONS_REOPEN_REGIONS}: pick a batch of at most {@code reopenBatchSize}
 * regions and add a reopen {@link TransitRegionStateProcedure} child for each region of the batch
 * that has no procedure attached yet.</li>
 * <li>{@code REOPEN_TABLE_REGIONS_CONFIRM_REOPENED}: drop the regions that no longer need a reopen
 * and either retry the unfinished batch, move on to the next batch, or finish.</li>
 * </ol>
 * When a maximum batch size is configured, the batch size starts at 1 and doubles after every
 * completed batch until it reaches the maximum; otherwise a batch may contain every region.
 * <p>
 * Note that {@link #serializeStateData} in this class only writes the table name, the pending
 * region locations and the requested region names; the batch settings and the counters are not
 * written by it.
 */
@InterfaceAudience.Private
public class ReopenTableRegionsProcedure
  extends AbstractStateMachineTableProcedure<ReopenTableRegionsState> {

  private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);

  public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
    ConfigKey.LONG("hbase.reopen.table.regions.progressive.batch.backoff.ms");
  public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
  public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
    ConfigKey.INT("hbase.reopen.table.regions.progressive.batch.size.max");
  public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;

  // this minimum prevents a max which would break this procedure
  private static final int MINIMUM_BATCH_SIZE_MAX = 1;

  private TableName tableName;

  // The specific regions of the table to reopen.
  // If the list is empty, all regions of the table will be reopened.
  private List<byte[]> regionNames;

  // Regions which still need to be reopened.
  private List<HRegionLocation> regions = Collections.emptyList();

  // The regions of the batch currently being reopened; empty when a new batch may be started.
  private List<HRegionLocation> currentRegionBatch = Collections.emptyList();

  // Backoff for the case where none of the pending regions can be scheduled; reset to null as
  // soon as at least one region becomes schedulable again.
  private RetryCounter retryCounter;

  private long reopenBatchBackoffMillis;
  private int reopenBatchSize;
  private int reopenBatchSizeMax;
  private long regionsReopened = 0;
  private long batchesProcessed = 0;

  /**
   * Create a new ReopenTableRegionsProcedure respecting the throttling configuration for the table.
   * First check the table descriptor, then fall back to the global configuration. Only used in
   * ModifyTableProcedure and in {@code HMaster#reopenRegionsThrottled}.
   * @param conf the configuration to fall back to when the descriptor carries no value
   * @param desc the descriptor of the table whose regions should be reopened
   * @return a procedure which reopens all regions of the table
   */
  public static ReopenTableRegionsProcedure throttled(final Configuration conf,
    final TableDescriptor desc) {
    // An empty region name list means "all regions of the table".
    return throttled(conf, desc, Collections.emptyList());
  }

  /**
   * Create a new ReopenTableRegionsProcedure for specific regions, respecting the throttling
   * configuration for the table. First check the table descriptor, then fall back to the global
   * configuration. Only used in {@code HMaster#reopenRegionsThrottled}.
   * @param conf        the configuration to fall back to when the descriptor carries no value
   * @param desc        the descriptor of the table whose regions should be reopened
   * @param regionNames the names of the regions to reopen; empty means all regions of the table
   * @return a procedure which reopens the given regions
   */
  public static ReopenTableRegionsProcedure throttled(final Configuration conf,
    final TableDescriptor desc, final List<byte[]> regionNames) {
    long backoffMillis = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY))
      .map(Long::parseLong).orElseGet(() -> conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY,
        PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT));
    int batchSizeMax = Optional.ofNullable(desc.getValue(PROGRESSIVE_BATCH_SIZE_MAX_KEY))
      .map(Integer::parseInt).orElseGet(
        () -> conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED));

    return new ReopenTableRegionsProcedure(desc.getTableName(), regionNames, backoffMillis,
      batchSizeMax);
  }

  /**
   * Creates a procedure without a table name; the table name is filled in by
   * {@link #deserializeStateData}.
   */
  public ReopenTableRegionsProcedure() {
    this(null);
  }

  /**
   * Creates an unthrottled procedure which reopens all regions of the given table.
   */
  public ReopenTableRegionsProcedure(TableName tableName) {
    this(tableName, Collections.emptyList());
  }

  /**
   * Creates an unthrottled procedure which reopens the given regions of the given table, or all of
   * its regions when {@code regionNames} is empty.
   */
  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
    this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
      PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
  }

  /**
   * Visible for testing purposes - prefer the above methods to construct
   */
  public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
    int reopenBatchSizeMax) {
    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
  }

  /**
   * Visible for testing purposes - prefer the above methods to construct
   * @param tableName                the table whose regions should be reopened
   * @param regionNames              the regions to reopen; {@code null} or empty means all
   *                                 regions of the table
   * @param reopenBatchBackoffMillis how long to wait between two batches, in milliseconds
   * @param reopenBatchSizeMax       the maximum batch size, or
   *                                 {@link #PROGRESSIVE_BATCH_SIZE_MAX_DISABLED} for no batching;
   *                                 any other value below 1 is raised to 1
   */
  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
    this.tableName = tableName;
    // Keep the "empty means all regions" convention even if a caller hands us null.
    this.regionNames = regionNames != null ? regionNames : Collections.<byte[]> emptyList();
    this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
    if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
      // No batching: a single batch may hold every region.
      this.reopenBatchSize = Integer.MAX_VALUE;
      this.reopenBatchSizeMax = Integer.MAX_VALUE;
    } else {
      // Progressive batching: start with one region and grow from there.
      this.reopenBatchSize = 1;
      this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
    }
  }

  @Override
  public TableName getTableName() {
    return tableName;
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.REGION_EDIT;
  }

  /**
   * Returns the number of reopen child procedures this procedure has added so far.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public long getRegionsReopened() {
    return regionsReopened;
  }

  /**
   * Returns the number of batches this procedure has started so far.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public long getBatchesProcessed() {
    return batchesProcessed;
  }

  /**
   * Returns the configured wait between two batches, in milliseconds.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  long getReopenBatchBackoffMillis() {
    return reopenBatchBackoffMillis;
  }

  /**
   * Doubles the batch size, capped at the configured maximum.
   * @return the new batch size
   */
  @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
  protected int progressBatchSize() {
    int previousBatchSize = reopenBatchSize;
    reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
    if (reopenBatchSize < previousBatchSize) {
      // the batch size should never decrease. this must be overflow, so just use max
      reopenBatchSize = reopenBatchSizeMax;
    }
    return reopenBatchSize;
  }

  /**
   * Tells whether it is worth going back to the reopen state for the given location.
   * @return {@code false} if the location has a negative sequence number; otherwise {@code true}
   *         if the region has no state node or no transition is scheduled for it
   */
  private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
    if (loc.getSeqNum() < 0) {
      return false;
    }
    RegionStateNode regionNode =
      env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
    // If the region node is null, then at least in the next round we can remove this region to make
    // progress. And the second condition is a normal one, if there are no TRSP with it then we can
    // schedule one to make progress.
    return regionNode == null || !regionNode.isTransitionScheduled();
  }

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    try {
      switch (state) {
        case REOPEN_TABLE_REGIONS_GET_REGIONS:
          if (!isTableEnabled(env)) {
            LOG.info("Table {} is disabled, give up reopening its regions", tableName);
            return Flow.NO_MORE_STATE;
          }
          List<HRegionLocation> tableRegions =
            env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
          // may throw UnknownRegionException (handled by the catch block below)
          regions = getRegionLocationsForReopen(tableRegions);
          setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
          return Flow.HAS_MORE_STATE;
        case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
          // if we didn't finish reopening the last batch yet, let's keep trying until we do.
          // at that point, the batch will be empty and we can generate a new batch
          if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
            currentRegionBatch =
              regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
            batchesProcessed++;
          }
          for (HRegionLocation loc : currentRegionBatch) {
            RegionStateNode regionNode =
              env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
            // this is possible, maybe the region has already been merged or split, see HBASE-20921
            if (regionNode == null) {
              continue;
            }
            TransitRegionStateProcedure proc;
            regionNode.lock();
            try {
              // another procedure is already attached to this region, leave it alone for now
              if (regionNode.getProcedure() != null) {
                continue;
              }
              proc = TransitRegionStateProcedure.reopen(env, regionNode.getRegionInfo());
              regionNode.setProcedure(proc);
            } finally {
              regionNode.unlock();
            }
            addChildProcedure(proc);
            regionsReopened++;
          }
          setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
          return Flow.HAS_MORE_STATE;
        case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
          // update region lists based on what's been reopened
          regions = filterReopened(env, regions);
          currentRegionBatch = filterReopened(env, currentRegionBatch);

          // existing batch didn't fully reopen, so try to resolve that first.
          // since this is a retry, don't do the batch backoff
          if (!currentRegionBatch.isEmpty()) {
            return reopenIfSchedulable(env, currentRegionBatch, false);
          }

          if (regions.isEmpty()) {
            return Flow.NO_MORE_STATE;
          }

          // current batch is finished, schedule more regions
          return reopenIfSchedulable(env, regions, true);
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      if (isRollbackSupported(state) || e instanceof DoNotRetryIOException) {
        setFailure("master-reopen-table-regions", e);
      } else {
        // the state was not advanced, so the same state is executed again
        LOG.warn("Retriable error trying to reopen regions for table={} (in state={})", tableName,
          state, e);
      }
    }
    return Flow.HAS_MORE_STATE;
  }

  /**
   * Maps every location through {@code RegionStates#checkReopened} and keeps the non-null results.
   * NOTE(review): presumably {@code checkReopened} returns null for a region which does not need
   * to be reopened any more - confirm against RegionStates.
   */
  private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
    List<HRegionLocation> regionsToCheck) {
    return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
      .filter(l -> l != null).collect(Collectors.toList());
  }

  /**
   * Goes back to the reopen state if at least one of the given regions can be scheduled, otherwise
   * suspends this procedure with a retry backoff.
   * @param env                the master procedure environment
   * @param regionsToReopen    the regions to check
   * @param shouldBatchBackoff whether a new batch is about to start, in which case the batch size
   *                           is progressed and, if configured, the batch backoff is applied
   * @return {@link Flow#HAS_MORE_STATE} when the reopen state should run right away
   * @throws ProcedureSuspendedException when the procedure has to wait, either for the batch
   *                                     backoff or because no region can be scheduled yet
   */
  private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
    boolean shouldBatchBackoff) throws ProcedureSuspendedException {
    if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
      retryCounter = null;
      setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
      if (shouldBatchBackoff) {
        progressBatchSize();
      }
      if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
        setBackoffState(reopenBatchBackoffMillis);
        throw new ProcedureSuspendedException();
      } else {
        return Flow.HAS_MORE_STATE;
      }
    }

    // We can not schedule a TRSP for any of the regions which need to be reopened, wait for a
    // while and retry again.
    if (retryCounter == null) {
      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
    }
    long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
    LOG.info(
      "There are still {} region(s) which need to be reopened for table {}. {} are in "
        + "OPENING state, suspend {}secs and try again later",
      regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
    setBackoffState(backoffMillis);
    throw new ProcedureSuspendedException();
  }

  /**
   * Puts this procedure into the {@code WAITING_TIMEOUT} state with the given timeout.
   * @param millis the time to wait, in milliseconds
   */
  private void setBackoffState(long millis) {
    // The timeout is passed on as an int while the backoff is a (configurable) long. Clamp instead
    // of using Math.toIntExact, which would throw an ArithmeticException for a backoff larger
    // than Integer.MAX_VALUE milliseconds.
    setTimeout((int) Math.min(millis, Integer.MAX_VALUE));
    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
    skipPersistence();
  }

  /**
   * Selects the locations which should be reopened.
   * @param tableRegionsForReopen the locations of the table's regions which are candidates for a
   *                              reopen
   * @return the locations matching {@code regionNames}, in the order of {@code regionNames}, when
   *         both {@code regionNames} and {@code tableRegionsForReopen} are non-empty; otherwise
   *         {@code tableRegionsForReopen} itself
   * @throws UnknownRegionException if a requested region name matches none of the given locations
   */
  private List<HRegionLocation>
    getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) throws IOException {

    List<HRegionLocation> regionsToReopen = new ArrayList<>();
    if (
      CollectionUtils.isNotEmpty(regionNames) && CollectionUtils.isNotEmpty(tableRegionsForReopen)
    ) {
      List<byte[]> notFoundRegions = new ArrayList<>();

      for (byte[] regionName : regionNames) {
        boolean found = false;
        for (HRegionLocation hRegionLocation : tableRegionsForReopen) {
          if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) {
            regionsToReopen.add(hRegionLocation);
            found = true;
            break;
          }
        }
        if (!found) {
          notFoundRegions.add(regionName);
        }
      }

      if (!notFoundRegions.isEmpty()) {
        String regionNamesStr =
          notFoundRegions.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", "));
        throw new UnknownRegionException(
          "The following regions do not belong to table " + tableName + ": " + regionNamesStr);
      }
    } else {
      regionsToReopen = tableRegionsForReopen;
    }
    return regionsToReopen;
  }

  /**
   * At end of timeout, wake ourselves up so we run again.
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false; // 'false' means that this procedure handled the timeout
  }

  /**
   * Rollback is not implemented for any state of this procedure.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
    throws IOException, InterruptedException {
    throw new UnsupportedOperationException("unhandled state=" + state);
  }

  @Override
  protected ReopenTableRegionsState getState(int stateId) {
    return ReopenTableRegionsState.forNumber(stateId);
  }

  @Override
  protected int getStateId(ReopenTableRegionsState state) {
    return state.getNumber();
  }

  @Override
  protected ReopenTableRegionsState getInitialState() {
    return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS;
  }

  /**
   * Writes the table name, the pending region locations and, if any, the requested region names.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(tableName));
    regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion);
    if (CollectionUtils.isNotEmpty(regionNames)) {
      // As of this writing, wrapping this statement within an if condition is only required
      // for backward compatibility as we used to have 'regionNames' as null for cases
      // where all regions of given table should be reopened. Now, we have kept emptyList()
      // for 'regionNames' to indicate all regions of given table should be reopened unless
      // 'regionNames' contains at least one specific region, in which case only list of regions
      // that 'regionNames' contain should be reopened, not all regions of given table.
      // Now, we don't need this check since we are not dealing with null 'regionNames' and hence,
      // guarding by this if condition can be removed in HBase 4.0.0.
      regionNames.stream().map(ByteString::copyFrom).forEachOrdered(builder::addRegionNames);
    }
    serializer.serialize(builder.build());
  }

  /**
   * Restores the table name, the pending region locations and the requested region names; an
   * absent or empty list of region names becomes an empty list.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class);
    tableName = ProtobufUtil.toTableName(data.getTableName());
    regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation)
      .collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(data.getRegionNamesList())) {
      regionNames = data.getRegionNamesList().stream().map(ByteString::toByteArray)
        .collect(Collectors.toList());
    } else {
      regionNames = Collections.emptyList();
    }
  }
}