001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.CallQueueTooBigException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
030import org.apache.hadoop.hbase.master.MasterServices;
031import org.apache.hadoop.hbase.master.ServerListener;
032import org.apache.hadoop.hbase.master.ServerManager;
033import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
034import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
035import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
036import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.ipc.RemoteException;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
045import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
055
056/**
 * A remote procedure dispatcher for regionservers.
058 */
059@InterfaceAudience.Private
060public class RSProcedureDispatcher extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
061  implements ServerListener {
  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);

  /**
   * Configuration key for how long, in milliseconds, a remote call keeps retrying a region server
   * that answers with {@code ServerNotRunningYetException} before it gives up on that server.
   */
  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
    "hbase.regionserver.rpc.startup.waittime";
  /** Value used when {@link #RS_RPC_STARTUP_WAIT_TIME_CONF_KEY} is not configured. */
  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;

  /** Master services used to reach the configuration, server manager and procedure executor. */
  protected final MasterServices master;
  /** Startup wait time read from the configuration in the constructor, in milliseconds. */
  private final long rsStartupWaitTime;
  /** Procedure environment; assigned in {@link #start()} and {@code null} before that. */
  private MasterProcedureEnv procedureEnv;
071
072  public RSProcedureDispatcher(final MasterServices master) {
073    super(master.getConfiguration());
074
075    this.master = master;
076    this.rsStartupWaitTime = master.getConfiguration().getLong(RS_RPC_STARTUP_WAIT_TIME_CONF_KEY,
077      DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
078  }
079
080  @Override
081  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
082    return new UncaughtExceptionHandler() {
083
084      @Override
085      public void uncaughtException(Thread t, Throwable e) {
086        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
087      }
088    };
089  }
090
091  @Override
092  public boolean start() {
093    if (!super.start()) {
094      return false;
095    }
096    setTimeoutExecutorUncaughtExceptionHandler(this::abort);
097    if (master.isStopped()) {
098      LOG.debug("Stopped");
099      return false;
100    }
101    // Around startup, if failed, some of the below may be set back to null so NPE is possible.
102    ServerManager sm = master.getServerManager();
103    if (sm == null) {
104      LOG.debug("ServerManager is null");
105      return false;
106    }
107    sm.registerListener(this);
108    ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor();
109    if (pe == null) {
110      LOG.debug("ProcedureExecutor is null");
111      return false;
112    }
113    this.procedureEnv = pe.getEnvironment();
114    if (this.procedureEnv == null) {
115      LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping());
116      return false;
117    }
118    try {
119      for (ServerName serverName : sm.getOnlineServersList()) {
120        addNode(serverName);
121      }
122    } catch (Exception e) {
123      LOG.info("Failed start", e);
124      return false;
125    }
126    return true;
127  }
128
129  private void abort(Thread t, Throwable e) {
130    LOG.error("Caught error", e);
131    if (!master.isStopped() && !master.isStopping() && !master.isAborted()) {
132      master.abort("Aborting master", e);
133    }
134  }
135
136  @Override
137  public boolean stop() {
138    if (!super.stop()) {
139      return false;
140    }
141
142    master.getServerManager().unregisterListener(this);
143    return true;
144  }
145
146  @Override
147  protected void remoteDispatch(final ServerName serverName,
148    final Set<RemoteProcedure> remoteProcedures) {
149    if (!master.getServerManager().isServerOnline(serverName)) {
150      // fail fast
151      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
152    } else {
153      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
154    }
155  }
156
157  @Override
158  protected void abortPendingOperations(final ServerName serverName,
159    final Set<RemoteProcedure> operations) {
160    // TODO: Replace with a ServerNotOnlineException()
161    final IOException e = new DoNotRetryIOException("server not online " + serverName);
162    for (RemoteProcedure proc : operations) {
163      proc.remoteCallFailed(procedureEnv, serverName, e);
164    }
165  }
166
  /**
   * {@link ServerListener} callback: adds the given server as a node of this dispatcher.
   * @param serverName server that was added
   */
  @Override
  public void serverAdded(final ServerName serverName) {
    addNode(serverName);
  }
171
  /**
   * {@link ServerListener} callback: removes the given server's node from this dispatcher.
   * @param serverName server that was removed
   */
  @Override
  public void serverRemoved(final ServerName serverName) {
    removeNode(serverName);
  }
176
  /**
   * Receives the remote operations of one server after they have been grouped by type. Each
   * method is invoked by {@code splitAndResolveOperation} only when the corresponding list is not
   * empty.
   */
  private interface RemoteProcedureResolver {
    /** Handles all region open operations queued for the server. */
    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);

    /** Handles all region close operations queued for the server. */
    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);

    /** Handles all server-level operations queued for the server. */
    void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
  }
