View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.master.procedure;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.CoordinatedStateException;
29  import org.apache.hadoop.hbase.HRegionInfo;
30  import org.apache.hadoop.hbase.MetaTableAccessor;
31  import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
32  import org.apache.hadoop.hbase.ProcedureInfo;
33  import org.apache.hadoop.hbase.ServerName;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.classification.InterfaceStability;
37  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
38  import org.apache.hadoop.hbase.master.AssignmentManager;
39  import org.apache.hadoop.hbase.master.RegionStates;
40  import org.apache.hadoop.hbase.master.RegionState.State;
41  import org.apache.hadoop.hbase.master.ServerManager;
42  import org.apache.hadoop.hbase.procedure2.Procedure;
43  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
44  import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
45  import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.Threads;
48  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
49  
50  /**
51   * Helper to synchronously wait on conditions.
52   * This will be removed in the future (mainly when the AssignmentManager will be
53   * replaced with a Procedure version) by using ProcedureYieldException,
54   * and the queue will handle waiting and scheduling based on events.
55   */
56  @InterfaceAudience.Private
57  @InterfaceStability.Evolving
58  public final class ProcedureSyncWait {
59    private static final Log LOG = LogFactory.getLog(ProcedureSyncWait.class);
60  
61    private ProcedureSyncWait() {}
62  
63    @InterfaceAudience.Private
64    public interface Predicate<T> {
65      T evaluate() throws IOException;
66    }
67  
68    public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
69        final Procedure proc) throws IOException {
70      long procId = procExec.submitProcedure(proc);
71      return waitForProcedureToComplete(procExec, procId);
72    }
73  
74    public static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
75        final long procId) throws IOException {
76      while (!procExec.isFinished(procId) && procExec.isRunning()) {
77        // TODO: add a config to make it tunable
78        // Dev Consideration: are we waiting forever, or we can set up some timeout value?
79        Threads.sleepWithoutInterrupt(250);
80      }
81      ProcedureInfo result = procExec.getResult(procId);
82      if (result != null) {
83        if (result.isFailed()) {
84          // If the procedure fails, we should always have an exception captured. Throw it.
85          throw RemoteProcedureException.fromProto(
86            result.getForeignExceptionMessage()).unwrapRemoteException();
87        }
88        return result.getResult();
89      } else {
90        if (procExec.isRunning()) {
91          throw new IOException("Procedure " + procId + "not found");
92        } else {
93          throw new IOException("The Master is Aborting");
94        }
95      }
96    }
97  
98    public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
99        throws IOException {
100     final Configuration conf = env.getMasterConfiguration();
101     final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
102     final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
103     return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
104   }
105 
106   public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
107       String purpose, Predicate<T> predicate) throws IOException {
108     final long done = EnvironmentEdgeManager.currentTime() + waitTime;
109     do {
110       T result = predicate.evaluate();
111       if (result != null && !result.equals(Boolean.FALSE)) {
112         return result;
113       }
114       try {
115         Thread.sleep(waitingTimeForEvents);
116       } catch (InterruptedException e) {
117         LOG.warn("Interrupted while sleeping, waiting on " + purpose);
118         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
119       }
120       LOG.debug("Waiting on " + purpose);
121     } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
122 
123     throw new TimeoutIOException("Timed out while waiting on " + purpose);
124   }
125 
126   protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
127     int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
128     try {
129       if (env.getMasterServices().getMetaTableLocator().waitMetaRegionLocation(
130             env.getMasterServices().getZooKeeper(), timeout) == null) {
131         throw new NotAllMetaRegionsOnlineException();
132       }
133     } catch (InterruptedException e) {
134       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
135     }
136   }
137 
138   protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
139     final ServerManager sm = env.getMasterServices().getServerManager();
140     ProcedureSyncWait.waitFor(env, "server to assign region(s)",
141         new ProcedureSyncWait.Predicate<Boolean>() {
142       @Override
143       public Boolean evaluate() throws IOException {
144         List<ServerName> servers = sm.createDestinationServersList();
145         return servers != null && !servers.isEmpty();
146       }
147     });
148   }
149 
150   protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
151       final TableName tableName) throws IOException {
152     return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
153         new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
154       @Override
155       public List<HRegionInfo> evaluate() throws IOException {
156         if (TableName.META_TABLE_NAME.equals(tableName)) {
157           return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
158         }
159         return MetaTableAccessor.getTableRegions(env.getMasterServices().getZooKeeper(),
160             env.getMasterServices().getConnection(), tableName);
161       }
162     });
163   }
164 
165   protected static void waitRegionInTransition(final MasterProcedureEnv env,
166       final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
167     final AssignmentManager am = env.getMasterServices().getAssignmentManager();
168     final RegionStates states = am.getRegionStates();
169     for (final HRegionInfo region : regions) {
170       ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
171           new ProcedureSyncWait.Predicate<Boolean>() {
172         @Override
173         public Boolean evaluate() throws IOException {
174           if (states.isRegionInState(region, State.FAILED_OPEN)) {
175             am.regionOffline(region);
176           }
177           return !states.isRegionInTransition(region);
178         }
179       });
180     }
181   }
182 
183   protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
184       throws IOException {
185     return ProcedureSyncWait.waitFor(env, "quota manager to be available",
186         new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
187       @Override
188       public MasterQuotaManager evaluate() throws IOException {
189         return env.getMasterServices().getMasterQuotaManager();
190       }
191     });
192   }
193 }