001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.region; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; 021import static org.apache.hadoop.hbase.HConstants.NO_NONCE; 022import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY; 023 024import java.io.IOException; 025import java.io.UncheckedIOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.List; 031import java.util.Map; 032import org.apache.commons.lang3.mutable.MutableLong; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.HBaseIOException; 038import org.apache.hadoop.hbase.Server; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Mutation; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.log.HBaseMarkers; 044import org.apache.hadoop.hbase.master.assignment.AssignProcedure; 045import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure; 046import org.apache.hadoop.hbase.master.assignment.UnassignProcedure; 047import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; 048import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 049import org.apache.hadoop.hbase.master.region.MasterRegion; 050import org.apache.hadoop.hbase.procedure2.Procedure; 051import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 052import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 053import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 054import org.apache.hadoop.hbase.procedure2.store.ProcedureTree; 055import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 056import org.apache.hadoop.hbase.regionserver.RegionScanner; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.CommonFSUtils; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 064 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 066 067/** 068 * A procedure store which uses the master local store to store all the procedures. 069 * <p/> 070 * We use proc:d column to store the serialized protobuf format procedure, and when deleting we will 071 * first fill the proc:d column with an empty byte array, and then actually delete them in the 072 * {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can 073 * not directly delete a procedure row as we do not know if it is the one with the max procedure id. 074 */ 075@InterfaceAudience.Private 076public class RegionProcedureStore extends ProcedureStoreBase { 077 078 private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class); 079 080 static final byte[] PROC_QUALIFIER = Bytes.toBytes("d"); 081 082 private final Server server; 083 084 private final LeaseRecovery leaseRecovery; 085 086 final MasterRegion region; 087 088 private int numThreads; 089 090 public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) { 091 this.server = server; 092 this.region = region; 093 this.leaseRecovery = leaseRecovery; 094 } 095 096 @Override 097 public void start(int numThreads) throws IOException { 098 if (!setRunning(true)) { 099 return; 100 } 101 LOG.info("Starting the Region Procedure Store, number threads={}", numThreads); 102 this.numThreads = numThreads; 103 } 104 105 @Override 106 public void stop(boolean abort) { 107 if (!setRunning(false)) { 108 return; 109 } 110 LOG.info("Stopping the Region Procedure Store, isAbort={}", abort); 111 } 112 113 @Override 114 public int getNumThreads() { 115 return numThreads; 116 } 117 118 @Override 119 public int setRunningProcedureCount(int count) { 120 // useless for region based storage. 121 return count; 122 } 123 124 @SuppressWarnings("deprecation") 125 private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES = 126 ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class, 127 MoveRegionProcedure.class); 128 129 /** 130 * In HBASE-20811, we have introduced a new TRSP to assign/unassign/move regions, and it is 131 * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to 132 * make sure that there are none these procedures when upgrading. If there are, the master will 133 * quit, you need to go back to the old version to finish these procedures first before upgrading. 134 */ 135 private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> procsByType) 136 throws HBaseIOException { 137 // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to 138 // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as 139 // there will be conflict in the code for AM. We should finish all these procedures before 140 // upgrading. 141 for (Class<?> clazz : UNSUPPORTED_PROCEDURES) { 142 List<Procedure<?>> procs = procsByType.get(clazz); 143 if (procs != null) { 144 LOG.error("Unsupported procedure type {} found, please rollback your master to the old" 145 + " version to finish them, and then try to upgrade again." 146 + " See https://hbase.apache.org/book.html#upgrade2.2 for more details." 147 + " The full procedure list: {}", clazz, procs); 148 throw new HBaseIOException("Unsupported procedure type " + clazz + " found"); 149 } 150 } 151 // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to 152 // make sure that no one will try to schedule it but SCP does have a state which will schedule 153 // it. 154 if ( 155 procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream() 156 .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState) 157 ) { 158 LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure," 159 + " which is not supported any more. Please rollback your master to the old version to" 160 + " finish them, and then try to upgrade again." 161 + " See https://hbase.apache.org/book.html#upgrade2.2 for more details."); 162 throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure"); 163 } 164 } 165 166 @SuppressWarnings("deprecation") 167 private void tryMigrate(FileSystem fs) throws IOException { 168 Configuration conf = server.getConfiguration(); 169 Path procWALDir = 170 new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR); 171 if (!fs.exists(procWALDir)) { 172 return; 173 } 174 LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir); 175 WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery); 176 store.start(numThreads); 177 store.recoverLease(); 178 MutableLong maxProcIdSet = new MutableLong(-1); 179 List<Procedure<?>> procs = new ArrayList<>(); 180 Map<Class<?>, List<Procedure<?>>> activeProcsByType = new HashMap<>(); 181 store.load(new ProcedureLoader() { 182 183 @Override 184 public void setMaxProcId(long maxProcId) { 185 maxProcIdSet.setValue(maxProcId); 186 } 187 188 @Override 189 public void load(ProcedureIterator procIter) throws IOException { 190 while (procIter.hasNext()) { 191 Procedure<?> proc = procIter.next(); 192 procs.add(proc); 193 if (!proc.isFinished()) { 194 activeProcsByType.computeIfAbsent(proc.getClass(), k -> new ArrayList<>()).add(proc); 195 } 196 } 197 } 198 199 @Override 200 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 201 long corruptedCount = 0; 202 while (procIter.hasNext()) { 203 LOG.error("Corrupted procedure {}", procIter.next()); 204 corruptedCount++; 205 } 206 if (corruptedCount > 0) { 207 throw new IOException("There are " + corruptedCount + " corrupted procedures when" 208 + " migrating from the old WAL based store to the new region based store, please" 209 + " fix them before upgrading again."); 210 } 211 } 212 }); 213 214 // check whether there are unsupported procedures, this could happen when we are migrating from 215 // 2.1-. We used to do this in HMaster, after loading all the procedures from procedure store, 216 // but here we have to do it before migrating, otherwise, if we find some unsupported 217 // procedures, the users can not go back to 2.1 to finish them any more, as all the data are now 218 // in the new region based procedure store, which is not supported in 2.1-. 219 checkUnsupportedProcedure(activeProcsByType); 220 221 MutableLong maxProcIdFromProcs = new MutableLong(-1); 222 for (Procedure<?> proc : procs) { 223 update(proc); 224 if (proc.getProcId() > maxProcIdFromProcs.longValue()) { 225 maxProcIdFromProcs.setValue(proc.getProcId()); 226 } 227 } 228 LOG.info("Migrated {} existing procedures from the old storage format.", procs.size()); 229 LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}", 230 maxProcIdSet.longValue(), maxProcIdFromProcs.longValue()); 231 // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but 232 // anyway, let's do a check here. 233 if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) { 234 if (maxProcIdSet.longValue() > 0) { 235 // let's add a fake row to retain the max proc id 236 region.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())) 237 .addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY))); 238 } 239 } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) { 240 LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures"); 241 } 242 store.stop(false); 243 if (!fs.delete(procWALDir, true)) { 244 throw new IOException( 245 "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir); 246 } 247 LOG.info("Migration of WALProcedureStore finished"); 248 } 249 250 @Override 251 public void recoverLease() throws IOException { 252 LOG.info("Starting Region Procedure Store lease recovery..."); 253 FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration()); 254 tryMigrate(fs); 255 } 256 257 @Override 258 public void load(ProcedureLoader loader) throws IOException { 259 List<ProcedureProtos.Procedure> procs = new ArrayList<>(); 260 long maxProcId = 0; 261 262 try (RegionScanner scanner = 263 region.getRegionScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) { 264 List<Cell> cells = new ArrayList<>(); 265 boolean moreRows; 266 do { 267 moreRows = scanner.next(cells); 268 if (cells.isEmpty()) { 269 continue; 270 } 271 Cell cell = cells.get(0); 272 cells.clear(); 273 maxProcId = Math.max(maxProcId, 274 Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 275 if (cell.getValueLength() > 0) { 276 ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser() 277 .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); 278 procs.add(proto); 279 } 280 } while (moreRows); 281 } 282 loader.setMaxProcId(maxProcId); 283 ProcedureTree tree = ProcedureTree.build(procs); 284 loader.load(tree.getValidProcs()); 285 loader.handleCorrupted(tree.getCorruptedProcs()); 286 } 287 288 private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock) 289 throws IOException { 290 ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); 291 byte[] row = Bytes.toBytes(proc.getProcId()); 292 mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray())); 293 rowsToLock.add(row); 294 } 295 296 // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill 297 // the proc column with an empty array. 298 private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) { 299 byte[] row = Bytes.toBytes(procId); 300 mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)); 301 rowsToLock.add(row); 302 } 303 304 @Override 305 public void insert(Procedure<?> proc, Procedure<?>[] subProcs) { 306 if (subProcs == null || subProcs.length == 0) { 307 // same with update, just insert a single procedure 308 update(proc); 309 return; 310 } 311 List<Mutation> mutations = new ArrayList<>(subProcs.length + 1); 312 List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1); 313 try { 314 serializePut(proc, mutations, rowsToLock); 315 for (Procedure<?> subProc : subProcs) { 316 serializePut(subProc, mutations, rowsToLock); 317 } 318 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 319 } catch (IOException e) { 320 LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc, 321 Arrays.toString(subProcs), e); 322 throw new UncheckedIOException(e); 323 } 324 } 325 326 @Override 327 public void insert(Procedure<?>[] procs) { 328 List<Mutation> mutations = new ArrayList<>(procs.length); 329 List<byte[]> rowsToLock = new ArrayList<>(procs.length); 330 try { 331 for (Procedure<?> proc : procs) { 332 serializePut(proc, mutations, rowsToLock); 333 } 334 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 335 } catch (IOException e) { 336 LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e); 337 throw new UncheckedIOException(e); 338 } 339 } 340 341 @Override 342 public void update(Procedure<?> proc) { 343 try { 344 ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); 345 region.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY, 346 PROC_QUALIFIER, proto.toByteArray()))); 347 } catch (IOException e) { 348 LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e); 349 throw new UncheckedIOException(e); 350 } 351 } 352 353 @Override 354 public void delete(long procId) { 355 try { 356 region.update(r -> r.put( 357 new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY))); 358 } catch (IOException e) { 359 LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e); 360 throw new UncheckedIOException(e); 361 } 362 } 363 364 @Override 365 public void delete(Procedure<?> parentProc, long[] subProcIds) { 366 List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1); 367 List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1); 368 try { 369 serializePut(parentProc, mutations, rowsToLock); 370 for (long subProcId : subProcIds) { 371 serializeDelete(subProcId, mutations, rowsToLock); 372 } 373 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 374 } catch (IOException e) { 375 LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc, 376 Arrays.toString(subProcIds), e); 377 throw new UncheckedIOException(e); 378 } 379 } 380 381 @Override 382 public void delete(long[] procIds, int offset, int count) { 383 if (count == 0) { 384 return; 385 } 386 if (count == 1) { 387 delete(procIds[offset]); 388 return; 389 } 390 List<Mutation> mutations = new ArrayList<>(count); 391 List<byte[]> rowsToLock = new ArrayList<>(count); 392 for (int i = 0; i < count; i++) { 393 long procId = procIds[offset + i]; 394 serializeDelete(procId, mutations, rowsToLock); 395 } 396 try { 397 region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE)); 398 } catch (IOException e) { 399 LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e); 400 throw new UncheckedIOException(e); 401 } 402 } 403 404 @Override 405 public void cleanup() { 406 // actually delete the procedures if it is not the one with the max procedure id. 407 List<Cell> cells = new ArrayList<Cell>(); 408 try (RegionScanner scanner = region 409 .getRegionScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) { 410 // skip the row with max procedure id 411 boolean moreRows = scanner.next(cells); 412 if (cells.isEmpty()) { 413 return; 414 } 415 cells.clear(); 416 while (moreRows) { 417 moreRows = scanner.next(cells); 418 if (cells.isEmpty()) { 419 continue; 420 } 421 Cell cell = cells.get(0); 422 cells.clear(); 423 if (cell.getValueLength() == 0) { 424 region.update( 425 r -> r.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) 426 .addFamily(PROC_FAMILY))); 427 } 428 } 429 } catch (IOException e) { 430 LOG.warn("Failed to clean up delete procedures", e); 431 } 432 } 433}