001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.List;
023import java.util.concurrent.ExecutionException;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.TimeoutException;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionInfoBuilder;
032import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
033import org.apache.hadoop.hbase.master.RegionState;
034import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
035import org.apache.hadoop.hbase.master.assignment.RegionStates;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
038import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
046
047/**
048 * Helper to synchronously wait on conditions. This will be removed in the future (mainly when the
049 * AssignmentManager will be replaced with a Procedure version) by using ProcedureYieldException,
050 * and the queue will handle waiting and scheduling based on events.
051 */
052@InterfaceAudience.Private
053@InterfaceStability.Evolving
054public final class ProcedureSyncWait {
055  private static final Logger LOG = LoggerFactory.getLogger(ProcedureSyncWait.class);
056
057  private ProcedureSyncWait() {
058  }
059
060  @InterfaceAudience.Private
061  public interface Predicate<T> {
062    T evaluate() throws IOException;
063  }
064
065  private static class ProcedureFuture implements Future<byte[]> {
066    private final ProcedureExecutor<MasterProcedureEnv> procExec;
067    private final Procedure<?> proc;
068
069    private boolean hasResult = false;
070    private byte[] result = null;
071
072    public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
073      this.procExec = procExec;
074      this.proc = proc;
075    }
076
077    @Override
078    public boolean cancel(boolean mayInterruptIfRunning) {
079      return false;
080    }
081
082    @Override
083    public boolean isCancelled() {
084      return false;
085    }
086
087    @Override
088    public boolean isDone() {
089      return hasResult;
090    }
091
092    @Override
093    public byte[] get() throws InterruptedException, ExecutionException {
094      if (hasResult) {
095        return result;
096      }
097      try {
098        return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
099      } catch (Exception e) {
100        throw new ExecutionException(e);
101      }
102    }
103
104    @Override
105    public byte[] get(long timeout, TimeUnit unit)
106      throws InterruptedException, ExecutionException, TimeoutException {
107      if (hasResult) {
108        return result;
109      }
110      try {
111        result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
112        hasResult = true;
113        return result;
114      } catch (TimeoutIOException e) {
115        throw new TimeoutException(e.getMessage());
116      } catch (Exception e) {
117        throw new ExecutionException(e);
118      }
119    }
120  }
121
122  public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
123    final Procedure<MasterProcedureEnv> proc) {
124    if (proc.isInitializing()) {
125      procExec.submitProcedure(proc);
126    }
127    return new ProcedureFuture(procExec, proc);
128  }
129
130  public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
131    final Procedure<MasterProcedureEnv> proc) throws IOException {
132    if (proc.isInitializing()) {
133      procExec.submitProcedure(proc);
134    }
135    return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE);
136  }
137
138  public static byte[] waitForProcedureToCompleteIOE(
139    final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
140    final long timeout) throws IOException {
141    try {
142      return waitForProcedureToComplete(procExec, proc, timeout);
143    } catch (IOException e) {
144      throw e;
145    } catch (Exception e) {
146      throw new IOException(e);
147    }
148  }
149
150  public static byte[] waitForProcedureToComplete(
151    final ProcedureExecutor<MasterProcedureEnv> procExec, final Procedure<?> proc,
152    final long timeout) throws IOException {
153    waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
154      new ProcedureSyncWait.Predicate<Boolean>() {
155        @Override
156        public Boolean evaluate() throws IOException {
157          if (!procExec.isRunning()) {
158            return true;
159          }
160          ProcedureState state = proc.getState();
161          if (state == ProcedureState.INITIALIZING || state == ProcedureState.RUNNABLE) {
162            // under these states the procedure may have not been added to procExec yet, so do not
163            // use isFinished to test whether it is finished, as this method will just check if the
164            // procedure is in the running procedure list
165            return false;
166          }
167          return procExec.isFinished(proc.getProcId());
168        }
169      });
170    if (!procExec.isRunning()) {
171      throw new IOException("The Master is Aborting");
172    }
173
174    // If the procedure fails, we should always have an exception captured. Throw it.
175    // Needs to be an IOE to get out of here.
176    if (proc.hasException()) {
177      throw MasterProcedureUtil.unwrapRemoteIOException(proc);
178    } else {
179      return proc.getResult();
180    }
181  }
182
183  public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
184    throws IOException {
185    Configuration conf = env.getMasterConfiguration();
186    long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
187    return waitFor(env, waitTime, purpose, predicate);
188  }
189
190  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
191    Predicate<T> predicate) throws IOException {
192    Configuration conf = env.getMasterConfiguration();
193    long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
194    return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
195  }
196
197  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
198    String purpose, Predicate<T> predicate) throws IOException {
199    long done = EnvironmentEdgeManager.currentTime() + waitTime;
200    if (done <= 0) {
201      // long overflow, usually this means we pass Long.MAX_VALUE as waitTime
202      done = Long.MAX_VALUE;
203    }
204    boolean logged = false;
205    do {
206      T result = predicate.evaluate();
207      if (result != null && !result.equals(Boolean.FALSE)) {
208        return result;
209      }
210      try {
211        Thread.sleep(waitingTimeForEvents);
212      } catch (InterruptedException e) {
213        LOG.warn("Interrupted while sleeping, waiting on " + purpose);
214        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
215      }
216      if (LOG.isTraceEnabled()) {
217        LOG.trace("waitFor " + purpose);
218      } else {
219        if (!logged) LOG.debug("waitFor " + purpose);
220      }
221      logged = true;
222    } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
223
224    throw new TimeoutIOException("Timed out while waiting on " + purpose);
225  }
226
227  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
228    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
229    try {
230      long start = EnvironmentEdgeManager.currentTime();
231      for (;;) {
232        RegionStateNode rsn = env.getAssignmentManager().getRegionStates()
233          .getRegionStateNode(RegionInfoBuilder.FIRST_META_REGIONINFO);
234        if (rsn != null && rsn.isInState(RegionState.State.OPEN)) {
235          return;
236        }
237        if (EnvironmentEdgeManager.currentTime() - start >= timeout) {
238          throw new NotAllMetaRegionsOnlineException();
239        }
240        Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
241      }
242    } catch (InterruptedException e) {
243      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
244    }
245  }
246
247  protected static void waitRegionInTransition(final MasterProcedureEnv env,
248    final List<RegionInfo> regions) throws IOException {
249    final RegionStates states = env.getAssignmentManager().getRegionStates();
250    for (final RegionInfo region : regions) {
251      ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
252        new ProcedureSyncWait.Predicate<Boolean>() {
253          @Override
254          public Boolean evaluate() throws IOException {
255            return !states.isRegionInTransition(region);
256          }
257        });
258    }
259  }
260
261  protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
262    throws IOException {
263    return ProcedureSyncWait.waitFor(env, "quota manager to be available",
264      new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
265        @Override
266        public MasterQuotaManager evaluate() throws IOException {
267          return env.getMasterServices().getMasterQuotaManager();
268        }
269      });
270  }
271}