1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.CoordinatedStateException;
29 import org.apache.hadoop.hbase.HRegionInfo;
30 import org.apache.hadoop.hbase.MetaTableAccessor;
31 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
32 import org.apache.hadoop.hbase.ProcedureInfo;
33 import org.apache.hadoop.hbase.ServerName;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.classification.InterfaceStability;
37 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
38 import org.apache.hadoop.hbase.master.AssignmentManager;
39 import org.apache.hadoop.hbase.master.RegionStates;
40 import org.apache.hadoop.hbase.master.RegionState.State;
41 import org.apache.hadoop.hbase.master.ServerManager;
42 import org.apache.hadoop.hbase.procedure2.Procedure;
43 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
44 import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
45 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47 import org.apache.hadoop.hbase.util.Threads;
48 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public final class ProcedureSyncWait {
59 private static final Log LOG = LogFactory.getLog(ProcedureSyncWait.class);
60
61 private ProcedureSyncWait() {}
62
63 @InterfaceAudience.Private
64 public interface Predicate<T> {
65 T evaluate() throws IOException;
66 }
67
68 public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
69 final Procedure proc) throws IOException {
70 long procId = procExec.submitProcedure(proc);
71 return waitForProcedureToComplete(procExec, procId);
72 }
73
74 public static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
75 final long procId) throws IOException {
76 while (!procExec.isFinished(procId) && procExec.isRunning()) {
77
78
79 Threads.sleepWithoutInterrupt(250);
80 }
81 ProcedureInfo result = procExec.getResult(procId);
82 if (result != null) {
83 if (result.isFailed()) {
84
85 throw RemoteProcedureException.fromProto(
86 result.getForeignExceptionMessage()).unwrapRemoteException();
87 }
88 return result.getResult();
89 } else {
90 if (procExec.isRunning()) {
91 throw new IOException("Procedure " + procId + "not found");
92 } else {
93 throw new IOException("The Master is Aborting");
94 }
95 }
96 }
97
98 public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
99 throws IOException {
100 final Configuration conf = env.getMasterConfiguration();
101 final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
102 final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
103 return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
104 }
105
106 public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
107 String purpose, Predicate<T> predicate) throws IOException {
108 final long done = EnvironmentEdgeManager.currentTime() + waitTime;
109 do {
110 T result = predicate.evaluate();
111 if (result != null && !result.equals(Boolean.FALSE)) {
112 return result;
113 }
114 try {
115 Thread.sleep(waitingTimeForEvents);
116 } catch (InterruptedException e) {
117 LOG.warn("Interrupted while sleeping, waiting on " + purpose);
118 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
119 }
120 LOG.debug("Waiting on " + purpose);
121 } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
122
123 throw new TimeoutIOException("Timed out while waiting on " + purpose);
124 }
125
126 protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
127 int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
128 try {
129 if (env.getMasterServices().getMetaTableLocator().waitMetaRegionLocation(
130 env.getMasterServices().getZooKeeper(), timeout) == null) {
131 throw new NotAllMetaRegionsOnlineException();
132 }
133 } catch (InterruptedException e) {
134 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
135 }
136 }
137
138 protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
139 final ServerManager sm = env.getMasterServices().getServerManager();
140 ProcedureSyncWait.waitFor(env, "server to assign region(s)",
141 new ProcedureSyncWait.Predicate<Boolean>() {
142 @Override
143 public Boolean evaluate() throws IOException {
144 List<ServerName> servers = sm.createDestinationServersList();
145 return servers != null && !servers.isEmpty();
146 }
147 });
148 }
149
150 protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
151 final TableName tableName) throws IOException {
152 return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
153 new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
154 @Override
155 public List<HRegionInfo> evaluate() throws IOException {
156 if (TableName.META_TABLE_NAME.equals(tableName)) {
157 return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
158 }
159 return MetaTableAccessor.getTableRegions(env.getMasterServices().getZooKeeper(),
160 env.getMasterServices().getConnection(), tableName);
161 }
162 });
163 }
164
165 protected static void waitRegionInTransition(final MasterProcedureEnv env,
166 final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
167 final AssignmentManager am = env.getMasterServices().getAssignmentManager();
168 final RegionStates states = am.getRegionStates();
169 for (final HRegionInfo region : regions) {
170 ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
171 new ProcedureSyncWait.Predicate<Boolean>() {
172 @Override
173 public Boolean evaluate() throws IOException {
174 if (states.isRegionInState(region, State.FAILED_OPEN)) {
175 am.regionOffline(region);
176 }
177 return !states.isRegionInTransition(region);
178 }
179 });
180 }
181 }
182
183 protected static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env)
184 throws IOException {
185 return ProcedureSyncWait.waitFor(env, "quota manager to be available",
186 new ProcedureSyncWait.Predicate<MasterQuotaManager>() {
187 @Override
188 public MasterQuotaManager evaluate() throws IOException {
189 return env.getMasterServices().getMasterQuotaManager();
190 }
191 });
192 }
193 }