/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Helper to synchronously wait on conditions. This will be removed in the future (mainly when the
 * AssignmentManager will be replaced with a Procedure version) by using ProcedureYieldException,
 * and the queue will handle waiting and scheduling based on events.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ProcedureSyncWait {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureSyncWait.class);

  private ProcedureSyncWait() {
  }

  /**
   * A condition polled by the {@code waitFor} methods. A {@code null} or {@link Boolean#FALSE}
   * return value means "not satisfied yet"; any other value ends the wait and is handed back to
   * the caller.
   * @param <T> type of the value produced once the condition holds
   */
  @InterfaceAudience.Private
  public interface Predicate<T> {
    T evaluate() throws IOException;
  }

  /**
   * {@link Future} view over a procedure. Both {@code get} variants block by polling through
   * {@link #waitForProcedureToComplete(ProcedureExecutor, Procedure, long)} and remember the
   * result, so {@link #isDone()} reports {@code true} after any successful {@code get}.
   * Cancellation is not supported.
   */
  private static class ProcedureFuture implements Future<byte[]> {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private final Procedure<?> proc;

    private boolean hasResult = false;
    private byte[] result = null;

    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
      this.procExec = procExec;
      this.proc = proc;
    }

    /** Always returns {@code false}; the procedure is never cancelled through this future. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return false;
    }

    @Override
    public boolean isCancelled() {
      return false;
    }

    /** Returns {@code true} once a call to one of the {@code get} methods has obtained a result. */
    @Override
    public boolean isDone() {
      return hasResult;
    }

    /**
     * Waits without a deadline ({@code Long.MAX_VALUE}) for the procedure to complete.
     * @return the procedure result
     * @throws ExecutionException wrapping any exception raised while waiting or by the procedure
     */
    @Override
    public byte[] get() throws InterruptedException, ExecutionException {
      if (hasResult) {
        return result;
      }
      try {
        // Cache the outcome exactly like get(long, TimeUnit) does; otherwise isDone() would keep
        // returning false and every later get() would start a new wait.
        result = waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
        hasResult = true;
        return result;
      } catch (Exception e) {
        throw new ExecutionException(e);
      }
    }

    /**
     * Waits up to the given timeout for the procedure to complete.
     * @return the procedure result
     * @throws TimeoutException if the wait ended with a {@link TimeoutIOException}; that
     *           exception is attached as the cause
     * @throws ExecutionException wrapping any other exception raised while waiting or by the
     *           procedure
     */
    @Override
    public byte[] get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
      if (hasResult) {
        return result;
      }
      try {
        result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
        hasResult = true;
        return result;
      } catch (TimeoutIOException e) {
        // TimeoutException has no (message, cause) constructor, so attach the cause explicitly.
        throw (TimeoutException) new TimeoutException(e.getMessage()).initCause(e);
      } catch (Exception e) {
        throw new ExecutionException(e);
      }
    }
  }

  /**
   * Submits {@code proc} to {@code procExec} if it is still initializing and returns a
   * {@link Future} that can be used to wait for its result.
   * @param procExec executor the procedure runs on
   * @param proc     procedure to submit (submission is skipped when it is not initializing)
   * @return a future whose {@code get} methods block until the procedure completes
   */
  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
    final Procedure<MasterProcedureEnv> proc) {
    if (proc.isInitializing()) {
      procExec.submitProcedure(proc);
    }
    return new ProcedureFuture(procExec, proc);
  }

  /**
   * Submits {@code proc} to {@code procExec} if it is still initializing, then waits without a
   * deadline ({@code Long.MAX_VALUE}) for it to complete.
   * @return the procedure result
   * @throws IOException if the wait fails or the procedure recorded an exception
   */
  public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
    final Procedure<MasterProcedureEnv> proc) throws IOException {
    if (proc.isInitializing()) {
      procExec.submitProcedure(proc);
    }
    return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE);
  }

  /**
   * Same as {@link #waitForProcedureToComplete(ProcedureExecutor, Procedure, long)}, except that
   * any non-{@link IOException} raised while waiting is wrapped in an {@link IOException}.
   * @param timeout maximum time to wait, in milliseconds
   * @return the procedure result
   * @throws IOException the original {@link IOException}, or a wrapper around any other exception
   */
  public static byte[] waitForProcedureToCompleteIOE(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
    final long timeout) throws IOException {
    try {
      return waitForProcedureToComplete(procExec, proc, timeout);
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Polls until {@code proc} is reported finished by {@code procExec}, or until {@code procExec}
   * stops running, or until the timeout elapses.
   * @param procExec executor the procedure was submitted to
   * @param proc     procedure to wait for
   * @param timeout  maximum time to wait, in milliseconds
   * @return the value of {@code proc.getResult()} when the procedure recorded no exception
   * @throws IOException if the wait times out, if {@code procExec} is no longer running when the
   *           wait ends, or the procedure's own exception as produced by
   *           {@code MasterProcedureUtil.unwrapRemoteIOException}
   */
  public static byte[] waitForProcedureToComplete(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
    final long timeout) throws IOException {
    waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
      new ProcedureSyncWait.Predicate<Boolean>() {
        @Override
        public Boolean evaluate() throws IOException {
          if (!procExec.isRunning()) {
            // Stop polling; the caller reports the shutdown right after the wait returns.
            return true;
          }
          ProcedureState state = proc.getState();
          if (state == ProcedureState.INITIALIZING || state == ProcedureState.RUNNABLE) {
            // under these states the procedure may have not been added to procExec yet, so do not
            // use isFinished to test whether it is finished, as this method will just check if the
            // procedure is in the running procedure list
            return false;
          }
          return procExec.isFinished(proc.getProcId());
        }
      });
    if (!procExec.isRunning()) {
      throw new IOException("The Master is Aborting");
    }

    // If the procedure fails, we should always have an exception captured. Throw it.
    // Needs to be an IOE to get out of here.
    if (proc.hasException()) {
      throw MasterProcedureUtil.unwrapRemoteIOException(proc);
    } else {
      return proc.getResult();
    }
  }

  /**
   * Waits for {@code predicate} using the total wait time configured under
   * {@code hbase.master.wait.on.region} (default 5 minutes, in milliseconds).
   * @param purpose description used in log and timeout messages
   * @return the first non-null, non-{@link Boolean#FALSE} value produced by {@code predicate}
   * @throws IOException if the predicate throws, the thread is interrupted, or the wait times out
   */
  public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
    throws IOException {
    Configuration conf = env.getMasterConfiguration();
    long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
    return waitFor(env, waitTime, purpose, predicate);
  }

  /**
   * Waits for {@code predicate} for at most {@code waitTime} milliseconds, sleeping between
   * evaluations for the interval configured under {@code hbase.master.event.waiting.time}
   * (default 1000 ms).
   * @param waitTime maximum total time to wait, in milliseconds
   * @param purpose  description used in log and timeout messages
   * @return the first non-null, non-{@link Boolean#FALSE} value produced by {@code predicate}
   * @throws IOException if the predicate throws, the thread is interrupted, or the wait times out
   */
  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
    Predicate<T> predicate) throws IOException {
    Configuration conf = env.getMasterConfiguration();
    long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
    return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
  }

  /**
   * Repeatedly evaluates {@code predicate} until it yields a value that is neither {@code null}
   * nor {@link Boolean#FALSE}. The predicate is always evaluated at least once; after each
   * unsuccessful evaluation the thread sleeps for {@code waitingTimeForEvents} milliseconds.
   * Polling stops when the deadline has passed or {@code env.isRunning()} returns {@code false};
   * in both cases a {@link TimeoutIOException} is thrown.
   * @param env                  environment whose running state bounds the wait
   * @param waitTime             maximum total time to wait, in milliseconds
   * @param waitingTimeForEvents sleep between two evaluations, in milliseconds
   * @param purpose              description used in log and timeout messages
   * @param predicate            condition to poll
   * @return the first non-null, non-{@link Boolean#FALSE} value produced by {@code predicate}
   * @throws InterruptedIOException if the thread is interrupted while sleeping
   * @throws TimeoutIOException     if polling stops without the predicate being satisfied
   * @throws IOException            if {@code predicate} throws
   */
  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
    String purpose, Predicate<T> predicate) throws IOException {
    long done = EnvironmentEdgeManager.currentTime() + waitTime;
    if (done <= 0) {
      // long overflow, usually this means we pass Long.MAX_VALUE as waitTime
      done = Long.MAX_VALUE;
    }
    boolean logged = false;
    do {
      T result = predicate.evaluate();
      if (result != null && !result.equals(Boolean.FALSE)) {
        return result;
      }
      try {
        Thread.sleep(waitingTimeForEvents);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while sleeping, waiting on {}", purpose);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
      }
      // At TRACE every iteration is logged; otherwise only the first one is, at DEBUG.
      if (LOG.isTraceEnabled()) {
        LOG.trace("waitFor {}", purpose);
      } else if (!logged) {
        LOG.debug("waitFor {}", purpose);
      }
      logged = true;
    } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());

    throw new TimeoutIOException("Timed out while waiting on " + purpose);
  }

  /**
   * Blocks until the region state node of {@code RegionInfoBuilder.FIRST_META_REGIONINFO} is in
   * state {@code OPEN}, checking every {@code HConstants.SOCKET_RETRY_WAIT_MS} milliseconds. The
   * time limit is read from {@code hbase.client.catalog.timeout} (default 10000 ms).
   * @throws NotAllMetaRegionsOnlineException if the region is not open within the time limit
   * @throws InterruptedIOException           if the thread is interrupted while sleeping
   */
  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
    try {
      long start = EnvironmentEdgeManager.currentTime();
      for (;;) {
        RegionStateNode rsn = env.getAssignmentManager().getRegionStates()
          .getRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
        if (rsn != null && rsn.isInState(RegionState.State.OPEN)) {
          return;
        }
        if (EnvironmentEdgeManager.currentTime() - start >= timeout) {
          throw new NotAllMetaRegionsOnlineException();
        }
        Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
      }
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    }
  }

  /**
   * Waits, one region at a time and in list order, until {@code RegionStates} no longer reports
   * the region as in transition. Each region gets its own wait with the default time limit of
   * {@link #waitFor(MasterProcedureEnv, String, Predicate)}.
   * @throws IOException if any of the individual waits fails or times out
   */
  protected static void waitRegionInTransition(final MasterProcedureEnv env,
    final List<RegionInfo> regions) throws IOException {
    final RegionStates states = env.getAssignmentManager().getRegionStates();
    for (final RegionInfo region : regions) {
      ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
        new ProcedureSyncWait.Predicate<Boolean>() {
          @Override
          public Boolean evaluate() throws IOException {
            return !states.isRegionInTransition(region);
          }
        });
    }
  }

  /**
   * Waits until {@code env.getMasterServices().getMasterQuotaManager()} returns a non-null
   * manager, using the default time limit of
   * {@link #waitFor(MasterProcedureEnv, String, Predicate)}.
   * @return the quota manager
   * @throws IOException if the wait fails or times out
   */
  protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
    throws IOException {
    return ProcedureSyncWait.waitFor(env, "quota manager to be available",
      new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
        @Override
        public MasterQuotaManager evaluate() throws IOException {
          return env.getMasterServices().getMasterQuotaManager();
        }
      });
  }
}