001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.region; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; 021import static org.apache.hadoop.hbase.HConstants.NO_NONCE; 022import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY; 023 024import java.io.IOException; 025import java.io.UncheckedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Optional; 033import org.apache.commons.lang3.mutable.MutableLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.HBaseIOException; 039import org.apache.hadoop.hbase.Server; 040import org.apache.hadoop.hbase.client.Delete; 041import org.apache.hadoop.hbase.client.Mutation; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.ipc.RpcCall; 045import 
org.apache.hadoop.hbase.ipc.RpcServer; 046import org.apache.hadoop.hbase.log.HBaseMarkers; 047import org.apache.hadoop.hbase.master.assignment.AssignProcedure; 048import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure; 049import org.apache.hadoop.hbase.master.assignment.UnassignProcedure; 050import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; 051import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 052import org.apache.hadoop.hbase.master.region.MasterRegion; 053import org.apache.hadoop.hbase.procedure2.Procedure; 054import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 055import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 056import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 057import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; 058import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 059import org.apache.hadoop.hbase.regionserver.RegionScanner; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.CommonFSUtils; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 067 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 069 070/** 071 * A procedure store which uses the master local store to store all the procedures. 072 * <p/> 073 * We use proc:d column to store the serialized protobuf format procedure, and when deleting we will 074 * first fill the proc:d column with an empty byte array, and then actually delete them in the 075 * {@link #cleanup()} method. This is because we need to retain the max procedure id, so we can 076 * not directly delete a procedure row as we do not know if it is the one with the max procedure id. 
077 */ 078@InterfaceAudience.Private 079public class RegionProcedureStore extends ProcedureStoreBase { 080 081 private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class); 082 083 static final byte[] PROC_QUALIFIER = Bytes.toBytes("d"); 084 085 private final Server server; 086 087 private final LeaseRecovery leaseRecovery; 088 089 final MasterRegion region; 090 091 private int numThreads; 092 093 public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) { 094 this.server = server; 095 this.region = region; 096 this.leaseRecovery = leaseRecovery; 097 } 098 099 @Override 100 public void start(int numThreads) throws IOException { 101 if (!setRunning(true)) { 102 return; 103 } 104 LOG.info("Starting the Region Procedure Store, number threads={}", numThreads); 105 this.numThreads = numThreads; 106 } 107 108 @Override 109 public void stop(boolean abort) { 110 if (!setRunning(false)) { 111 return; 112 } 113 LOG.info("Stopping the Region Procedure Store, isAbort={}", abort); 114 } 115 116 @Override 117 public int getNumThreads() { 118 return numThreads; 119 } 120 121 @Override 122 public int setRunningProcedureCount(int count) { 123 // useless for region based storage. 124 return count; 125 } 126 127 @SuppressWarnings("deprecation") 128 private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES = 129 ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class, 130 MoveRegionProcedure.class); 131 132 /** 133 * In HBASE-20811, we have introduced a new TRSP to assign/unassign/move regions, and it is 134 * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to 135 * make sure that there are none these procedures when upgrading. If there are, the master will 136 * quit, you need to go back to the old version to finish these procedures first before upgrading. 
137 */ 138 private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> procsByType) 139 throws HBaseIOException { 140 // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to 141 // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as 142 // there will be conflict in the code for AM. We should finish all these procedures before 143 // upgrading. 144 for (Class<?> clazz : UNSUPPORTED_PROCEDURES) { 145 List<Procedure<?>> procs = procsByType.get(clazz); 146 if (procs != null) { 147 LOG.error("Unsupported procedure type {} found, please rollback your master to the old" 148 + " version to finish them, and then try to upgrade again." 149 + " See https://hbase.apache.org/book.html#upgrade2.2 for more details." 150 + " The full procedure list: {}", clazz, procs); 151 throw new HBaseIOException("Unsupported procedure type " + clazz + " found"); 152 } 153 } 154 // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to 155 // make sure that no one will try to schedule it but SCP does have a state which will schedule 156 // it. 157 if ( 158 procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream() 159 .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState) 160 ) { 161 LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure," 162 + " which is not supported any more. Please rollback your master to the old version to" 163 + " finish them, and then try to upgrade again." 
164 + " See https://hbase.apache.org/book.html#upgrade2.2 for more details."); 165 throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure"); 166 } 167 } 168 169 @SuppressWarnings("deprecation") 170 private void tryMigrate(FileSystem fs) throws IOException { 171 Configuration conf = server.getConfiguration(); 172 Path procWALDir = 173 new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR); 174 if (!fs.exists(procWALDir)) { 175 return; 176 } 177 LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir); 178 WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery); 179 store.start(numThreads); 180 store.recoverLease(); 181 MutableLong maxProcIdSet = new MutableLong(-1); 182 List<Procedure<?>> procs = new ArrayList<>(); 183 Map<Class<?>, List<Procedure<?>>> activeProcsByType = new HashMap<>(); 184 store.load(new ProcedureLoader() { 185 186 @Override 187 public void setMaxProcId(long maxProcId) { 188 maxProcIdSet.setValue(maxProcId); 189 } 190 191 @Override 192 public void load(ProcedureIterator procIter) throws IOException { 193 while (procIter.hasNext()) { 194 Procedure<?> proc = procIter.next(); 195 procs.add(proc); 196 if (!proc.isFinished()) { 197 activeProcsByType.computeIfAbsent(proc.getClass(), k -> new ArrayList<>()).add(proc); 198 } 199 } 200 } 201 202 @Override 203 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 204 long corruptedCount = 0; 205 while (procIter.hasNext()) { 206 LOG.error("Corrupted procedure {}", procIter.next()); 207 corruptedCount++; 208 } 209 if (corruptedCount > 0) { 210 throw new IOException("There are " + corruptedCount + " corrupted procedures when" 211 + " migrating from the old WAL based store to the new region based store, please" 212 + " fix them before upgrading again."); 213 } 214 } 215 }); 216 217 // check whether there are unsupported procedures, this could happen when we are migrating from 218 
// 2.1-. We used to do this in HMaster, after loading all the procedures from procedure store, 219 // but here we have to do it before migrating, otherwise, if we find some unsupported 220 // procedures, the users can not go back to 2.1 to finish them any more, as all the data are now 221 // in the new region based procedure store, which is not supported in 2.1-. 222 checkUnsupportedProcedure(activeProcsByType); 223 224 MutableLong maxProcIdFromProcs = new MutableLong(-1); 225 for (Procedure<?> proc : procs) { 226 update(proc); 227 if (proc.getProcId() > maxProcIdFromProcs.longValue()) { 228 maxProcIdFromProcs.setValue(proc.getProcId()); 229 } 230 } 231 LOG.info("Migrated {} existing procedures from the old storage format.", procs.size()); 232 LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}", 233 maxProcIdSet.longValue(), maxProcIdFromProcs.longValue()); 234 // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but 235 // anyway, let's do a check here. 
236 if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) { 237 if (maxProcIdSet.longValue() > 0) { 238 // let's add a fake row to retain the max proc id 239 region.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())) 240 .addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY))); 241 } 242 } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) { 243 LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures"); 244 } 245 store.stop(false); 246 if (!fs.delete(procWALDir, true)) { 247 throw new IOException( 248 "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir); 249 } 250 LOG.info("Migration of WALProcedureStore finished"); 251 } 252 253 @Override 254 public void recoverLease() throws IOException { 255 LOG.info("Starting Region Procedure Store lease recovery..."); 256 FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration()); 257 tryMigrate(fs); 258 } 259 260 @Override 261 public void load(ProcedureLoader loader) throws IOException { 262 List<ProcedureProtos.Procedure> procs = new ArrayList<>(); 263 long maxProcId = 0; 264 265 try (RegionScanner scanner = 266 region.getRegionScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) { 267 List<Cell> cells = new ArrayList<>(); 268 boolean moreRows; 269 do { 270 moreRows = scanner.next(cells); 271 if (cells.isEmpty()) { 272 continue; 273 } 274 Cell cell = cells.get(0); 275 cells.clear(); 276 maxProcId = Math.max(maxProcId, 277 Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 278 if (cell.getValueLength() > 0) { 279 ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser() 280 .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); 281 procs.add(proto); 282 } 283 } while (moreRows); 284 } 285 loader.setMaxProcId(maxProcId); 286 ProcedureTree tree = ProcedureTree.build(procs); 287 loader.load(tree.getValidProcs()); 288 
loader.handleCorrupted(tree.getCorruptedProcs()); 289 } 290 291 private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock) 292 throws IOException { 293 ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); 294 byte[] row = Bytes.toBytes(proc.getProcId()); 295 mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray())); 296 rowsToLock.add(row); 297 } 298 299 // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill 300 // the proc column with an empty array. 301 private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) { 302 byte[] row = Bytes.toBytes(procId); 303 mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)); 304 rowsToLock.add(row); 305 } 306 307 /** 308 * Insert procedure may be called by master's rpc call. There are some check about the rpc call 309 * when mutate region. Here unset the current rpc call and set it back in finally block. See 310 * HBASE-23895 for more details. 
311 */ 312 private void runWithoutRpcCall(Runnable runnable) { 313 Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall(); 314 try { 315 runnable.run(); 316 } finally { 317 rpcCall.ifPresent(RpcServer::setCurrentCall); 318 } 319 } 320 321 @Override 322 public void insert(Procedure<?> proc, Procedure<?>[] subProcs) { 323 if (subProcs == null || subProcs.length == 0) { 324 // same with update, just insert a single procedure 325 update(proc); 326 return; 327 } 328 List<Mutation> mutations = new ArrayList<>(subProcs.length + 1); 329 List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1); 330 runWithoutRpcCall(() -> { 331 try { 332 serializePut(proc, mutations, rowsToLock); 333 for (Procedure<?> subProc : subProcs) { 334 serializePut(subProc, mutations, rowsToLock); 335 } 336 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 337 } catch (IOException e) { 338 LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc, 339 Arrays.toString(subProcs), e); 340 throw new UncheckedIOException(e); 341 } 342 }); 343 } 344 345 @Override 346 public void insert(Procedure<?>[] procs) { 347 List<Mutation> mutations = new ArrayList<>(procs.length); 348 List<byte[]> rowsToLock = new ArrayList<>(procs.length); 349 runWithoutRpcCall(() -> { 350 try { 351 for (Procedure<?> proc : procs) { 352 serializePut(proc, mutations, rowsToLock); 353 } 354 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 355 } catch (IOException e) { 356 LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e); 357 throw new UncheckedIOException(e); 358 } 359 }); 360 } 361 362 @Override 363 public void update(Procedure<?> proc) { 364 runWithoutRpcCall(() -> { 365 try { 366 ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); 367 region.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY, 368 PROC_QUALIFIER, proto.toByteArray()))); 369 
} catch (IOException e) { 370 LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e); 371 throw new UncheckedIOException(e); 372 } 373 }); 374 } 375 376 @Override 377 public void delete(long procId) { 378 try { 379 region.update(r -> r.put( 380 new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY))); 381 } catch (IOException e) { 382 LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e); 383 throw new UncheckedIOException(e); 384 } 385 } 386 387 @Override 388 public void delete(Procedure<?> parentProc, long[] subProcIds) { 389 List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1); 390 List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1); 391 try { 392 serializePut(parentProc, mutations, rowsToLock); 393 for (long subProcId : subProcIds) { 394 serializeDelete(subProcId, mutations, rowsToLock); 395 } 396 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 397 } catch (IOException e) { 398 LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc, 399 Arrays.toString(subProcIds), e); 400 throw new UncheckedIOException(e); 401 } 402 } 403 404 @Override 405 public void delete(long[] procIds, int offset, int count) { 406 if (count == 0) { 407 return; 408 } 409 if (count == 1) { 410 delete(procIds[offset]); 411 return; 412 } 413 List<Mutation> mutations = new ArrayList<>(count); 414 List<byte[]> rowsToLock = new ArrayList<>(count); 415 for (int i = 0; i < count; i++) { 416 long procId = procIds[offset + i]; 417 serializeDelete(procId, mutations, rowsToLock); 418 } 419 try { 420 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 421 } catch (IOException e) { 422 LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e); 423 throw new UncheckedIOException(e); 424 } 425 } 426 427 @Override 428 public void cleanup() { 429 // actually delete the procedures if it is 
not the one with the max procedure id. 430 List<Cell> cells = new ArrayList<Cell>(); 431 try (RegionScanner scanner = region 432 .getRegionScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) { 433 // skip the row with max procedure id 434 boolean moreRows = scanner.next(cells); 435 if (cells.isEmpty()) { 436 return; 437 } 438 cells.clear(); 439 while (moreRows) { 440 moreRows = scanner.next(cells); 441 if (cells.isEmpty()) { 442 continue; 443 } 444 Cell cell = cells.get(0); 445 cells.clear(); 446 if (cell.getValueLength() == 0) { 447 region.update( 448 r -> r.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) 449 .addFamily(PROC_FAMILY))); 450 } 451 } 452 } catch (IOException e) { 453 LOG.warn("Failed to clean up delete procedures", e); 454 } 455 } 456}