001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
022import static org.apache.hadoop.hbase.master.region.MasterRegionFactory.PROC_FAMILY;
023
024import java.io.IOException;
025import java.io.UncheckedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import org.apache.commons.lang3.mutable.MutableLong;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.Server;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Mutation;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.log.HBaseMarkers;
044import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
045import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
046import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
047import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
048import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
049import org.apache.hadoop.hbase.master.region.MasterRegion;
050import org.apache.hadoop.hbase.procedure2.Procedure;
051import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
052import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
053import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
054import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
055import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
056import org.apache.hadoop.hbase.regionserver.RegionScanner;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
066
067/**
068 * A procedure store which uses the master local store to store all the procedures.
069 * <p/>
070 * We use proc:d column to store the serialized protobuf format procedure, and when deleting we will
071 * first fill the proc:d column with an empty byte array, and then actually delete them in the
072 * {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
073 * not directly delete a procedure row as we do not know if it is the one with the max procedure id.
074 */
075@InterfaceAudience.Private
076public class RegionProcedureStore extends ProcedureStoreBase {
077
078  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
079
080  static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
081
082  private final Server server;
083
084  private final LeaseRecovery leaseRecovery;
085
086  final MasterRegion region;
087
088  private int numThreads;
089
090  public RegionProcedureStore(Server server, MasterRegion region, LeaseRecovery leaseRecovery) {
091    this.server = server;
092    this.region = region;
093    this.leaseRecovery = leaseRecovery;
094  }
095
096  @Override
097  public void start(int numThreads) throws IOException {
098    if (!setRunning(true)) {
099      return;
100    }
101    LOG.info("Starting the Region Procedure Store, number threads={}", numThreads);
102    this.numThreads = numThreads;
103  }
104
105  @Override
106  public void stop(boolean abort) {
107    if (!setRunning(false)) {
108      return;
109    }
110    LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
111  }
112
113  @Override
114  public int getNumThreads() {
115    return numThreads;
116  }
117
118  @Override
119  public int setRunningProcedureCount(int count) {
120    // useless for region based storage.
121    return count;
122  }
123
124  @SuppressWarnings("deprecation")
125  private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES =
126    ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
127      MoveRegionProcedure.class);
128
129  /**
130   * In HBASE-20811, we have introduced a new TRSP to assign/unassign/move regions, and it is
131   * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to
132   * make sure that there are none these procedures when upgrading. If there are, the master will
133   * quit, you need to go back to the old version to finish these procedures first before upgrading.
134   */
135  private void checkUnsupportedProcedure(Map<Class<?>, List<Procedure<?>>> procsByType)
136    throws HBaseIOException {
137    // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to
138    // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as
139    // there will be conflict in the code for AM. We should finish all these procedures before
140    // upgrading.
141    for (Class<?> clazz : UNSUPPORTED_PROCEDURES) {
142      List<Procedure<?>> procs = procsByType.get(clazz);
143      if (procs != null) {
144        LOG.error("Unsupported procedure type {} found, please rollback your master to the old"
145          + " version to finish them, and then try to upgrade again."
146          + " See https://hbase.apache.org/book.html#upgrade2.2 for more details."
147          + " The full procedure list: {}", clazz, procs);
148        throw new HBaseIOException("Unsupported procedure type " + clazz + " found");
149      }
150    }
151    // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to
152    // make sure that no one will try to schedule it but SCP does have a state which will schedule
153    // it.
154    if (
155      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
156        .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState)
157    ) {
158      LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure,"
159        + " which is not supported any more. Please rollback your master to the old version to"
160        + " finish them, and then try to upgrade again."
161        + " See https://hbase.apache.org/book.html#upgrade2.2 for more details.");
162      throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
163    }
164  }
165
166  @SuppressWarnings("deprecation")
167  private void tryMigrate(FileSystem fs) throws IOException {
168    Configuration conf = server.getConfiguration();
169    Path procWALDir =
170      new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
171    if (!fs.exists(procWALDir)) {
172      return;
173    }
174    LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
175    WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
176    store.start(numThreads);
177    store.recoverLease();
178    MutableLong maxProcIdSet = new MutableLong(-1);
179    List<Procedure<?>> procs = new ArrayList<>();
180    Map<Class<?>, List<Procedure<?>>> activeProcsByType = new HashMap<>();
181    store.load(new ProcedureLoader() {
182
183      @Override
184      public void setMaxProcId(long maxProcId) {
185        maxProcIdSet.setValue(maxProcId);
186      }
187
188      @Override
189      public void load(ProcedureIterator procIter) throws IOException {
190        while (procIter.hasNext()) {
191          Procedure<?> proc = procIter.next();
192          procs.add(proc);
193          if (!proc.isFinished()) {
194            activeProcsByType.computeIfAbsent(proc.getClass(), k -> new ArrayList<>()).add(proc);
195          }
196        }
197      }
198
199      @Override
200      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
201        long corruptedCount = 0;
202        while (procIter.hasNext()) {
203          LOG.error("Corrupted procedure {}", procIter.next());
204          corruptedCount++;
205        }
206        if (corruptedCount > 0) {
207          throw new IOException("There are " + corruptedCount + " corrupted procedures when"
208            + " migrating from the old WAL based store to the new region based store, please"
209            + " fix them before upgrading again.");
210        }
211      }
212    });
213
214    // check whether there are unsupported procedures, this could happen when we are migrating from
215    // 2.1-. We used to do this in HMaster, after loading all the procedures from procedure store,
216    // but here we have to do it before migrating, otherwise, if we find some unsupported
217    // procedures, the users can not go back to 2.1 to finish them any more, as all the data are now
218    // in the new region based procedure store, which is not supported in 2.1-.
219    checkUnsupportedProcedure(activeProcsByType);
220
221    MutableLong maxProcIdFromProcs = new MutableLong(-1);
222    for (Procedure<?> proc : procs) {
223      update(proc);
224      if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
225        maxProcIdFromProcs.setValue(proc.getProcId());
226      }
227    }
228    LOG.info("Migrated {} existing procedures from the old storage format.", procs.size());
229    LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
230      maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
231    // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
232    // anyway, let's do a check here.
233    if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
234      if (maxProcIdSet.longValue() > 0) {
235        // let's add a fake row to retain the max proc id
236        region.update(r -> r.put(new Put(Bytes.toBytes(maxProcIdSet.longValue()))
237          .addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
238      }
239    } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
240      LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
241    }
242    store.stop(false);
243    if (!fs.delete(procWALDir, true)) {
244      throw new IOException(
245        "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
246    }
247    LOG.info("Migration of WALProcedureStore finished");
248  }
249
250  @Override
251  public void recoverLease() throws IOException {
252    LOG.info("Starting Region Procedure Store lease recovery...");
253    FileSystem fs = CommonFSUtils.getWALFileSystem(server.getConfiguration());
254    tryMigrate(fs);
255  }
256
257  @Override
258  public void load(ProcedureLoader loader) throws IOException {
259    List<ProcedureProtos.Procedure> procs = new ArrayList<>();
260    long maxProcId = 0;
261
262    try (RegionScanner scanner =
263      region.getRegionScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER))) {
264      List<Cell> cells = new ArrayList<>();
265      boolean moreRows;
266      do {
267        moreRows = scanner.next(cells);
268        if (cells.isEmpty()) {
269          continue;
270        }
271        Cell cell = cells.get(0);
272        cells.clear();
273        maxProcId = Math.max(maxProcId,
274          Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
275        if (cell.getValueLength() > 0) {
276          ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
277            .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
278          procs.add(proto);
279        }
280      } while (moreRows);
281    }
282    loader.setMaxProcId(maxProcId);
283    ProcedureTree tree = ProcedureTree.build(procs);
284    loader.load(tree.getValidProcs());
285    loader.handleCorrupted(tree.getCorruptedProcs());
286  }
287
288  private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
289    throws IOException {
290    ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
291    byte[] row = Bytes.toBytes(proc.getProcId());
292    mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, proto.toByteArray()));
293    rowsToLock.add(row);
294  }
295
296  // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill
297  // the proc column with an empty array.
298  private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
299    byte[] row = Bytes.toBytes(procId);
300    mutations.add(new Put(row).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
301    rowsToLock.add(row);
302  }
303
304  @Override
305  public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
306    if (subProcs == null || subProcs.length == 0) {
307      // same with update, just insert a single procedure
308      update(proc);
309      return;
310    }
311    List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
312    List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
313    try {
314      serializePut(proc, mutations, rowsToLock);
315      for (Procedure<?> subProc : subProcs) {
316        serializePut(subProc, mutations, rowsToLock);
317      }
318      region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
319    } catch (IOException e) {
320      LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
321        Arrays.toString(subProcs), e);
322      throw new UncheckedIOException(e);
323    }
324  }
325
326  @Override
327  public void insert(Procedure<?>[] procs) {
328    List<Mutation> mutations = new ArrayList<>(procs.length);
329    List<byte[]> rowsToLock = new ArrayList<>(procs.length);
330    try {
331      for (Procedure<?> proc : procs) {
332        serializePut(proc, mutations, rowsToLock);
333      }
334      region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
335    } catch (IOException e) {
336      LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
337      throw new UncheckedIOException(e);
338    }
339  }
340
341  @Override
342  public void update(Procedure<?> proc) {
343    try {
344      ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
345      region.update(r -> r.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(PROC_FAMILY,
346        PROC_QUALIFIER, proto.toByteArray())));
347    } catch (IOException e) {
348      LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
349      throw new UncheckedIOException(e);
350    }
351  }
352
353  @Override
354  public void delete(long procId) {
355    try {
356      region.update(r -> r.put(
357        new Put(Bytes.toBytes(procId)).addColumn(PROC_FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY)));
358    } catch (IOException e) {
359      LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
360      throw new UncheckedIOException(e);
361    }
362  }
363
364  @Override
365  public void delete(Procedure<?> parentProc, long[] subProcIds) {
366    List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
367    List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
368    try {
369      serializePut(parentProc, mutations, rowsToLock);
370      for (long subProcId : subProcIds) {
371        serializeDelete(subProcId, mutations, rowsToLock);
372      }
373      region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
374    } catch (IOException e) {
375      LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
376        Arrays.toString(subProcIds), e);
377      throw new UncheckedIOException(e);
378    }
379  }
380
381  @Override
382  public void delete(long[] procIds, int offset, int count) {
383    if (count == 0) {
384      return;
385    }
386    if (count == 1) {
387      delete(procIds[offset]);
388      return;
389    }
390    List<Mutation> mutations = new ArrayList<>(count);
391    List<byte[]> rowsToLock = new ArrayList<>(count);
392    for (int i = 0; i < count; i++) {
393      long procId = procIds[offset + i];
394      serializeDelete(procId, mutations, rowsToLock);
395    }
396    try {
397      region.update(r -> r.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE));
398    } catch (IOException e) {
399      LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
400      throw new UncheckedIOException(e);
401    }
402  }
403
404  @Override
405  public void cleanup() {
406    // actually delete the procedures if it is not the one with the max procedure id.
407    List<Cell> cells = new ArrayList<Cell>();
408    try (RegionScanner scanner = region
409      .getRegionScanner(new Scan().addColumn(PROC_FAMILY, PROC_QUALIFIER).setReversed(true))) {
410      // skip the row with max procedure id
411      boolean moreRows = scanner.next(cells);
412      if (cells.isEmpty()) {
413        return;
414      }
415      cells.clear();
416      while (moreRows) {
417        moreRows = scanner.next(cells);
418        if (cells.isEmpty()) {
419          continue;
420        }
421        Cell cell = cells.get(0);
422        cells.clear();
423        if (cell.getValueLength() == 0) {
424          region.update(
425            r -> r.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
426              .addFamily(PROC_FAMILY)));
427        }
428      }
429    } catch (IOException e) {
430      LOG.warn("Failed to clean up delete procedures", e);
431    }
432  }
433}