001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
022import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Optional;
033import org.apache.commons.lang3.mutable.MutableLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseIOException;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.client.Delete;
041import org.apache.hadoop.hbase.client.Mutation;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.ipc.RpcCall;
045import org.apache.hadoop.hbase.ipc.RpcServer;
046import org.apache.hadoop.hbase.log.HBaseMarkers;
047import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
048import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
049import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
050import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
051import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
052import org.apache.hadoop.hbase.master.region.MasterRegion;
053import org.apache.hadoop.hbase.procedure2.Procedure;
054import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
055import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
056import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
057import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
058import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
059import org.apache.hadoop.hbase.regionserver.RegionScanner;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
068
069import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
070
071/**
072 * A procedure store which uses the master local store to store all the procedures.
073 * <p/>
074 * We use proc:d column to store the serialized protobuf format procedure, and when deleting we will
075 * first fill the info:proc column with an empty byte array, and then actually delete them in the
076 * {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
077 * not directly delete a procedure row as we do not know if it is the one with the max procedure id.
078 */
079@InterfaceAudience.Private
080public class RegionProcedureStore extends ProcedureStoreBase {
081
082  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
083
084  static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
085
086  private final Server server;
087
088  private final LeaseRecovery leaseRecovery;
089
090  @VisibleForTesting
091  final MasterRegion region;
092
093  private int numThreads;
094
095  public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) {
096    this.server = server;
097    this.region = region;
098    this.leaseRecovery = leaseRecovery;
099  }
100
101  @Override
102  public void start(int numThreads) throws IOException {
103    if (!setRunning(true)) {
104      return;
105    }
106    LOG.info("Starting the Region Procedure Store, number threads={}", numThreads);
107    this.numThreads = numThreads;
108  }
109
110  @Override
111  public void stop(boolean abort) {
112    if (!setRunning(false)) {
113      return;
114    }
115    LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
116  }
117
118  @Override
119  public int getNumThreads() {
120    return numThreads;
121  }
122
123  @Override
124  public int setRunningProcedureCount(int count) {
125    // useless for region based storage.
126    return count;
127  }
128
129  @SuppressWarnings("deprecation")
130  private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES =
131    ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
132      MoveRegionProcedure.class);
133
134  /**
135   * In HBASE-20811, we have introduced a new TRSP to assign/unassign/move regions, and it is
136   * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to
137   * make sure that there are none these procedures when upgrading. If there are, the master will
138   * quit, you need to go back to the old version to finish these procedures first before upgrading.
139   */
140  private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> procsByType)
141    throws HBaseIOException {
142    // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to
143    // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as
144    // there will be conflict in the code for AM. We should finish all these procedures before
145    // upgrading.
146    for (Class<?> clazz : UNSUPPORTED_PROCEDURES) {
147      List<Procedure<?>> procs = procsByType.get(clazz);
148      if (procs != null) {
149        LOG.error("Unsupported procedure type {} found, please rollback your master to the old" +
150          " version to finish them, and then try to upgrade again." +
151          " See https://hbase.apache.org/book.html#upgrade2.2 for more details." +
152          " The full procedure list: {}", clazz, procs);
153        throw new HBaseIOException("Unsupported procedure type " + clazz + " found");
154      }
155    }
156    // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to
157    // make sure that no one will try to schedule it but SCP does have a state which will schedule
158    // it.
159    if (procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
160      .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState)) {
161      LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure," +
162        " which is not supported any more. Please rollback your master to the old version to" +
163        " finish them, and then try to upgrade again." +
164        " See https://hbase.apache.org/book.html#upgrade2.2 for more details.");
165      throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
166    }
167  }
168
169  @SuppressWarnings("deprecation")
170  private void tryMigrate(FileSystem fs) throws IOException {
171    Configuration conf = server.getConfiguration();
172    Path procWALDir =
173      new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
174    if (!fs.exists(procWALDir)) {
175      return;
176    }
177    LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
178    WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
179    store.start(numThreads);
180    store.recoverLease();
181    MutableLong maxProcIdSet = new MutableLong(-1);
182    List<Procedure<?>> procs = new ArrayList<>();
183    Map<Class<?>, List<Procedure<?>>> activeProcsByType = new HashMap<>();
184    store.load(new ProcedureLoader() {
185
186      @Override
187      public void setMaxProcId(long maxProcId) {
188        maxProcIdSet.setValue(maxProcId);
189      }
190
191      @Override
192      public void load(ProcedureIterator procIter) throws IOException {
193        while (procIter.hasNext()) {
194          Procedure<?> proc = procIter.next();
195          procs.add(proc);
196          if (!proc.isFinished()) {
197            activeProcsByType.computeIfAbsent(proc.getClass(), k -> new ArrayList<>()).add(proc);
198          }
199        }
200      }
201
202      @Override
203      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
204        long corruptedCount = 0;
205        while (procIter.hasNext()) {
206          LOG.error("Corrupted procedure {}", procIter.next());
207          corruptedCount++;
208        }
209        if (corruptedCount > 0) {
210          throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
211            " migrating from the old WAL based store to the new region based store, please" +
212            " fix them before upgrading again.");
213        }
214      }
215    });
216
217    // check whether there are unsupported procedures, this could happen when we are migrating from
218    // 2.1-. We used to do this in HMaster, after loading all the procedures from procedure store,
219    // but here we have to do it before migrating, otherwise, if we find some unsupported
220    // procedures, the users can not go back to 2.1 to finish them any more, as all the data are now
221    // in the new region based procedure store, which is not supported in 2.1-.
222    checkUnsupportedProcedure(activeProcsByType);
223
224    MutableLong maxProcIdFromProcs = new MutableLong(-1);
225    for (Procedure<?> proc : procs) {
226      update(proc);
227      if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
228        maxProcIdFromProcs.setValue(proc.getProcId());
229      }
230    }
231    LOG.info("Migrated {} existing procedures from the old storage format.", procs.size());
232    LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
233      maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
234    // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
235    // anyway, let's do a check here.
236    if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
237      if (maxProcIdSet.longValue() > 0) {
238        // let's add a fake row to retain the max proc id
239        region.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
240          .addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
241      }
242    } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
243      LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
244    }
245    store.stop(false);
246    if (!fs.delete(procWALDir, true)) {
247      throw new IOException(
248        "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
249    }
250    LOG.info("Migration of WALProcedureStore finished");
251  }
252
253  @Override
254  public void recoverLease() throws IOException {
255    LOG.info("Starting Region Procedure Store lease recovery...");
256    FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration());
257    tryMigrate(fs);
258  }
259
260  @Override
261  public void load(ProcedureLoader loader) throws IOException {
262    List<ProcedureProtos.Procedure> procs = new ArrayList<>();
263    long maxProcId = 0;
264
265    try (RegionScanner scanner =
266      region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
267      List<Cell> cells = new ArrayList<>();
268      boolean moreRows;
269      do {
270        moreRows = scanner.next(cells);
271        if (cells.isEmpty()) {
272          continue;
273        }
274        Cell cell = cells.get(0);
275        cells.clear();
276        maxProcId = Math.max(maxProcId,
277          Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
278        if (cell.getValueLength() > 0) {
279          ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
280            .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
281          procs.add(proto);
282        }
283      } while (moreRows);
284    }
285    loader.setMaxProcId(maxProcId);
286    ProcedureTree tree = ProcedureTree.build(procs);
287    loader.load(tree.getValidProcs());
288    loader.handleCorrupted(tree.getCorruptedProcs());
289  }
290
291  private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
292    throws IOException {
293    ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
294    byte[] row = Bytes.toBytes(proc.getProcId());
295    mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray()));
296    rowsToLock.add(row);
297  }
298
299  // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill
300  // the proc column with an empty array.
301  private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
302    byte[] row = Bytes.toBytes(procId);
303    mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
304    rowsToLock.add(row);
305  }
306
307  /**
308   * Insert procedure may be called by master's rpc call. There are some check about the rpc call
309   * when mutate region. Here unset the current rpc call and set it back in finally block. See
310   * HBASE-23895 for more details.
311   */
312  private void runWithoutRpcCall(Runnable runnable) {
313    Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
314    try {
315      runnable.run();
316    } finally {
317      rpcCall.ifPresent(RpcServer::setCurrentCall);
318    }
319  }
320
321  @Override
322  public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
323    if (subProcs == null || subProcs.length == 0) {
324      // same with update, just insert a single procedure
325      update(proc);
326      return;
327    }
328    List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
329    List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
330    runWithoutRpcCall(() -> {
331      try {
332        serializePut(proc, mutations, rowsToLock);
333        for (Procedure<?> subProc : subProcs) {
334          serializePut(subProc, mutations, rowsToLock);
335        }
336        region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
337      } catch (IOException e) {
338        LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
339          Arrays.toString(subProcs), e);
340        throw new UncheckedIOException(e);
341      }
342    });
343  }
344
345  @Override
346  public void insert(Procedure<?>[] procs) {
347    List<Mutation> mutations = new ArrayList<>(procs.length);
348    List<byte[]> rowsToLock = new ArrayList<>(procs.length);
349    runWithoutRpcCall(() -> {
350      try {
351        for (Procedure<?> proc : procs) {
352          serializePut(proc, mutations, rowsToLock);
353        }
354        region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
355      } catch (IOException e) {
356        LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
357        throw new UncheckedIOException(e);
358      }
359    });
360  }
361
362  @Override
363  public void update(Procedure<?> proc) {
364    runWithoutRpcCall(() -> {
365      try {
366        ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
367        region.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
368          PROC_QUALIFIER, proto.toByteArray())));
369      } catch (IOException e) {
370        LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
371        throw new UncheckedIOException(e);
372      }
373    });
374  }
375
376  @Override
377  public void delete(long procId) {
378    try {
379      region.update(r -> r.put(
380        new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
381    } catch (IOException e) {
382      LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
383      throw new UncheckedIOException(e);
384    }
385  }
386
387  @Override
388  public void delete(Procedure<?> parentProc, long[] subProcIds) {
389    List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
390    List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
391    try {
392      serializePut(parentProc, mutations, rowsToLock);
393      for (long subProcId : subProcIds) {
394        serializeDelete(subProcId, mutations, rowsToLock);
395      }
396      region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
397    } catch (IOException e) {
398      LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
399        Arrays.toString(subProcIds), e);
400      throw new UncheckedIOException(e);
401    }
402  }
403
404  @Override
405  public void delete(long[] procIds, int offset, int count) {
406    if (count == 0) {
407      return;
408    }
409    if (count == 1) {
410      delete(procIds[offset]);
411      return;
412    }
413    List<Mutation> mutations = new ArrayList<>(count);
414    List<byte[]> rowsToLock = new ArrayList<>(count);
415    for (int i = 0; i < count; i++) {
416      long procId = procIds[offset + i];
417      serializeDelete(procId, mutations, rowsToLock);
418    }
419    try {
420      region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
421    } catch (IOException e) {
422      LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
423      throw new UncheckedIOException(e);
424    }
425  }
426
427  @Override
428  public void cleanup() {
429    // actually delete the procedures if it is not the one with the max procedure id.
430    List<Cell> cells = new ArrayList<Cell>();
431    try (RegionScanner scanner =
432      region.getScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
433      // skip the row with max procedure id
434      boolean moreRows = scanner.next(cells);
435      if (cells.isEmpty()) {
436        return;
437      }
438      cells.clear();
439      while (moreRows) {
440        moreRows = scanner.next(cells);
441        if (cells.isEmpty()) {
442          continue;
443        }
444        Cell cell = cells.get(0);
445        cells.clear();
446        if (cell.getValueLength() == 0) {
447          region.update(r -> r
448            .delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())));
449        }
450      }
451    } catch (IOException e) {
452      LOG.warn("Failed to clean up delete procedures", e);
453    }
454  }
455}