001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.List;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
032import org.apache.hadoop.hbase.master.assignment.RegionStates;
033import org.apache.hadoop.hbase.procedure2.Procedure;
034import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
035import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
044
045/**
046 * Helper to synchronously wait on conditions.
047 * This will be removed in the future (mainly when the AssignmentManager will be
048 * replaced with a Procedure version) by using ProcedureYieldException,
049 * and the queue will handle waiting and scheduling based on events.
050 */
051@InterfaceAudience.Private
052@InterfaceStability.Evolving
053public final class ProcedureSyncWait {
054  private static final Logger LOG = LoggerFactory.getLogger(ProcedureSyncWait.class);
055
056  private ProcedureSyncWait() {}
057
058  @InterfaceAudience.Private
059  public interface Predicate<T> {
060    T evaluate() throws IOException;
061  }
062
063  private static class ProcedureFuture implements Future<byte[]> {
064    private final ProcedureExecutor<MasterProcedureEnv> procExec;
065    private final Procedure<?> proc;
066
067    private boolean hasResult = false;
068    private byte[] result = null;
069
070    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
071      this.procExec = procExec;
072      this.proc = proc;
073    }
074
075    @Override
076    public boolean cancel(boolean mayInterruptIfRunning) {
077      return false;
078    }
079
080    @Override
081    public boolean isCancelled() {
082      return false;
083    }
084
085    @Override
086    public boolean isDone() {
087      return hasResult;
088    }
089
090    @Override
091    public byte[] get() throws InterruptedException, ExecutionException {
092      if (hasResult) {
093        return result;
094      }
095      try {
096        return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
097      } catch (Exception e) {
098        throw new ExecutionException(e);
099      }
100    }
101
102    @Override
103    public byte[] get(long timeout, TimeUnit unit)
104        throws InterruptedException, ExecutionException, TimeoutException {
105      if (hasResult) {
106        return result;
107      }
108      try {
109        result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
110        hasResult = true;
111        return result;
112      } catch (TimeoutIOException e) {
113        throw new TimeoutException(e.getMessage());
114      } catch (Exception e) {
115        throw new ExecutionException(e);
116      }
117    }
118  }
119
120  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
121      final Procedure<MasterProcedureEnv> proc) {
122    if (proc.isInitializing()) {
123      procExec.submitProcedure(proc);
124    }
125    return new ProcedureFuture(procExec, proc);
126  }
127
128  public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
129      final Procedure<MasterProcedureEnv> proc) throws IOException {
130    if (proc.isInitializing()) {
131      procExec.submitProcedure(proc);
132    }
133    return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE);
134  }
135
136  public static byte[] waitForProcedureToCompleteIOE(
137      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
138      final long timeout) throws IOException {
139    try {
140      return waitForProcedureToComplete(procExec, proc, timeout);
141    } catch (IOException e) {
142      throw e;
143    } catch (Exception e) {
144      throw new IOException(e);
145    }
146  }
147
148  public static byte[] waitForProcedureToComplete(
149      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
150      final long timeout) throws IOException {
151    waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
152      new ProcedureSyncWait.Predicate<Boolean>() {
153        @Override
154        public Boolean evaluate() throws IOException {
155          if (!procExec.isRunning()) {
156            return true;
157          }
158          ProcedureState state = proc.getState();
159          if (state == ProcedureState.INITIALIZING || state == ProcedureState.RUNNABLE) {
160            // under these states the procedure may have not been added to procExec yet, so do not
161            // use isFinished to test whether it is finished, as this method will just check if the
162            // procedure is in the running procedure list
163            return false;
164          }
165          return procExec.isFinished(proc.getProcId());
166        }
167      });
168    if (!procExec.isRunning()) {
169      throw new IOException("The Master is Aborting");
170    }
171
172    // If the procedure fails, we should always have an exception captured. Throw it.
173    // Needs to be an IOE to get out of here.
174    if (proc.hasException()) {
175      throw MasterProcedureUtil.unwrapRemoteIOException(proc);
176    } else {
177      return proc.getResult();
178    }
179  }
180
181  public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
182      throws IOException {
183    Configuration conf = env.getMasterConfiguration();
184    long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
185    return waitFor(env, waitTime, purpose, predicate);
186  }
187
188  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
189      Predicate<T> predicate) throws IOException {
190    Configuration conf = env.getMasterConfiguration();
191    long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
192    return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
193  }
194
195  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
196      String purpose, Predicate<T> predicate) throws IOException {
197    long done = EnvironmentEdgeManager.currentTime() + waitTime;
198    if (done <= 0) {
199      // long overflow, usually this means we pass Long.MAX_VALUE as waitTime
200      done = Long.MAX_VALUE;
201    }
202    boolean logged = false;
203    do {
204      T result = predicate.evaluate();
205      if (result != null && !result.equals(Boolean.FALSE)) {
206        return result;
207      }
208      try {
209        Thread.sleep(waitingTimeForEvents);
210      } catch (InterruptedException e) {
211        LOG.warn("Interrupted while sleeping, waiting on " + purpose);
212        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
213      }
214      if (LOG.isTraceEnabled()) {
215        LOG.trace("waitFor " + purpose);
216      } else {
217        if (!logged) LOG.debug("waitFor " + purpose);
218      }
219      logged = true;
220    } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
221
222    throw new TimeoutIOException("Timed out while waiting on " + purpose);
223  }
224
225  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
226    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
227    try {
228      if (MetaTableLocator.waitMetaRegionLocation(env.getMasterServices().getZooKeeper(),
229        timeout) == null) {
230        throw new NotAllMetaRegionsOnlineException();
231      }
232    } catch (InterruptedException e) {
233      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
234    }
235  }
236
237  protected static void waitRegionInTransition(final MasterProcedureEnv env,
238      final List<RegionInfo> regions) throws IOException {
239    final RegionStates states = env.getAssignmentManager().getRegionStates();
240    for (final RegionInfo region : regions) {
241      ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
242          new ProcedureSyncWait.Predicate<Boolean>() {
243        @Override
244        public Boolean evaluate() throws IOException {
245          return !states.isRegionInTransition(region);
246        }
247      });
248    }
249  }
250
251  protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
252      throws IOException {
253    return ProcedureSyncWait.waitFor(env, "quota manager to be available",
254        new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
255      @Override
256      public MasterQuotaManager evaluate() throws IOException {
257        return env.getMasterServices().getMasterQuotaManager();
258      }
259    });
260  }
261}