001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.List;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Future;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.TimeoutException;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
033import org.apache.hadoop.hbase.master.assignment.RegionStates;
034import org.apache.hadoop.hbase.procedure2.Procedure;
035import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
036import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
044
045/**
046 * Helper to synchronously wait on conditions.
047 * This will be removed in the future (mainly when the AssignmentManager will be
048 * replaced with a Procedure version) by using ProcedureYieldException,
049 * and the queue will handle waiting and scheduling based on events.
050 */
051@InterfaceAudience.Private
052@InterfaceStability.Evolving
053public final class ProcedureSyncWait {
054  private static final Logger LOG = LoggerFactory.getLogger(ProcedureSyncWait.class);
055
056  private ProcedureSyncWait() {}
057
058  @InterfaceAudience.Private
059  public interface Predicate<T> {
060    T evaluate() throws IOException;
061  }
062
063  private static class ProcedureFuture implements Future<byte[]> {
064      private final ProcedureExecutor<MasterProcedureEnv> procExec;
065      private final Procedure<?> proc;
066
067      private boolean hasResult = false;
068      private byte[] result = null;
069
070      public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
071        this.procExec = procExec;
072        this.proc = proc;
073      }
074
075      @Override
076      public boolean cancel(boolean mayInterruptIfRunning) { return false; }
077
078      @Override
079      public boolean isCancelled() { return false; }
080
081      @Override
082      public boolean isDone() { return hasResult; }
083
084      @Override
085      public byte[] get() throws InterruptedException, ExecutionException {
086        if (hasResult) return result;
087        try {
088          return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
089        } catch (Exception e) {
090          throw new ExecutionException(e);
091        }
092      }
093
094      @Override
095      public byte[] get(long timeout, TimeUnit unit)
096          throws InterruptedException, ExecutionException, TimeoutException {
097        if (hasResult) return result;
098        try {
099          result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
100          hasResult = true;
101          return result;
102        } catch (TimeoutIOException e) {
103          throw new TimeoutException(e.getMessage());
104        } catch (Exception e) {
105          throw new ExecutionException(e);
106        }
107      }
108    }
109
110  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
111      final Procedure<MasterProcedureEnv> proc) {
112    if (proc.isInitializing()) {
113      procExec.submitProcedure(proc);
114    }
115    return new ProcedureFuture(procExec, proc);
116  }
117
118  public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
119      final Procedure<MasterProcedureEnv> proc) throws IOException {
120    if (proc.isInitializing()) {
121      procExec.submitProcedure(proc);
122    }
123    return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE);
124  }
125
126  public static byte[] waitForProcedureToCompleteIOE(
127      final ProcedureExecutor<MasterProcedureEnv> procExec,
128      final Procedure<?> proc, final long timeout)
129  throws IOException {
130    try {
131      return waitForProcedureToComplete(procExec, proc, timeout);
132    } catch (IOException e) {
133      throw e;
134    } catch (Exception e) {
135      throw new IOException(e);
136    }
137  }
138
139  public static byte[] waitForProcedureToComplete(
140      final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
141      final long timeout) throws IOException {
142    waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(),
143      new ProcedureSyncWait.Predicate<Boolean>() {
144        @Override
145        public Boolean evaluate() throws IOException {
146          if (!procExec.isRunning()) {
147            return true;
148          }
149          ProcedureState state = proc.getState();
150          if (state == ProcedureState.INITIALIZING || state == ProcedureState.RUNNABLE) {
151            // under these states the procedure may have not been added to procExec yet, so do not
152            // use isFinished to test whether it is finished, as this method will just check if the
153            // procedure is in the running procedure list
154            return false;
155          }
156          return procExec.isFinished(proc.getProcId());
157        }
158      });
159    if (!procExec.isRunning()) {
160      throw new IOException("The Master is Aborting");
161    }
162
163    // If the procedure fails, we should always have an exception captured. Throw it.
164    // Needs to be an IOE to get out of here.
165    if (proc.hasException()) {
166      throw MasterProcedureUtil.unwrapRemoteIOException(proc);
167    } else {
168      return proc.getResult();
169    }
170  }
171
172  public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
173      throws IOException {
174    final Configuration conf = env.getMasterConfiguration();
175    final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
176    final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
177    return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
178  }
179
180  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
181      String purpose, Predicate<T> predicate) throws IOException {
182    final long done = EnvironmentEdgeManager.currentTime() + waitTime;
183    boolean logged = false;
184    do {
185      T result = predicate.evaluate();
186      if (result != null && !result.equals(Boolean.FALSE)) {
187        return result;
188      }
189      try {
190        Thread.sleep(waitingTimeForEvents);
191      } catch (InterruptedException e) {
192        LOG.warn("Interrupted while sleeping, waiting on " + purpose);
193        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
194      }
195      if (LOG.isTraceEnabled()) {
196        LOG.trace("waitFor " + purpose);
197      } else {
198        if (!logged) LOG.debug("waitFor " + purpose);
199      }
200      logged = true;
201    } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
202
203    throw new TimeoutIOException("Timed out while waiting on " + purpose);
204  }
205
206  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
207    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
208    try {
209      if (env.getMasterServices().getMetaTableLocator().waitMetaRegionLocation(
210            env.getMasterServices().getZooKeeper(), timeout) == null) {
211        throw new NotAllMetaRegionsOnlineException();
212      }
213    } catch (InterruptedException e) {
214      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
215    }
216  }
217
218  protected static void waitRegionInTransition(final MasterProcedureEnv env,
219      final List<RegionInfo> regions) throws IOException {
220    final RegionStates states = env.getAssignmentManager().getRegionStates();
221    for (final RegionInfo region : regions) {
222      ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
223          new ProcedureSyncWait.Predicate<Boolean>() {
224        @Override
225        public Boolean evaluate() throws IOException {
226          return !states.isRegionInTransition(region);
227        }
228      });
229    }
230  }
231
232  protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
233      throws IOException {
234    return ProcedureSyncWait.waitFor(env, "quota manager to be available",
235        new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
236      @Override
237      public MasterQuotaManager evaluate() throws IOException {
238        return env.getMasterServices().getMasterQuotaManager();
239      }
240    });
241  }
242}