001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master.procedure; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.List; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Future; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.TimeoutException; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; 031import org.apache.hadoop.hbase.client.RegionInfo; 032import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 033import org.apache.hadoop.hbase.master.assignment.RegionStates; 034import org.apache.hadoop.hbase.procedure2.Procedure; 035import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 036import org.apache.hadoop.hbase.quotas.MasterQuotaManager; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 044 045/** 046 * Helper to synchronously wait on conditions. 047 * This will be removed in the future (mainly when the AssignmentManager will be 048 * replaced with a Procedure version) by using ProcedureYieldException, 049 * and the queue will handle waiting and scheduling based on events. 050 */ 051@InterfaceAudience.Private 052@InterfaceStability.Evolving 053public final class ProcedureSyncWait { 054 private static final Logger LOG = LoggerFactory.getLogger(ProcedureSyncWait.class); 055 056 private ProcedureSyncWait() {} 057 058 @InterfaceAudience.Private 059 public interface Predicate<T> { 060 T evaluate() throws IOException; 061 } 062 063 private static class ProcedureFuture implements Future<byte[]> { 064 private final ProcedureExecutor<MasterProcedureEnv> procExec; 065 private final Procedure<?> proc; 066 067 private boolean hasResult = false; 068 private byte[] result = null; 069 070 public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) { 071 this.procExec = procExec; 072 this.proc = proc; 073 } 074 075 @Override 076 public boolean cancel(boolean mayInterruptIfRunning) { return false; } 077 078 @Override 079 public boolean isCancelled() { return false; } 080 081 @Override 082 public boolean isDone() { return hasResult; } 083 084 @Override 085 public byte[] get() throws InterruptedException, ExecutionException { 086 if (hasResult) return result; 087 try { 088 return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE); 089 } catch (Exception e) { 090 throw new ExecutionException(e); 091 } 092 } 093 094 @Override 095 public byte[] get(long timeout, TimeUnit unit) 096 throws InterruptedException, ExecutionException, TimeoutException { 097 if (hasResult) return result; 098 try { 099 result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout)); 100 hasResult = true; 101 return result; 102 } catch (TimeoutIOException e) { 103 throw new TimeoutException(e.getMessage()); 104 } catch (Exception e) { 105 throw new ExecutionException(e); 106 } 107 } 108 } 109 110 public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec, 111 final Procedure<MasterProcedureEnv> proc) { 112 if (proc.isInitializing()) { 113 procExec.submitProcedure(proc); 114 } 115 return new ProcedureFuture(procExec, proc); 116 } 117 118 public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec, 119 final Procedure<MasterProcedureEnv> proc) throws IOException { 120 if (proc.isInitializing()) { 121 procExec.submitProcedure(proc); 122 } 123 return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE); 124 } 125 126 public static byte[] waitForProcedureToCompleteIOE( 127 final ProcedureExecutor<MasterProcedureEnv> procExec, 128 final Procedure<?> proc, final long timeout) 129 throws IOException { 130 try { 131 return waitForProcedureToComplete(procExec, proc, timeout); 132 } catch (IOException e) { 133 throw e; 134 } catch (Exception e) { 135 throw new IOException(e); 136 } 137 } 138 139 public static byte[] waitForProcedureToComplete( 140 final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc, 141 final long timeout) throws IOException { 142 waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(), 143 new ProcedureSyncWait.Predicate<Boolean>() { 144 @Override 145 public Boolean evaluate() throws IOException { 146 if (!procExec.isRunning()) { 147 return true; 148 } 149 ProcedureState state = proc.getState(); 150 if (state == ProcedureState.INITIALIZING || state == ProcedureState.RUNNABLE) { 151 // under these states the procedure may have not been added to procExec yet, so do not 152 // use isFinished to test whether it is finished, as this method will just check if the 153 // procedure is in the running procedure list 154 return false; 155 } 156 return procExec.isFinished(proc.getProcId()); 157 } 158 }); 159 if (!procExec.isRunning()) { 160 throw new IOException("The Master is Aborting"); 161 } 162 163 // If the procedure fails, we should always have an exception captured. Throw it. 164 // Needs to be an IOE to get out of here. 165 if (proc.hasException()) { 166 throw MasterProcedureUtil.unwrapRemoteIOException(proc); 167 } else { 168 return proc.getResult(); 169 } 170 } 171 172 public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate) 173 throws IOException { 174 final Configuration conf = env.getMasterConfiguration(); 175 final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000); 176 final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000); 177 return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate); 178 } 179 180 public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents, 181 String purpose, Predicate<T> predicate) throws IOException { 182 final long done = EnvironmentEdgeManager.currentTime() + waitTime; 183 boolean logged = false; 184 do { 185 T result = predicate.evaluate(); 186 if (result != null && !result.equals(Boolean.FALSE)) { 187 return result; 188 } 189 try { 190 Thread.sleep(waitingTimeForEvents); 191 } catch (InterruptedException e) { 192 LOG.warn("Interrupted while sleeping, waiting on " + purpose); 193 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 194 } 195 if (LOG.isTraceEnabled()) { 196 LOG.trace("waitFor " + purpose); 197 } else { 198 if (!logged) LOG.debug("waitFor " + purpose); 199 } 200 logged = true; 201 } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning()); 202 203 throw new TimeoutIOException("Timed out while waiting on " + purpose); 204 } 205 206 protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException { 207 int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000); 208 try { 209 if (env.getMasterServices().getMetaTableLocator().waitMetaRegionLocation( 210 env.getMasterServices().getZooKeeper(), timeout) == null) { 211 throw new NotAllMetaRegionsOnlineException(); 212 } 213 } catch (InterruptedException e) { 214 throw (InterruptedIOException)new InterruptedIOException().initCause(e); 215 } 216 } 217 218 protected static void waitRegionInTransition(final MasterProcedureEnv env, 219 final List<RegionInfo> regions) throws IOException { 220 final RegionStates states = env.getAssignmentManager().getRegionStates(); 221 for (final RegionInfo region : regions) { 222 ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition", 223 new ProcedureSyncWait.Predicate<Boolean>() { 224 @Override 225 public Boolean evaluate() throws IOException { 226 return !states.isRegionInTransition(region); 227 } 228 }); 229 } 230 } 231 232 protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env) 233 throws IOException { 234 return ProcedureSyncWait.waitFor(env, "quota manager to be available", 235 new ProcedureSyncWait.Predicate<MasterQuotaManager>() { 236 @Override 237 public MasterQuotaManager evaluate() throws IOException { 238 return env.getMasterServices().getMasterQuotaManager(); 239 } 240 }); 241 } 242}