184
185  /**
186   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
187   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
188   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
189   * {@link RegionCloseOperation}s.
190   * @param serverName RegionServer to which the remote operations are sent
191   * @param operations Remote procedures which are dispatched to the given server
192   * @param resolver   Used to dispatch remote procedures to given server.
193   */
194  public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
195    RemoteProcedureResolver resolver) {
196    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
197    ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
198      buildAndGroupRequestByType(env, serverName, operations);
199
200    List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
201    if (!openOps.isEmpty()) {
202      resolver.dispatchOpenRequests(env, openOps);
203    }
204
205    List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
206    if (!closeOps.isEmpty()) {
207      resolver.dispatchCloseRequests(env, closeOps);
208    }
209
210    List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
211    if (!refreshOps.isEmpty()) {
212      resolver.dispatchServerOperations(env, refreshOps);
213    }
214
215    if (!reqsByType.isEmpty()) {
216      LOG.warn("unknown request type in the queue: " + reqsByType);
217    }
218  }
219
220  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
221
222    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
223      super(serverName, remoteProcedures);
224    }
225
226    @Override
227    public void run() {
228      remoteCallFailed(procedureEnv,
229        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
230    }
231  }
232
233  // ==========================================================================
234  // Compatibility calls
235  // ==========================================================================
236  protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
237
238    private final ServerName serverName;
239
240    private final Set<RemoteProcedure> remoteProcedures;
241
242    private int numberOfAttemptsSoFar = 0;
243    private long maxWaitTime = -1;
244
245    private final long rsRpcRetryInterval;
246    private static final String RS_RPC_RETRY_INTERVAL_CONF_KEY =
247      "hbase.regionserver.rpc.retry.interval";
248    private static final int DEFAULT_RS_RPC_RETRY_INTERVAL = 100;
249
250    private ExecuteProceduresRequest.Builder request = null;
251
252    public ExecuteProceduresRemoteCall(final ServerName serverName,
253      final Set<RemoteProcedure> remoteProcedures) {
254      this.serverName = serverName;
255      this.remoteProcedures = remoteProcedures;
256      this.rsRpcRetryInterval = master.getConfiguration().getLong(RS_RPC_RETRY_INTERVAL_CONF_KEY,
257        DEFAULT_RS_RPC_RETRY_INTERVAL);
258    }
259
260    private AdminService.BlockingInterface getRsAdmin() throws IOException {
261      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
262      if (admin == null) {
263        throw new IOException("Attempting to send OPEN RPC to server " + getServerName()
264          + " failed because no RPC connection found to this server");
265      }
266      return admin;
267    }
268
269    protected final ServerName getServerName() {
270      return serverName;
271    }
272
273    private boolean scheduleForRetry(IOException e) {
274      LOG.debug("Request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
275      // Should we wait a little before retrying? If the server is starting it's yes.
276      if (e instanceof ServerNotRunningYetException) {
277        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
278        if (remainingTime > 0) {
279          LOG.warn("Waiting a little before retrying {}, try={}, can wait up to {}ms", serverName,
280            numberOfAttemptsSoFar, remainingTime);
281          numberOfAttemptsSoFar++;
282          // Retry every rsRpcRetryInterval millis up to maximum wait time.
283          submitTask(this, rsRpcRetryInterval, TimeUnit.MILLISECONDS);
284          return true;
285        }
286        LOG.warn("{} is throwing ServerNotRunningYetException for {}ms; trying another server",
287          serverName, getMaxWaitTime());
288        return false;
289      }
290      if (e instanceof DoNotRetryIOException) {
291        LOG.warn("{} tells us DoNotRetry due to {}, try={}, give up", serverName, e.toString(),
292          numberOfAttemptsSoFar);
293        return false;
294      }
295      // This exception is thrown in the rpc framework, where we can make sure that the call has not
296      // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
297      // better choose another region server.
298      // Notice that, it is safe to quit only if this is the first time we send request to region
299      // server. Maybe the region server has accepted our request the first time, and then there is
300      // a network error which prevents we receive the response, and the second time we hit a
301      // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
302      // double assign...
303      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
304        LOG.warn("request to {} failed due to {}, try={}, this usually because"
305          + " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
306        return false;
307      }
308      // Always retry for other exception types if the region server is not dead yet.
309      if (!master.getServerManager().isServerOnline(serverName)) {
310        LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up",
311          serverName, e.toString(), numberOfAttemptsSoFar);
312        return false;
313      }
314      if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
315        // A better way is to return true here to let the upper layer quit, and then schedule a
316        // background task to check whether the region server is dead. And if it is dead, call
317        // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
318        // result, but waste some resources.
319        LOG.warn("{} is aborted or stopped, for safety we still need to"
320          + " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
321      } else {
322        LOG.warn("request to {} failed due to {}, try={}, retrying...", serverName, e.toString(),
323          numberOfAttemptsSoFar);
324      }
325      numberOfAttemptsSoFar++;
326      // Add some backoff here as the attempts rise otherwise if a stuck condition, will fill logs
327      // with failed attempts. None of our backoff classes -- RetryCounter or ClientBackoffPolicy
328      // -- fit here nicely so just do something simple; increment by rsRpcRetryInterval millis *
329      // retry^2 on each try
330      // up to max of 10 seconds (don't want to back off too much in case of situation change).
331      submitTask(this,
332        Math.min(rsRpcRetryInterval * (this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar),
333          10 * 1000),
334        TimeUnit.MILLISECONDS);
335      return true;
336    }
337
338    private long getMaxWaitTime() {
339      if (this.maxWaitTime < 0) {
340        // This is the max attempts, not retries, so it should be at least 1.
341        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
342      }
343      return this.maxWaitTime;
344    }
345
346    private IOException unwrapException(IOException e) {
347      if (e instanceof RemoteException) {
348        e = ((RemoteException) e).unwrapRemoteException();
349      }
350      return e;
351    }
352
353    @Override
354    public void run() {
355      request = ExecuteProceduresRequest.newBuilder();
356      if (LOG.isTraceEnabled()) {
357        LOG.trace("Building request with operations count=" + remoteProcedures.size());
358      }
359      splitAndResolveOperation(getServerName(), remoteProcedures, this);
360
361      try {
362        sendRequest(getServerName(), request.build());
363      } catch (IOException e) {
364        e = unwrapException(e);
365        // TODO: In the future some operation may want to bail out early.
366        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
367        if (!scheduleForRetry(e)) {
368          remoteCallFailed(procedureEnv, e);
369        }
370      }
371    }
372
373    @Override
374    public void dispatchOpenRequests(final MasterProcedureEnv env,
375      final List<RegionOpenOperation> operations) {
376      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
377    }
378
379    @Override
380    public void dispatchCloseRequests(final MasterProcedureEnv env,
381      final List<RegionCloseOperation> operations) {
382      for (RegionCloseOperation op : operations) {
383        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
384      }
385    }
386
387    @Override
388    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
389      operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
390    }
391
392    // will be overridden in test.
393    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
394      final ExecuteProceduresRequest request) throws IOException {
395      try {
396        return getRsAdmin().executeProcedures(null, request);
397      } catch (ServiceException se) {
398        throw ProtobufUtil.getRemoteException(se);
399      }
400    }
401
402    protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
403      for (RemoteProcedure proc : remoteProcedures) {
404        proc.remoteCallFailed(env, getServerName(), e);
405      }
406    }
407  }
408
409  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
410    final ServerName serverName, final List<RegionOpenOperation> operations) {
411    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
412    builder.setServerStartCode(serverName.getStartcode());
413    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
414    for (RegionOpenOperation op : operations) {
415      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
416    }
417    return builder.build();
418  }
419
420  // ==========================================================================
421  // RPC Messages
422  // - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
423  // - RegionOperation: open, close, flush, snapshot, ...
424  // ==========================================================================
425
426  public static final class ServerOperation extends RemoteOperation {
427
428    private final long procId;
429
430    private final Class<?> rsProcClass;
431
432    private final byte[] rsProcData;
433
434    public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
435      byte[] rsProcData) {
436      super(remoteProcedure);
437      this.procId = procId;
438      this.rsProcClass = rsProcClass;
439      this.rsProcData = rsProcData;
440    }
441
442    public RemoteProcedureRequest buildRequest() {
443      return RemoteProcedureRequest.newBuilder().setProcId(procId)
444        .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
445    }
446  }
447
448  public static abstract class RegionOperation extends RemoteOperation {
449    protected final RegionInfo regionInfo;
450    protected final long procId;
451
452    protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
453      super(remoteProcedure);
454      this.regionInfo = regionInfo;
455      this.procId = procId;
456    }
457  }
458
459  public static class RegionOpenOperation extends RegionOperation {
460
461    public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
462      long procId) {
463      super(remoteProcedure, regionInfo, procId);
464    }
465
466    public OpenRegionRequest.RegionOpenInfo
467      buildRegionOpenInfoRequest(final MasterProcedureEnv env) {
468      return RequestConverter.buildRegionOpenInfo(regionInfo,
469        env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
470    }
471  }
472
473  public static class RegionCloseOperation extends RegionOperation {
474    private final ServerName destinationServer;
475
476    public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
477      ServerName destinationServer) {
478      super(remoteProcedure, regionInfo, procId);
479      this.destinationServer = destinationServer;
480    }
481
482    public ServerName getDestinationServer() {
483      return destinationServer;
484    }
485
486    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
487      return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
488        getDestinationServer(), procId);
489    }
490  }
491}