/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Helper to synchronously wait on conditions.
 * This will be removed in the future (mainly when the AssignmentManager will be
 * replaced with a Procedure version) by using ProcedureYieldException,
 * and the queue will handle waiting and scheduling based on events.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ProcedureSyncWait {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureSyncWait.class);

  /** Static utility holder; not instantiable. */
  private ProcedureSyncWait() {}

  /**
   * Condition polled by the {@code waitFor} methods. The wait ends as soon as
   * {@link #evaluate()} returns a value that is neither {@code null} nor {@link Boolean#FALSE}.
   *
   * @param <T> type of the value handed back to the waiter
   */
  @InterfaceAudience.Private
  public interface Predicate<T> {
    /**
     * @return {@code null} or {@link Boolean#FALSE} to keep waiting, anything else to stop
     * @throws IOException propagated unchanged to the caller of {@code waitFor}
     */
    T evaluate() throws IOException;
  }

  /**
   * {@link Future} view over a procedure. Waiting is delegated to
   * {@link #waitForProcedureToComplete(ProcedureExecutor, Procedure, long)}; cancellation is not
   * supported. The cached result fields are plain (non-volatile) fields.
   */
  private static class ProcedureFuture implements Future<byte[]> {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private final Procedure<?> proc;

    // Set once a get() call has returned successfully; later calls short-circuit on them.
    private boolean hasResult = false;
    private byte[] result = null;

    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
      this.procExec = procExec;
      this.proc = proc;
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return false;
    }

    /** Always {@code false}, since {@link #cancel(boolean)} never cancels anything. */
    @Override
    public boolean isCancelled() {
      return false;
    }

    /**
     * @return {@code true} only after one of the {@code get} methods has returned a result; the
     *         procedure state itself is not consulted here
     */
    @Override
    public boolean isDone() {
      return hasResult;
    }

    /**
     * Waits with a timeout of {@link Long#MAX_VALUE} for the procedure and returns its result.
     *
     * @throws ExecutionException wrapping any exception raised while waiting or by the procedure
     */
    @Override
    public byte[] get() throws InterruptedException, ExecutionException {
      if (hasResult) {
        return result;
      }
      try {
        // Record the outcome exactly like the timed variant does, so isDone() turns true and
        // repeated get() calls do not go through the wait loop again.
        result = waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
        hasResult = true;
        return result;
      } catch (Exception e) {
        throw new ExecutionException(e);
      }
    }

    /**
     * Waits at most {@code timeout} (converted to milliseconds) for the procedure and returns its
     * result.
     *
     * @throws TimeoutException if the underlying wait raised a {@link TimeoutIOException}; the
     *         original exception is attached as the cause
     * @throws ExecutionException wrapping any other exception
     */
    @Override
    public byte[] get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      if (hasResult) {
        return result;
      }
      try {
        result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
        hasResult = true;
        return result;
      } catch (TimeoutIOException e) {
        // TimeoutException has no (message, cause) constructor, so attach the cause explicitly.
        throw (TimeoutException) new TimeoutException(e.getMessage()).initCause(e);
      } catch (Exception e) {
        throw new ExecutionException(e);
      }
    }
  }

  /**
   * Submits {@code proc} to {@code procExec} if {@code proc.isInitializing()} is true, and
   * returns a {@link Future} that waits on it.
   *
   * @param procExec executor the procedure runs on
   * @param proc procedure to submit and/or wait for
   * @return a future whose {@code get} methods block until the procedure finishes
   */
  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
      final Procedure<MasterProcedureEnv> proc) {
    if (proc.isInitializing()) {
      procExec.submitProcedure(proc);
    }
    return new ProcedureFuture(procExec, proc);
  }

  /**
   * Submits {@code proc} if {@code proc.isInitializing()} is true, then blocks (timeout
   * {@link Long#MAX_VALUE}) until it completes.
   *
   * @return the procedure result
   * @throws IOException see {@link #waitForProcedureToCompleteIOE(ProcedureExecutor, Procedure, long)}
   */
  public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
      final Procedure<MasterProcedureEnv> proc) throws IOException {
    if (proc.isInitializing()) {
      procExec.submitProcedure(proc);
    }
    return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE);
  }

  /**
   * Same as {@link #waitForProcedureToComplete(ProcedureExecutor, Procedure, long)} but any
   * non-{@link IOException} failure is wrapped in an {@link IOException}.
   *
   * @param timeout maximum time to wait, in milliseconds
   * @return the procedure result
   * @throws IOException the original IOException, or a wrapper around any other exception
   */
  public static byte[] waitForProcedureToCompleteIOE(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
      final long timeout) throws IOException {
    try {
      return waitForProcedureToComplete(procExec, proc, timeout);
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Polls until the executor reports {@code proc} finished or the executor stops running, then
   * returns the procedure result or throws its failure.
   *
   * @param procExec executor the procedure was submitted to
   * @param proc procedure to wait for
   * @param timeout maximum time to wait, in milliseconds
   * @return {@code proc.getResult()} when the procedure carries no exception
   * @throws IOException if the wait times out or is interrupted, if the executor is no longer
   *         running ("The Master is Aborting"), or the unwrapped exception of a failed procedure
   */
  public static byte[] waitForProcedureToComplete(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
      final long timeout) throws IOException {
    waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
      new ProcedureSyncWait.Predicate<Boolean>() {
        @Override
        public Boolean evaluate() throws IOException {
          if (!procExec.isRunning()) {
            // Stop waiting; the check after waitFor() turns this into an IOException.
            return true;
          }
          ProcedureState state = proc.getState();
          if (state == ProcedureState.INITIALIZING || state == ProcedureState.RUNNABLE) {
            // under these states the procedure may have not been added to procExec yet, so do not
            // use isFinished to test whether it is finished, as this method will just check if the
            // procedure is in the running procedure list
            return false;
          }
          return procExec.isFinished(proc.getProcId());
        }
      });
    if (!procExec.isRunning()) {
      throw new IOException("The Master is Aborting");
    }

    // If the procedure fails, we should always have an exception captured. Throw it.
    // Needs to be an IOE to get out of here.
    if (proc.hasException()) {
      throw MasterProcedureUtil.unwrapRemoteIOException(proc);
    } else {
      return proc.getResult();
    }
  }

  /**
   * Waits on {@code predicate} using the wait time configured under
   * {@code hbase.master.wait.on.region} (default {@code 5 * 60 * 1000}).
   *
   * @param purpose text appended to log and timeout messages
   * @return the first predicate value that is neither {@code null} nor {@link Boolean#FALSE}
   * @throws IOException see {@link #waitFor(MasterProcedureEnv, long, long, String, Predicate)}
   */
  public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
      throws IOException {
    Configuration conf = env.getMasterConfiguration();
    long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
    return waitFor(env, waitTime, purpose, predicate);
  }

  /**
   * Waits on {@code predicate} for up to {@code waitTime}, sleeping between polls for the
   * interval configured under {@code hbase.master.event.waiting.time} (default {@code 1000}).
   *
   * @param waitTime maximum time to wait, in milliseconds
   * @param purpose text appended to log and timeout messages
   * @return the first predicate value that is neither {@code null} nor {@link Boolean#FALSE}
   * @throws IOException see {@link #waitFor(MasterProcedureEnv, long, long, String, Predicate)}
   */
  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
      Predicate<T> predicate) throws IOException {
    Configuration conf = env.getMasterConfiguration();
    long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
    return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
  }

  /**
   * Core polling loop: evaluates {@code predicate}, and while it yields {@code null} or
   * {@link Boolean#FALSE}, sleeps {@code waitingTimeForEvents} milliseconds and tries again until
   * the deadline passes or {@code env.isRunning()} turns false. The predicate is always evaluated
   * at least once, and is not re-evaluated after the final sleep.
   *
   * @param env environment providing the running flag
   * @param waitTime maximum time to wait, in milliseconds; {@link Long#MAX_VALUE} is supported
   * @param waitingTimeForEvents sleep between two evaluations, in milliseconds
   * @param purpose text appended to log and timeout messages
   * @param predicate condition to poll
   * @return the first predicate value that is neither {@code null} nor {@link Boolean#FALSE}
   * @throws InterruptedIOException if the sleeping thread is interrupted
   * @throws TimeoutIOException if the loop ends without a result, i.e. the deadline passed or
   *         {@code env} stopped running
   * @throws IOException anything thrown by {@code predicate}
   */
  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
      String purpose, Predicate<T> predicate) throws IOException {
    long done = EnvironmentEdgeManager.currentTime() + waitTime;
    if (done <= 0) {
      // long overflow, usually this means we pass Long.MAX_VALUE as waitTime
      done = Long.MAX_VALUE;
    }
    boolean logged = false;
    do {
      T result = predicate.evaluate();
      if (result != null && !result.equals(Boolean.FALSE)) {
        return result;
      }
      try {
        Thread.sleep(waitingTimeForEvents);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while sleeping, waiting on {}", purpose);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
      // Trace logs every iteration; otherwise emit a single debug line for the whole wait.
      if (LOG.isTraceEnabled()) {
        LOG.trace("waitFor {}", purpose);
      } else if (!logged) {
        LOG.debug("waitFor {}", purpose);
      }
      logged = true;
    } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());

    throw new TimeoutIOException("Timed out while waiting on " + purpose);
  }

  /**
   * Waits for the meta region location via {@link MetaTableLocator#waitMetaRegionLocation}, using
   * the timeout configured under {@code hbase.client.catalog.timeout} (default {@code 10000};
   * presumably milliseconds, confirm against MetaTableLocator).
   *
   * @throws NotAllMetaRegionsOnlineException if the locator returns {@code null}
   * @throws InterruptedIOException if the wait is interrupted
   */
  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
    try {
      if (MetaTableLocator.waitMetaRegionLocation(env.getMasterServices().getZooKeeper(),
        timeout) == null) {
        throw new NotAllMetaRegionsOnlineException();
      }
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    }
  }

  /**
   * Waits, one region after another, until {@link RegionStates#isRegionInTransition} reports
   * {@code false} for each of {@code regions}. Each region gets its own wait with the default
   * wait time of {@link #waitFor(MasterProcedureEnv, String, Predicate)}.
   *
   * @throws IOException if any of the waits times out or is interrupted
   */
  protected static void waitRegionInTransition(final MasterProcedureEnv env,
      final List<RegionInfo> regions) throws IOException {
    final RegionStates states = env.getAssignmentManager().getRegionStates();
    for (final RegionInfo region : regions) {
      ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
        new ProcedureSyncWait.Predicate<Boolean>() {
          @Override
          public Boolean evaluate() throws IOException {
            return !states.isRegionInTransition(region);
          }
        });
    }
  }

  /**
   * Waits until {@code env.getMasterServices().getMasterQuotaManager()} returns a non-null
   * manager and returns it.
   *
   * @throws IOException if the wait times out or is interrupted
   */
  protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
      throws IOException {
    return ProcedureSyncWait.waitFor(env, "quota manager to be available",
      new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
        @Override
        public MasterQuotaManager evaluate() throws IOException {
          return env.getMasterServices().getMasterQuotaManager();
        }
      });
  }
}