001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.CallQueueTooBigException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
031import org.apache.hadoop.hbase.master.MasterServices;
032import org.apache.hadoop.hbase.master.ServerListener;
033import org.apache.hadoop.hbase.master.ServerManager;
034import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
035import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
036import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.FutureUtils;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
046import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
047
048import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
049import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
055
056/**
057 * A remote procecdure dispatcher for regionservers.
058 */
059@InterfaceAudience.Private
060public class RSProcedureDispatcher
061    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
062    implements ServerListener {
063  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
064
065  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
066      "hbase.regionserver.rpc.startup.waittime";
067  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
068
069  protected final MasterServices master;
070  private final long rsStartupWaitTime;
071  private MasterProcedureEnv procedureEnv;
072
073  public RSProcedureDispatcher(final MasterServices master) {
074    super(master.getConfiguration());
075
076    this.master = master;
077    this.rsStartupWaitTime = master.getConfiguration().getLong(
078      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
079  }
080
081  @Override
082  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
083    return new UncaughtExceptionHandler() {
084
085      @Override
086      public void uncaughtException(Thread t, Throwable e) {
087        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
088      }
089    };
090  }
091
092  @Override
093  public boolean start() {
094    if (!super.start()) {
095      return false;
096    }
097    // Around startup, if failed, some of the below may be set back to null so NPE is possible.
098    try {
099      master.getServerManager().registerListener(this);
100      procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
101      for (ServerName serverName : master.getServerManager().getOnlineServersList()) {
102        addNode(serverName);
103      }
104    } catch (Exception e) {
105      LOG.info("Failed start", e);
106      return false;
107    }
108    return true;
109  }
110
111  @Override
112  public boolean stop() {
113    if (!super.stop()) {
114      return false;
115    }
116
117    master.getServerManager().unregisterListener(this);
118    return true;
119  }
120
121  @Override
122  protected void remoteDispatch(final ServerName serverName,
123      final Set<RemoteProcedure> remoteProcedures) {
124    if (!master.getServerManager().isServerOnline(serverName)) {
125      // fail fast
126      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
127    } else {
128      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
129    }
130  }
131
132  @Override
133  protected void abortPendingOperations(final ServerName serverName,
134      final Set<RemoteProcedure> operations) {
135    // TODO: Replace with a ServerNotOnlineException()
136    final IOException e = new DoNotRetryIOException("server not online " + serverName);
137    for (RemoteProcedure proc: operations) {
138      proc.remoteCallFailed(procedureEnv, serverName, e);
139    }
140  }
141
142  @Override
143  public void serverAdded(final ServerName serverName) {
144    addNode(serverName);
145  }
146
147  @Override
148  public void serverRemoved(final ServerName serverName) {
149    removeNode(serverName);
150  }
151
152  private interface RemoteProcedureResolver {
153    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
154
155    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
156
157    void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
158  }
159
160  /**
161   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
162   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
163   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
164   * {@link RegionCloseOperation}s.
165   * @param serverName RegionServer to which the remote operations are sent
166   * @param operations Remote procedures which are dispatched to the given server
167   * @param resolver Used to dispatch remote procedures to given server.
168   */
169  public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
170      RemoteProcedureResolver resolver) {
171    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
172    ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
173      buildAndGroupRequestByType(env, serverName, operations);
174
175    List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
176    if (!openOps.isEmpty()) {
177      resolver.dispatchOpenRequests(env, openOps);
178    }
179
180    List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
181    if (!closeOps.isEmpty()) {
182      resolver.dispatchCloseRequests(env, closeOps);
183    }
184
185    List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
186    if (!refreshOps.isEmpty()) {
187      resolver.dispatchServerOperations(env, refreshOps);
188    }
189
190    if (!reqsByType.isEmpty()) {
191      LOG.warn("unknown request type in the queue: " + reqsByType);
192    }
193  }
194
195  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
196
197    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
198      super(serverName, remoteProcedures);
199    }
200
201    @Override
202    public void run() {
203      remoteCallFailed(procedureEnv,
204        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
205    }
206  }
207
208  // ==========================================================================
209  //  Compatibility calls
210  // ==========================================================================
211  protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
212
213    private final ServerName serverName;
214
215    private final Set<RemoteProcedure> remoteProcedures;
216
217    private int numberOfAttemptsSoFar = 0;
218    private long maxWaitTime = -1;
219
220    private ExecuteProceduresRequest.Builder request = null;
221
222    public ExecuteProceduresRemoteCall(final ServerName serverName,
223        final Set<RemoteProcedure> remoteProcedures) {
224      this.serverName = serverName;
225      this.remoteProcedures = remoteProcedures;
226    }
227
228    private AsyncRegionServerAdmin  getRsAdmin() throws IOException {
229      return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
230    }
231
232    protected final ServerName getServerName() {
233      return serverName;
234    }
235
236    private boolean scheduleForRetry(IOException e) {
237      LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
238      // Should we wait a little before retrying? If the server is starting it's yes.
239      if (e instanceof ServerNotRunningYetException) {
240        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
241        if (remainingTime > 0) {
242          LOG.warn("waiting a little before trying on the same server={}," +
243            " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
244          numberOfAttemptsSoFar++;
245          submitTask(this, 100, TimeUnit.MILLISECONDS);
246          return true;
247        }
248        LOG.warn("server {} is not up for a while; try a new one", serverName);
249        return false;
250      }
251      if (e instanceof DoNotRetryIOException) {
252        LOG.warn("server {} tells us do not retry due to {}, try={}, give up", serverName,
253          e.toString(), numberOfAttemptsSoFar);
254        return false;
255      }
256      // this exception is thrown in the rpc framework, where we can make sure that the call has not
257      // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
258      // better choose another region server
259      // notice that, it is safe to quit only if this is the first time we send request to region
260      // server. Maybe the region server has accept our request the first time, and then there is a
261      // network error which prevents we receive the response, and the second time we hit a
262      // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
263      // double assign...
264      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
265        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
266          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
267        return false;
268      }
269      // Always retry for other exception types if the region server is not dead yet.
270      if (!master.getServerManager().isServerOnline(serverName)) {
271        LOG.warn("request to {} failed due to {}, try={}, and the server is dead, give up",
272          serverName, e.toString(), numberOfAttemptsSoFar);
273        return false;
274      }
275      if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
276        // A better way is to return true here to let the upper layer quit, and then schedule a
277        // background task to check whether the region server is dead. And if it is dead, call
278        // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
279        // result, but waste some resources.
280        LOG.warn("server {} is aborted or stopped, for safety we still need to" +
281          " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
282      } else {
283        LOG.warn("request to server {} failed due to {}, try={}, retrying...", serverName,
284          e.toString(), numberOfAttemptsSoFar);
285      }
286      numberOfAttemptsSoFar++;
287      submitTask(this, 100, TimeUnit.MILLISECONDS);
288      return true;
289    }
290
291    private long getMaxWaitTime() {
292      if (this.maxWaitTime < 0) {
293        // This is the max attempts, not retries, so it should be at least 1.
294        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
295      }
296      return this.maxWaitTime;
297    }
298
299    private IOException unwrapException(IOException e) {
300      if (e instanceof RemoteException) {
301        e = ((RemoteException)e).unwrapRemoteException();
302      }
303      return e;
304    }
305
306    @Override
307    public void run() {
308      request = ExecuteProceduresRequest.newBuilder();
309      if (LOG.isTraceEnabled()) {
310        LOG.trace("Building request with operations count=" + remoteProcedures.size());
311      }
312      splitAndResolveOperation(getServerName(), remoteProcedures, this);
313
314      try {
315        sendRequest(getServerName(), request.build());
316      } catch (IOException e) {
317        e = unwrapException(e);
318        // TODO: In the future some operation may want to bail out early.
319        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
320        if (!scheduleForRetry(e)) {
321          remoteCallFailed(procedureEnv, e);
322        }
323      }
324    }
325
326    @Override
327    public void dispatchOpenRequests(final MasterProcedureEnv env,
328        final List<RegionOpenOperation> operations) {
329      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
330    }
331
332    @Override
333    public void dispatchCloseRequests(final MasterProcedureEnv env,
334        final List<RegionCloseOperation> operations) {
335      for (RegionCloseOperation op: operations) {
336        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
337      }
338    }
339
340    @Override
341    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
342      operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
343    }
344
345    // will be overridden in test.
346    @VisibleForTesting
347    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
348        final ExecuteProceduresRequest request) throws IOException {
349      return FutureUtils.get(getRsAdmin().executeProcedures(request));
350    }
351
352    protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
353      for (RemoteProcedure proc : remoteProcedures) {
354        proc.remoteCallFailed(env, getServerName(), e);
355      }
356    }
357  }
358
359  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
360      final ServerName serverName, final List<RegionOpenOperation> operations) {
361    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
362    builder.setServerStartCode(serverName.getStartcode());
363    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
364    for (RegionOpenOperation op: operations) {
365      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
366    }
367    return builder.build();
368  }
369
370  // ==========================================================================
371  //  RPC Messages
372  //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
373  //  - RegionOperation: open, close, flush, snapshot, ...
374  // ==========================================================================
375
376  public static final class ServerOperation extends RemoteOperation {
377
378    private final long procId;
379
380    private final Class<?> rsProcClass;
381
382    private final byte[] rsProcData;
383
384    public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
385        byte[] rsProcData) {
386      super(remoteProcedure);
387      this.procId = procId;
388      this.rsProcClass = rsProcClass;
389      this.rsProcData = rsProcData;
390    }
391
392    public RemoteProcedureRequest buildRequest() {
393      return RemoteProcedureRequest.newBuilder().setProcId(procId)
394          .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
395    }
396  }
397
398  public static abstract class RegionOperation extends RemoteOperation {
399    protected final RegionInfo regionInfo;
400    protected final long procId;
401
402    protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
403      super(remoteProcedure);
404      this.regionInfo = regionInfo;
405      this.procId = procId;
406    }
407  }
408
409  public static class RegionOpenOperation extends RegionOperation {
410
411    public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
412        long procId) {
413      super(remoteProcedure, regionInfo, procId);
414    }
415
416    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
417        final MasterProcedureEnv env) {
418      return RequestConverter.buildRegionOpenInfo(regionInfo,
419        env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
420    }
421  }
422
423  public static class RegionCloseOperation extends RegionOperation {
424    private final ServerName destinationServer;
425
426    public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
427        ServerName destinationServer) {
428      super(remoteProcedure, regionInfo, procId);
429      this.destinationServer = destinationServer;
430    }
431
432    public ServerName getDestinationServer() {
433      return destinationServer;
434    }
435
436    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
437      return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
438        getDestinationServer(), procId);
439    }
440  }
441}