001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.concurrent.CompletableFuture;
022import java.util.concurrent.ExecutorService;
023import java.util.function.Consumer;
024import java.util.function.Supplier;
025import org.apache.commons.lang3.mutable.MutableBoolean;
026import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
027import org.apache.hadoop.hbase.util.FutureUtils;
028import org.apache.hadoop.hbase.util.IdLock;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A helper class for switching procedure out(yielding) while it is doing some time consuming
035 * operation, such as updating meta, where we can get a {@link CompletableFuture} about the
036 * operation.
037 */
038@InterfaceAudience.Private
039public final class ProcedureFutureUtil {
040
041  private static final Logger LOG = LoggerFactory.getLogger(ProcedureFutureUtil.class);
042
043  private ProcedureFutureUtil() {
044  }
045
046  public static boolean checkFuture(Procedure<?> proc, Supplier<CompletableFuture<Void>> getFuture,
047    Consumer<CompletableFuture<Void>> setFuture, Runnable actionAfterDone) throws IOException {
048    CompletableFuture<Void> future = getFuture.get();
049    if (future == null) {
050      return false;
051    }
052    // reset future
053    setFuture.accept(null);
054    FutureUtils.get(future);
055    actionAfterDone.run();
056    return true;
057  }
058
059  public static void suspendIfNecessary(Procedure<?> proc,
060    Consumer<CompletableFuture<Void>> setFuture, CompletableFuture<Void> future,
061    MasterProcedureEnv env, Runnable actionAfterDone)
062    throws IOException, ProcedureSuspendedException {
063    MutableBoolean completed = new MutableBoolean(false);
064    Thread currentThread = Thread.currentThread();
065    // This is for testing. In ProcedureTestingUtility, we will restart a ProcedureExecutor and
066    // reuse it, for performance, so we need to make sure that all the procedure have been stopped.
067    // But here, the callback of this future is not executed in a PEWorker, so in ProcedureExecutor
068    // we have no way to stop it. So here, we will get the asyncTaskExecutor first, in the PEWorker
069    // thread, where the ProcedureExecutor should have not been stopped yet, then when calling the
070    // callback, if the ProcedureExecutor have already been stopped and restarted, the
071    // asyncTaskExecutor will also be shutdown so we can not add anything back to the scheduler.
072    ExecutorService asyncTaskExecutor = env.getAsyncTaskExecutor();
073    FutureUtils.addListener(future, (r, e) -> {
074      if (Thread.currentThread() == currentThread) {
075        LOG.debug("The future has completed while adding callback, give up suspending procedure {}",
076          proc);
077        // this means the future has already been completed, as we call the callback directly while
078        // calling addListener, so here we just set completed to true without doing anything
079        completed.setTrue();
080        return;
081      }
082      LOG.debug("Going to wake up procedure {} because future has completed", proc);
083      // This callback may be called inside netty's event loop, so we should not block it for a long
084      // time. The worker executor will hold the execution lock while executing the procedure, and
085      // we may persist the procedure state inside the lock, which is a time consuming operation.
086      // And what makes things worse is that, we persist procedure state to master local region,
087      // where the AsyncFSWAL implementation will use the same netty's event loop for dealing with
088      // I/O, which could even cause dead lock.
089      asyncTaskExecutor.execute(() -> wakeUp(proc, env));
090    });
091    if (completed.getValue()) {
092      FutureUtils.get(future);
093      actionAfterDone.run();
094    } else {
095      // suspend the procedure
096      setFuture.accept(future);
097      proc.skipPersistence();
098      suspend(proc);
099    }
100  }
101
102  public static void suspend(Procedure<?> proc) throws ProcedureSuspendedException {
103    proc.skipPersistence();
104    throw new ProcedureSuspendedException();
105  }
106
107  public static void wakeUp(Procedure<?> proc, MasterProcedureEnv env) {
108    // should acquire procedure execution lock to make sure that the procedure executor has
109    // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
110    // race and cause unexpected result
111    IdLock procLock = env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
112    IdLock.Entry lockEntry;
113    try {
114      lockEntry = procLock.getLockEntry(proc.getProcId());
115    } catch (IOException e) {
116      LOG.error("Error while acquiring execution lock for procedure {}"
117        + " when trying to wake it up, aborting...", proc, e);
118      env.getMasterServices().abort("Can not acquire procedure execution lock", e);
119      return;
120    }
121    try {
122      env.getProcedureScheduler().addFront(proc);
123    } finally {
124      procLock.releaseLockEntry(lockEntry);
125    }
126  }
127}