001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
022import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Optional;
033
034import org.apache.commons.lang3.mutable.MutableLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HBaseIOException;
041import org.apache.hadoop.hbase.Server;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.ipc.RpcCall;
053import org.apache.hadoop.hbase.ipc.RpcServer;
054import org.apache.hadoop.hbase.log.HBaseMarkers;
055import org.apache.hadoop.hbase.master.HMaster;
056import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
057import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
058import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
059import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
060import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
061import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
062import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
063import org.apache.hadoop.hbase.procedure2.Procedure;
064import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
065import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
066import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
067import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
068import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
069import org.apache.hadoop.hbase.regionserver.HRegion;
070import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
071import org.apache.hadoop.hbase.regionserver.RegionScanner;
072import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.CommonFSUtils;
075import org.apache.hadoop.hbase.util.HFileArchiveUtil;
076import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
077import org.apache.hadoop.hbase.wal.WAL;
078import org.apache.hadoop.hbase.wal.WALFactory;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
084import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
085import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
086
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
088
089/**
090 * A procedure store which uses a region to store all the procedures.
091 * <p/>
092 * FileSystem layout:
093 *
094 * <pre>
095 * hbase
096 *   |
097 *   --MasterProcs
098 *       |
099 *       --data
100 *       |  |
101 *       |  --/master/procedure/&lt;encoded-region-name&gt; <---- The region data
102 *       |      |
103 *       |      --replay <---- The edits to replay
104 *       |
105 *       --WALs
106 *          |
107 *          --&lt;master-server-name&gt; <---- The WAL dir for active master
108 *          |
109 *          --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
110 * </pre>
111 *
112 * We use p:d column to store the serialized protobuf format procedure, and when deleting we will
113 * first fill the info:proc column with an empty byte array, and then actually delete them in the
114 * {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
115 * not directly delete a procedure row as we do not know if it is the one with the max procedure id.
116 */
117@InterfaceAudience.Private
118public class RegionProcedureStore extends ProcedureStoreBase {
119
120  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
121
122  static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
123
124  private static final int DEFAULT_MAX_WALS = 10;
125
126  static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
127
128  static final String MASTER_PROCEDURE_DIR = "MasterProcs";
129
130  static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
131
132  private static final String REPLAY_EDITS_DIR = "recovered.wals";
133
134  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
135
136  static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
137
138  static final byte[] FAMILY = Bytes.toBytes("p");
139
140  static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
141
142  private static final int REGION_ID = 1;
143
144  private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
145    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
146
147  private final Server server;
148
149  private final DirScanPool cleanerPool;
150
151  private final LeaseRecovery leaseRecovery;
152
153  // Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
154  // possible to move the compacted hfiles to the global hfile archive directory, we have to do it
155  // by ourselves.
156  private HFileCleaner cleaner;
157
158  private WALFactory walFactory;
159
160  @VisibleForTesting
161  HRegion region;
162
163  @VisibleForTesting
164  RegionFlusherAndCompactor flusherAndCompactor;
165
166  @VisibleForTesting
167  RegionProcedureStoreWALRoller walRoller;
168
169  private int numThreads;
170
171  public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
172    this.server = server;
173    this.cleanerPool = cleanerPool;
174    this.leaseRecovery = leaseRecovery;
175  }
176
177  @Override
178  public void start(int numThreads) throws IOException {
179    if (!setRunning(true)) {
180      return;
181    }
182    LOG.info("Starting the Region Procedure Store, number threads={}", numThreads);
183    this.numThreads = numThreads;
184  }
185
186  private void shutdownWAL() {
187    if (walFactory != null) {
188      try {
189        walFactory.shutdown();
190      } catch (IOException e) {
191        LOG.warn("Failed to shutdown WAL", e);
192      }
193    }
194  }
195
196  private void closeRegion(boolean abort) {
197    if (region != null) {
198      try {
199        region.close(abort);
200      } catch (IOException e) {
201        LOG.warn("Failed to close region", e);
202      }
203    }
204
205  }
206
207  @Override
208  public void stop(boolean abort) {
209    if (!setRunning(false)) {
210      return;
211    }
212    LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
213    if (cleaner != null) {
214      cleaner.cancel(abort);
215    }
216    if (flusherAndCompactor != null) {
217      flusherAndCompactor.close();
218    }
219    // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
220    // region, otherwise there will be dead lock.
221    if (abort) {
222      shutdownWAL();
223      closeRegion(true);
224    } else {
225      closeRegion(false);
226      shutdownWAL();
227    }
228
229    if (walRoller != null) {
230      walRoller.close();
231    }
232  }
233
234  @Override
235  public int getNumThreads() {
236    return numThreads;
237  }
238
239  @Override
240  public int setRunningProcedureCount(int count) {
241    // useless for region based storage.
242    return count;
243  }
244
245  private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
246    String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
247    Path walDir = new Path(rootDir, logName);
248    LOG.debug("WALDir={}", walDir);
249    if (fs.exists(walDir)) {
250      throw new HBaseIOException(
251        "Master procedure store has already created directory at " + walDir);
252    }
253    if (!fs.mkdirs(walDir)) {
254      throw new IOException("Can not create master procedure wal directory " + walDir);
255    }
256    WAL wal = walFactory.getWAL(regionInfo);
257    walRoller.addWAL(wal);
258    return wal;
259  }
260
261  private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
262    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
263    Path tmpTableDir = CommonFSUtils.getTableDir(rootDir, TableName
264      .valueOf(TABLE_NAME.getNamespaceAsString(), TABLE_NAME.getQualifierAsString() + "-tmp"));
265    if (fs.exists(tmpTableDir) && !fs.delete(tmpTableDir, true)) {
266      throw new IOException("Can not delete partial created proc region " + tmpTableDir);
267    }
268    HRegion.createHRegion(conf, regionInfo, fs, tmpTableDir, TABLE_DESC).close();
269    Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
270    if (!fs.rename(tmpTableDir, tableDir)) {
271      throw new IOException("Can not rename " + tmpTableDir + " to " + tableDir);
272    }
273    WAL wal = createWAL(fs, rootDir, regionInfo);
274    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
275      null);
276  }
277
278  private HRegion open(Configuration conf, FileSystem fs, Path rootDir) throws IOException {
279    String factoryId = server.getServerName().toString();
280    Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
281    Path regionDir =
282      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
283        .getPath();
284    Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
285    if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
286      throw new IOException("Failed to create replay directory: " + replayEditsDir);
287    }
288    Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
289    for (FileStatus walDir : fs.listStatus(walsDir)) {
290      if (!walDir.isDirectory()) {
291        continue;
292      }
293      if (walDir.getPath().getName().startsWith(factoryId)) {
294        LOG.warn("This should not happen in real production as we have not created our WAL " +
295          "directory yet, ignore if you are running a procedure related UT");
296      }
297      Path deadWALDir;
298      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
299        deadWALDir =
300          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
301        if (!fs.rename(walDir.getPath(), deadWALDir)) {
302          throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
303            " when recovering lease of proc store");
304        }
305        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
306      } else {
307        deadWALDir = walDir.getPath();
308        LOG.info("{} is already marked as dead", deadWALDir);
309      }
310      for (FileStatus walFile : fs.listStatus(deadWALDir)) {
311        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
312        leaseRecovery.recoverFileLease(fs, walFile.getPath());
313        if (!fs.rename(walFile.getPath(), replayEditsFile)) {
314          throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
315            " when recovering lease of proc store");
316        }
317        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
318      }
319      LOG.info("Delete empty proc wal dir {}", deadWALDir);
320      fs.delete(deadWALDir, true);
321    }
322    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
323    WAL wal = createWAL(fs, rootDir, regionInfo);
324    conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
325      replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
326    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
327      null);
328  }
329
330  @SuppressWarnings("deprecation")
331  private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES =
332    ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
333      MoveRegionProcedure.class);
334
335  /**
336   * In HBASE-20811, we have introduced a new TRSP to assign/unassign/move regions, and it is
337   * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to
338   * make sure that there are none these procedures when upgrading. If there are, the master will
339   * quit, you need to go back to the old version to finish these procedures first before upgrading.
340   */
341  private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> procsByType)
342    throws HBaseIOException {
343    // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to
344    // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as
345    // there will be conflict in the code for AM. We should finish all these procedures before
346    // upgrading.
347    for (Class<?> clazz : UNSUPPORTED_PROCEDURES) {
348      List<Procedure<?>> procs = procsByType.get(clazz);
349      if (procs != null) {
350        LOG.error("Unsupported procedure type {} found, please rollback your master to the old" +
351          " version to finish them, and then try to upgrade again." +
352          " See https://hbase.apache.org/book.html#upgrade2.2 for more details." +
353          " The full procedure list: {}", clazz, procs);
354        throw new HBaseIOException("Unsupported procedure type " + clazz + " found");
355      }
356    }
357    // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to
358    // make sure that no one will try to schedule it but SCP does have a state which will schedule
359    // it.
360    if (procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
361      .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState)) {
362      LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure," +
363        " which is not supported any more. Please rollback your master to the old version to" +
364        " finish them, and then try to upgrade again." +
365        " See https://hbase.apache.org/book.html#upgrade2.2 for more details.");
366      throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
367    }
368  }
369
370  @SuppressWarnings("deprecation")
371  private void tryMigrate(FileSystem fs) throws IOException {
372    Configuration conf = server.getConfiguration();
373    Path procWALDir =
374      new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
375    if (!fs.exists(procWALDir)) {
376      return;
377    }
378    LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
379    WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
380    store.start(numThreads);
381    store.recoverLease();
382    MutableLong maxProcIdSet = new MutableLong(-1);
383    List<Procedure<?>> procs = new ArrayList<>();
384    Map<Class<?>, List<Procedure<?>>> activeProcsByType = new HashMap<>();
385    store.load(new ProcedureLoader() {
386
387      @Override
388      public void setMaxProcId(long maxProcId) {
389        maxProcIdSet.setValue(maxProcId);
390      }
391
392      @Override
393      public void load(ProcedureIterator procIter) throws IOException {
394        while (procIter.hasNext()) {
395          Procedure<?> proc = procIter.next();
396          procs.add(proc);
397          if (!proc.isFinished()) {
398            activeProcsByType.computeIfAbsent(proc.getClass(), k -> new ArrayList<>()).add(proc);
399          }
400        }
401      }
402
403      @Override
404      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
405        long corruptedCount = 0;
406        while (procIter.hasNext()) {
407          LOG.error("Corrupted procedure {}", procIter.next());
408          corruptedCount++;
409        }
410        if (corruptedCount > 0) {
411          throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
412            " migrating from the old WAL based store to the new region based store, please" +
413            " fix them before upgrading again.");
414        }
415      }
416    });
417
418    // check whether there are unsupported procedures, this could happen when we are migrating from
419    // 2.1-. We used to do this in HMaster, after loading all the procedures from procedure store,
420    // but here we have to do it before migrating, otherwise, if we find some unsupported
421    // procedures, the users can not go back to 2.1 to finish them any more, as all the data are now
422    // in the new region based procedure store, which is not supported in 2.1-.
423    checkUnsupportedProcedure(activeProcsByType);
424
425    MutableLong maxProcIdFromProcs = new MutableLong(-1);
426    for (Procedure<?> proc : procs) {
427      update(proc);
428      if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
429        maxProcIdFromProcs.setValue(proc.getProcId());
430      }
431    }
432    LOG.info("Migrated {} existing procedures from the old storage format.", procs.size());
433    LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
434      maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
435    // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
436    // anyway, let's do a check here.
437    if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
438      if (maxProcIdSet.longValue() > 0) {
439        // let's add a fake row to retain the max proc id
440        region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
441          PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
442      }
443    } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
444      LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
445    }
446    store.stop(false);
447    if (!fs.delete(procWALDir, true)) {
448      throw new IOException(
449        "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
450    }
451    LOG.info("Migration of WALProcedureStore finished");
452  }
453
454  @Override
455  public void recoverLease() throws IOException {
456    LOG.debug("Starting Region Procedure Store lease recovery...");
457    Configuration baseConf = server.getConfiguration();
458    FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
459    Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
460    Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
461    // we will override some configurations so create a new one.
462    Configuration conf = new Configuration(baseConf);
463    CommonFSUtils.setRootDir(conf, rootDir);
464    CommonFSUtils.setWALRootDir(conf, rootDir);
465    RegionFlusherAndCompactor.setupConf(conf);
466    conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
467    if (conf.get(USE_HSYNC_KEY) != null) {
468      conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
469    }
470    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
471
472    walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
473    walRoller.start();
474
475    walFactory = new WALFactory(conf, server.getServerName().toString(), false);
476    Path tableDir = CommonFSUtils.getTableDir(rootDir, TABLE_NAME);
477    if (fs.exists(tableDir)) {
478      // load the existing region.
479      region = open(conf, fs, rootDir);
480    } else {
481      // bootstrapping...
482      region = bootstrap(conf, fs, rootDir);
483    }
484    flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
485    walRoller.setFlusherAndCompactor(flusherAndCompactor);
486    int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
487      HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
488    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
489    if (!fs.mkdirs(archiveDir)) {
490      LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
491        " be created again when we actually archive the hfiles later, so continue", archiveDir);
492    }
493    cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
494      fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
495    server.getChoreService().scheduleChore(cleaner);
496    tryMigrate(fs);
497  }
498
499  @Override
500  public void load(ProcedureLoader loader) throws IOException {
501    List<ProcedureProtos.Procedure> procs = new ArrayList<>();
502    long maxProcId = 0;
503
504    try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
505      List<Cell> cells = new ArrayList<>();
506      boolean moreRows;
507      do {
508        moreRows = scanner.next(cells);
509        if (cells.isEmpty()) {
510          continue;
511        }
512        Cell cell = cells.get(0);
513        cells.clear();
514        maxProcId = Math.max(maxProcId,
515          Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
516        if (cell.getValueLength() > 0) {
517          ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
518            .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
519          procs.add(proto);
520        }
521      } while (moreRows);
522    }
523    loader.setMaxProcId(maxProcId);
524    ProcedureTree tree = ProcedureTree.build(procs);
525    loader.load(tree.getValidProcs());
526    loader.handleCorrupted(tree.getCorruptedProcs());
527  }
528
529  private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
530    throws IOException {
531    ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
532    byte[] row = Bytes.toBytes(proc.getProcId());
533    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
534    rowsToLock.add(row);
535  }
536
537  // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill
538  // the proc column with an empty array.
539  private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
540    byte[] row = Bytes.toBytes(procId);
541    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
542    rowsToLock.add(row);
543  }
544
545  /**
546   * Insert procedure may be called by master's rpc call. There are some check about the rpc call
547   * when mutate region. Here unset the current rpc call and set it back in finally block. See
548   * HBASE-23895 for more details.
549   */
550  private void runWithoutRpcCall(Runnable runnable) {
551    Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
552    try {
553      runnable.run();
554    } finally {
555      rpcCall.ifPresent(RpcServer::setCurrentCall);
556    }
557  }
558
559  @Override
560  public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
561    if (subProcs == null || subProcs.length == 0) {
562      // same with update, just insert a single procedure
563      update(proc);
564      return;
565    }
566    List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
567    List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
568    runWithoutRpcCall(() -> {
569      try {
570        serializePut(proc, mutations, rowsToLock);
571        for (Procedure<?> subProc : subProcs) {
572          serializePut(subProc, mutations, rowsToLock);
573        }
574        region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
575      } catch (IOException e) {
576        LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
577          Arrays.toString(subProcs), e);
578        throw new UncheckedIOException(e);
579      }
580    });
581    flusherAndCompactor.onUpdate();
582  }
583
584  @Override
585  public void insert(Procedure<?>[] procs) {
586    List<Mutation> mutations = new ArrayList<>(procs.length);
587    List<byte[]> rowsToLock = new ArrayList<>(procs.length);
588    runWithoutRpcCall(() -> {
589      try {
590        for (Procedure<?> proc : procs) {
591          serializePut(proc, mutations, rowsToLock);
592        }
593        region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
594      } catch (IOException e) {
595        LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
596        throw new UncheckedIOException(e);
597      }
598    });
599    flusherAndCompactor.onUpdate();
600  }
601
602  @Override
603  public void update(Procedure<?> proc) {
604    runWithoutRpcCall(() -> {
605      try {
606        ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
607        region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
608          proto.toByteArray()));
609      } catch (IOException e) {
610        LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
611        throw new UncheckedIOException(e);
612      }
613    });
614    flusherAndCompactor.onUpdate();
615  }
616
617  @Override
618  public void delete(long procId) {
619    try {
620      region
621        .put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
622    } catch (IOException e) {
623      LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
624      throw new UncheckedIOException(e);
625    }
626    flusherAndCompactor.onUpdate();
627  }
628
629  @Override
630  public void delete(Procedure<?> parentProc, long[] subProcIds) {
631    List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
632    List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
633    try {
634      serializePut(parentProc, mutations, rowsToLock);
635      for (long subProcId : subProcIds) {
636        serializeDelete(subProcId, mutations, rowsToLock);
637      }
638      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
639    } catch (IOException e) {
640      LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
641        Arrays.toString(subProcIds), e);
642      throw new UncheckedIOException(e);
643    }
644    flusherAndCompactor.onUpdate();
645  }
646
647  @Override
648  public void delete(long[] procIds, int offset, int count) {
649    if (count == 0) {
650      return;
651    }
652    if (count == 1) {
653      delete(procIds[offset]);
654      return;
655    }
656    List<Mutation> mutations = new ArrayList<>(count);
657    List<byte[]> rowsToLock = new ArrayList<>(count);
658    for (int i = 0; i < count; i++) {
659      long procId = procIds[offset + i];
660      serializeDelete(procId, mutations, rowsToLock);
661    }
662    try {
663      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
664    } catch (IOException e) {
665      LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
666      throw new UncheckedIOException(e);
667    }
668    flusherAndCompactor.onUpdate();
669  }
670
671  @Override
672  public void cleanup() {
673    // actually delete the procedures if it is not the one with the max procedure id.
674    List<Cell> cells = new ArrayList<Cell>();
675    try (RegionScanner scanner =
676      region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
677      // skip the row with max procedure id
678      boolean moreRows = scanner.next(cells);
679      if (cells.isEmpty()) {
680        return;
681      }
682      cells.clear();
683      while (moreRows) {
684        moreRows = scanner.next(cells);
685        if (cells.isEmpty()) {
686          continue;
687        }
688        Cell cell = cells.get(0);
689        cells.clear();
690        if (cell.getValueLength() == 0) {
691          region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
692        }
693      }
694    } catch (IOException e) {
695      LOG.warn("Failed to clean up delete procedures", e);
696    }
697  }
698}