001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.concurrent.CompletableFuture; 022import java.util.concurrent.ExecutorService; 023import java.util.function.Consumer; 024import java.util.function.Supplier; 025import org.apache.commons.lang3.mutable.MutableBoolean; 026import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 027import org.apache.hadoop.hbase.util.FutureUtils; 028import org.apache.hadoop.hbase.util.IdLock; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * A helper class for switching procedure out(yielding) while it is doing some time consuming 035 * operation, such as updating meta, where we can get a {@link CompletableFuture} about the 036 * operation. 037 */ 038@InterfaceAudience.Private 039public final class ProcedureFutureUtil { 040 041 private static final Logger LOG = LoggerFactory.getLogger(ProcedureFutureUtil.class); 042 043 private ProcedureFutureUtil() { 044 } 045 046 public static boolean checkFuture(Procedure<?> proc, Supplier<CompletableFuture<Void>> getFuture, 047 Consumer<CompletableFuture<Void>> setFuture, Runnable actionAfterDone) throws IOException { 048 CompletableFuture<Void> future = getFuture.get(); 049 if (future == null) { 050 return false; 051 } 052 // reset future 053 setFuture.accept(null); 054 FutureUtils.get(future); 055 actionAfterDone.run(); 056 return true; 057 } 058 059 public static void suspendIfNecessary(Procedure<?> proc, 060 Consumer<CompletableFuture<Void>> setFuture, CompletableFuture<Void> future, 061 MasterProcedureEnv env, Runnable actionAfterDone) 062 throws IOException, ProcedureSuspendedException { 063 MutableBoolean completed = new MutableBoolean(false); 064 Thread currentThread = Thread.currentThread(); 065 // This is for testing. In ProcedureTestingUtility, we will restart a ProcedureExecutor and 066 // reuse it, for performance, so we need to make sure that all the procedure have been stopped. 067 // But here, the callback of this future is not executed in a PEWorker, so in ProcedureExecutor 068 // we have no way to stop it. So here, we will get the asyncTaskExecutor first, in the PEWorker 069 // thread, where the ProcedureExecutor should have not been stopped yet, then when calling the 070 // callback, if the ProcedureExecutor have already been stopped and restarted, the 071 // asyncTaskExecutor will also be shutdown so we can not add anything back to the scheduler. 072 ExecutorService asyncTaskExecutor = env.getAsyncTaskExecutor(); 073 FutureUtils.addListener(future, (r, e) -> { 074 if (Thread.currentThread() == currentThread) { 075 LOG.debug("The future has completed while adding callback, give up suspending procedure {}", 076 proc); 077 // this means the future has already been completed, as we call the callback directly while 078 // calling addListener, so here we just set completed to true without doing anything 079 completed.setTrue(); 080 return; 081 } 082 LOG.debug("Going to wake up procedure {} because future has completed", proc); 083 // This callback may be called inside netty's event loop, so we should not block it for a long 084 // time. The worker executor will hold the execution lock while executing the procedure, and 085 // we may persist the procedure state inside the lock, which is a time consuming operation. 086 // And what makes things worse is that, we persist procedure state to master local region, 087 // where the AsyncFSWAL implementation will use the same netty's event loop for dealing with 088 // I/O, which could even cause dead lock. 089 asyncTaskExecutor.execute(() -> wakeUp(proc, env)); 090 }); 091 if (completed.getValue()) { 092 FutureUtils.get(future); 093 actionAfterDone.run(); 094 } else { 095 // suspend the procedure 096 setFuture.accept(future); 097 proc.skipPersistence(); 098 suspend(proc); 099 } 100 } 101 102 public static void suspend(Procedure<?> proc) throws ProcedureSuspendedException { 103 proc.skipPersistence(); 104 throw new ProcedureSuspendedException(); 105 } 106 107 public static void wakeUp(Procedure<?> proc, MasterProcedureEnv env) { 108 // should acquire procedure execution lock to make sure that the procedure executor has 109 // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be 110 // race and cause unexpected result 111 IdLock procLock = env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock(); 112 IdLock.Entry lockEntry; 113 try { 114 lockEntry = procLock.getLockEntry(proc.getProcId()); 115 } catch (IOException e) { 116 LOG.error("Error while acquiring execution lock for procedure {}" 117 + " when trying to wake it up, aborting...", proc, e); 118 env.getMasterServices().abort("Can not acquire procedure execution lock", e); 119 return; 120 } 121 try { 122 env.getProcedureScheduler().addFront(proc); 123 } finally { 124 procLock.releaseLockEntry(lockEntry); 125 } 126 } 127}