001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.CallQueueTooBigException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
030import org.apache.hadoop.hbase.master.MasterServices;
031import org.apache.hadoop.hbase.master.ServerListener;
032import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
033import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
034import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.ipc.RemoteException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
043import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
044import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
045
046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
047import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
054
055/**
056 * A remote procecdure dispatcher for regionservers.
057 */
058@InterfaceAudience.Private
059public class RSProcedureDispatcher
060    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
061    implements ServerListener {
062  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
063
064  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
065      "hbase.regionserver.rpc.startup.waittime";
066  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
067
068  protected final MasterServices master;
069  private final long rsStartupWaitTime;
070  private MasterProcedureEnv procedureEnv;
071
072  public RSProcedureDispatcher(final MasterServices master) {
073    super(master.getConfiguration());
074
075    this.master = master;
076    this.rsStartupWaitTime = master.getConfiguration().getLong(
077      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
078  }
079
080  @Override
081  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
082    return new UncaughtExceptionHandler() {
083
084      @Override
085      public void uncaughtException(Thread t, Throwable e) {
086        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
087      }
088    };
089  }
090
091  @Override
092  public boolean start() {
093    if (!super.start()) {
094      return false;
095    }
096
097    master.getServerManager().registerListener(this);
098    procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
099    for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
100      addNode(serverName);
101    }
102    return true;
103  }
104
105  @Override
106  public boolean stop() {
107    if (!super.stop()) {
108      return false;
109    }
110
111    master.getServerManager().unregisterListener(this);
112    return true;
113  }
114
115  @Override
116  protected void remoteDispatch(final ServerName serverName,
117      final Set<RemoteProcedure> remoteProcedures) {
118    if (!master.getServerManager().isServerOnline(serverName)) {
119      // fail fast
120      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
121    } else {
122      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
123    }
124  }
125
126  @Override
127  protected void abortPendingOperations(final ServerName serverName,
128      final Set<RemoteProcedure> operations) {
129    // TODO: Replace with a ServerNotOnlineException()
130    final IOException e = new DoNotRetryIOException("server not online " + serverName);
131    for (RemoteProcedure proc: operations) {
132      proc.remoteCallFailed(procedureEnv, serverName, e);
133    }
134  }
135
136  @Override
137  public void serverAdded(final ServerName serverName) {
138    addNode(serverName);
139  }
140
141  @Override
142  public void serverRemoved(final ServerName serverName) {
143    removeNode(serverName);
144  }
145
146  private interface RemoteProcedureResolver {
147    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
148
149    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
150
151    void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
152  }
153
154  /**
155   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
156   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
157   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
158   * {@link RegionCloseOperation}s.
159   * @param serverName RegionServer to which the remote operations are sent
160   * @param operations Remote procedures which are dispatched to the given server
161   * @param resolver Used to dispatch remote procedures to given server.
162   */
163  public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
164      RemoteProcedureResolver resolver) {
165    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
166    ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
167      buildAndGroupRequestByType(env, serverName, operations);
168
169    List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
170    if (!openOps.isEmpty()) {
171      resolver.dispatchOpenRequests(env, openOps);
172    }
173
174    List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
175    if (!closeOps.isEmpty()) {
176      resolver.dispatchCloseRequests(env, closeOps);
177    }
178
179    List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
180    if (!refreshOps.isEmpty()) {
181      resolver.dispatchServerOperations(env, refreshOps);
182    }
183
184    if (!reqsByType.isEmpty()) {
185      LOG.warn("unknown request type in the queue: " + reqsByType);
186    }
187  }
188
189  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
190
191    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
192      super(serverName, remoteProcedures);
193    }
194
195    @Override
196    public void run() {
197      remoteCallFailed(procedureEnv,
198        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
199    }
200  }
201
202  // ==========================================================================
203  //  Compatibility calls
204  // ==========================================================================
205  protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
206
207    private final ServerName serverName;
208
209    private final Set<RemoteProcedure> remoteProcedures;
210
211    private int numberOfAttemptsSoFar = 0;
212    private long maxWaitTime = -1;
213
214    private ExecuteProceduresRequest.Builder request = null;
215
216    public ExecuteProceduresRemoteCall(final ServerName serverName,
217        final Set<RemoteProcedure> remoteProcedures) {
218      this.serverName = serverName;
219      this.remoteProcedures = remoteProcedures;
220    }
221
222    private AdminService.BlockingInterface getRsAdmin() throws IOException {
223      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
224      if (admin == null) {
225        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
226          " failed because no RPC connection found to this server");
227      }
228      return admin;
229    }
230
231    protected final ServerName getServerName() {
232      return serverName;
233    }
234
235    private boolean scheduleForRetry(IOException e) {
236      LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
237      // Should we wait a little before retrying? If the server is starting it's yes.
238      if (e instanceof ServerNotRunningYetException) {
239        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
240        if (remainingTime > 0) {
241          LOG.warn("waiting a little before trying on the same server={}," +
242            " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
243          numberOfAttemptsSoFar++;
244          submitTask(this, 100, TimeUnit.MILLISECONDS);
245          return true;
246        }
247        LOG.warn("server {} is not up for a while; try a new one", serverName);
248        return false;
249      }
250      if (e instanceof DoNotRetryIOException) {
251        LOG.warn("server {} tells us do not retry due to {}, try={}, give up", serverName,
252          e.toString(), numberOfAttemptsSoFar);
253        return false;
254      }
255      // this exception is thrown in the rpc framework, where we can make sure that the call has not
256      // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
257      // better choose another region server
258      // notice that, it is safe to quit only if this is the first time we send request to region
259      // server. Maybe the region server has accept our request the first time, and then there is a
260      // network error which prevents we receive the response, and the second time we hit a
261      // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
262      // double assign...
263      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
264        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
265          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
266        return false;
267      }
268      // Always retry for other exception types if the region server is not dead yet.
269      if (!master.getServerManager().isServerOnline(serverName)) {
270        LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up",
271          serverName, e.toString(), numberOfAttemptsSoFar);
272        return false;
273      }
274      if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
275        // A better way is to return true here to let the upper layer quit, and then schedule a
276        // background task to check whether the region server is dead. And if it is dead, call
277        // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
278        // result, but waste some resources.
279        LOG.warn("server {} is aborted or stopped, for safety we still need to" +
280          " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
281      } else {
282        LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName,
283          e.toString(), numberOfAttemptsSoFar);
284      }
285      numberOfAttemptsSoFar++;
286      submitTask(this, 100, TimeUnit.MILLISECONDS);
287      return true;
288    }
289
290    private long getMaxWaitTime() {
291      if (this.maxWaitTime < 0) {
292        // This is the max attempts, not retries, so it should be at least 1.
293        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
294      }
295      return this.maxWaitTime;
296    }
297
298    private IOException unwrapException(IOException e) {
299      if (e instanceof RemoteException) {
300        e = ((RemoteException)e).unwrapRemoteException();
301      }
302      return e;
303    }
304
305    @Override
306    public void run() {
307      request = ExecuteProceduresRequest.newBuilder();
308      if (LOG.isTraceEnabled()) {
309        LOG.trace("Building request with operations count=" + remoteProcedures.size());
310      }
311      splitAndResolveOperation(getServerName(), remoteProcedures, this);
312
313      try {
314        sendRequest(getServerName(), request.build());
315      } catch (IOException e) {
316        e = unwrapException(e);
317        // TODO: In the future some operation may want to bail out early.
318        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
319        if (!scheduleForRetry(e)) {
320          remoteCallFailed(procedureEnv, e);
321        }
322      }
323    }
324
325    @Override
326    public void dispatchOpenRequests(final MasterProcedureEnv env,
327        final List<RegionOpenOperation> operations) {
328      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
329    }
330
331    @Override
332    public void dispatchCloseRequests(final MasterProcedureEnv env,
333        final List<RegionCloseOperation> operations) {
334      for (RegionCloseOperation op: operations) {
335        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
336      }
337    }
338
339    @Override
340    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
341      operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
342    }
343
344    // will be overridden in test.
345    @VisibleForTesting
346    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
347        final ExecuteProceduresRequest request) throws IOException {
348      try {
349        return getRsAdmin().executeProcedures(null, request);
350      } catch (ServiceException se) {
351        throw ProtobufUtil.getRemoteException(se);
352      }
353    }
354
355    protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
356      for (RemoteProcedure proc : remoteProcedures) {
357        proc.remoteCallFailed(env, getServerName(), e);
358      }
359    }
360  }
361
362  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
363      final ServerName serverName, final List<RegionOpenOperation> operations) {
364    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
365    builder.setServerStartCode(serverName.getStartcode());
366    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
367    for (RegionOpenOperation op: operations) {
368      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
369    }
370    return builder.build();
371  }
372
373  // ==========================================================================
374  //  RPC Messages
375  //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
376  //  - RegionOperation: open, close, flush, snapshot, ...
377  // ==========================================================================
378
379  public static final class ServerOperation extends RemoteOperation {
380
381    private final long procId;
382
383    private final Class<?> rsProcClass;
384
385    private final byte[] rsProcData;
386
387    public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
388        byte[] rsProcData) {
389      super(remoteProcedure);
390      this.procId = procId;
391      this.rsProcClass = rsProcClass;
392      this.rsProcData = rsProcData;
393    }
394
395    public RemoteProcedureRequest buildRequest() {
396      return RemoteProcedureRequest.newBuilder().setProcId(procId)
397          .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
398    }
399  }
400
401  public static abstract class RegionOperation extends RemoteOperation {
402    protected final RegionInfo regionInfo;
403    protected final long procId;
404
405    protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
406      super(remoteProcedure);
407      this.regionInfo = regionInfo;
408      this.procId = procId;
409    }
410  }
411
412  public static class RegionOpenOperation extends RegionOperation {
413
414    public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
415        long procId) {
416      super(remoteProcedure, regionInfo, procId);
417    }
418
419    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
420        final MasterProcedureEnv env) {
421      return RequestConverter.buildRegionOpenInfo(regionInfo,
422        env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
423    }
424  }
425
426  public static class RegionCloseOperation extends RegionOperation {
427    private final ServerName destinationServer;
428
429    public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
430        ServerName destinationServer) {
431      super(remoteProcedure, regionInfo, procId);
432      this.destinationServer = destinationServer;
433    }
434
435    public ServerName getDestinationServer() {
436      return destinationServer;
437    }
438
439    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
440      return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
441        getDestinationServer(), procId);
442    }
443  }
444}