001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.CallQueueTooBigException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
030import org.apache.hadoop.hbase.master.MasterServices;
031import org.apache.hadoop.hbase.master.ServerListener;
032import org.apache.hadoop.hbase.master.ServerManager;
033import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
034import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
035import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
036import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.RetryCounter;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
044import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
045import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
046import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
055
056/**
 * A remote procedure dispatcher for regionservers.
058 */
059@InterfaceAudience.Private
060public class RSProcedureDispatcher
061    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
062    implements ServerListener {
063  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
064
065  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
066      "hbase.regionserver.rpc.startup.waittime";
067  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
068
069  protected final MasterServices master;
070  private final long rsStartupWaitTime;
071  private MasterProcedureEnv procedureEnv;
072
/**
 * Creates a dispatcher that works on behalf of the given master.
 *
 * <p>The master's configuration is passed to the parent dispatcher and is also consulted for
 * {@link #RS_RPC_STARTUP_WAIT_TIME_CONF_KEY}; when that key is absent
 * {@code DEFAULT_RS_RPC_STARTUP_WAIT_TIME} is used instead.
 *
 * @param master master services used to reach the configuration, the server manager and the
 *          procedure executor
 */
public RSProcedureDispatcher(final MasterServices master) {
  super(master.getConfiguration());
  this.master = master;
  this.rsStartupWaitTime = master.getConfiguration()
    .getLong(RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
}
080
081  @Override
082  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
083    return new UncaughtExceptionHandler() {
084
085      @Override
086      public void uncaughtException(Thread t, Throwable e) {
087        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
088      }
089    };
090  }
091
092  @Override
093  public boolean start() {
094    if (!super.start()) {
095      return false;
096    }
097    if (master.isStopped()) {
098      LOG.debug("Stopped");
099      return false;
100    }
101    // Around startup, if failed, some of the below may be set back to null so NPE is possible.
102    ServerManager sm = master.getServerManager();
103    if (sm == null) {
104      LOG.debug("ServerManager is null");
105      return false;
106    }
107    sm.registerListener(this);
108    ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor();
109    if (pe == null) {
110      LOG.debug("ProcedureExecutor is null");
111      return false;
112    }
113    this.procedureEnv = pe.getEnvironment();
114    if (this.procedureEnv == null) {
115      LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping());
116      return false;
117    }
118    try {
119      for (ServerName serverName : sm.getOnlineServersList()) {
120        addNode(serverName);
121      }
122    } catch (Exception e) {
123      LOG.info("Failed start", e);
124      return false;
125    }
126    return true;
127  }
128
129  @Override
130  public boolean stop() {
131    if (!super.stop()) {
132      return false;
133    }
134
135    master.getServerManager().unregisterListener(this);
136    return true;
137  }
138
139  @Override
140  protected void remoteDispatch(final ServerName serverName,
141      final Set<RemoteProcedure> remoteProcedures) {
142    if (!master.getServerManager().isServerOnline(serverName)) {
143      // fail fast
144      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
145    } else {
146      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
147    }
148  }
149
150  @Override
151  protected void abortPendingOperations(final ServerName serverName,
152      final Set<RemoteProcedure> operations) {
153    // TODO: Replace with a ServerNotOnlineException()
154    final IOException e = new DoNotRetryIOException("server not online " + serverName);
155    for (RemoteProcedure proc: operations) {
156      proc.remoteCallFailed(procedureEnv, serverName, e);
157    }
158  }
159
160  @Override
161  public void serverAdded(final ServerName serverName) {
162    addNode(serverName);
163  }
164
165  @Override
166  public void serverRemoved(final ServerName serverName) {
167    removeNode(serverName);
168  }
169
170  private interface RemoteProcedureResolver {
171    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
172
173    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
174
175    void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
176  }
177
178  /**
179   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
180   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
181   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
182   * {@link RegionCloseOperation}s.
183   * @param serverName RegionServer to which the remote operations are sent
184   * @param operations Remote procedures which are dispatched to the given server
185   * @param resolver Used to dispatch remote procedures to given server.
186   */
187  public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
188      RemoteProcedureResolver resolver) {
189    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
190    ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
191      buildAndGroupRequestByType(env, serverName, operations);
192
193    List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
194    if (!openOps.isEmpty()) {
195      resolver.dispatchOpenRequests(env, openOps);
196    }
197
198    List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
199    if (!closeOps.isEmpty()) {
200      resolver.dispatchCloseRequests(env, closeOps);
201    }
202
203    List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
204    if (!refreshOps.isEmpty()) {
205      resolver.dispatchServerOperations(env, refreshOps);
206    }
207
208    if (!reqsByType.isEmpty()) {
209      LOG.warn("unknown request type in the queue: " + reqsByType);
210    }
211  }
212
213  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
214
215    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
216      super(serverName, remoteProcedures);
217    }
218
219    @Override
220    public void run() {
221      remoteCallFailed(procedureEnv,
222        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
223    }
224  }
225
226  // ==========================================================================
227  //  Compatibility calls
228  // ==========================================================================
229  protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
230
231    private final ServerName serverName;
232
233    private final Set<RemoteProcedure> remoteProcedures;
234
235    private int numberOfAttemptsSoFar = 0;
236    private long maxWaitTime = -1;
237
238    private final long rsRpcRetryInterval;
239    private static final String RS_RPC_RETRY_INTERVAL_CONF_KEY =
240        "hbase.regionserver.rpc.retry.interval";
241    private static final int DEFAULT_RS_RPC_RETRY_INTERVAL = 100;
242
243    private ExecuteProceduresRequest.Builder request = null;
244
/**
 * Creates a call that will deliver {@code remoteProcedures} to {@code serverName}.
 *
 * <p>The retry interval is read from the master configuration under
 * {@code RS_RPC_RETRY_INTERVAL_CONF_KEY}, defaulting to {@code DEFAULT_RS_RPC_RETRY_INTERVAL}.
 *
 * @param serverName target region server
 * @param remoteProcedures procedures carried by this call
 */
public ExecuteProceduresRemoteCall(final ServerName serverName,
    final Set<RemoteProcedure> remoteProcedures) {
  this.rsRpcRetryInterval = master.getConfiguration()
    .getLong(RS_RPC_RETRY_INTERVAL_CONF_KEY, DEFAULT_RS_RPC_RETRY_INTERVAL);
  this.remoteProcedures = remoteProcedures;
  this.serverName = serverName;
}
252
253    private AdminService.BlockingInterface getRsAdmin() throws IOException {
254      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
255      if (admin == null) {
256        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
257          " failed because no RPC connection found to this server");
258      }
259      return admin;
260    }
261
262    protected final ServerName getServerName() {
263      return serverName;
264    }
265
266    private boolean scheduleForRetry(IOException e) {
267      LOG.debug("Request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
268      // Should we wait a little before retrying? If the server is starting it's yes.
269      if (e instanceof ServerNotRunningYetException) {
270        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
271        if (remainingTime > 0) {
272          LOG.warn("Waiting a little before retrying {}, try={}, can wait up to {}ms",
273            serverName, numberOfAttemptsSoFar, remainingTime);
274          numberOfAttemptsSoFar++;
275          // Retry every rsRpcRetryInterval millis up to maximum wait time.
276          submitTask(this, rsRpcRetryInterval, TimeUnit.MILLISECONDS);
277          return true;
278        }
279        LOG.warn("{} is throwing ServerNotRunningYetException for {}ms; trying another server",
280          serverName, getMaxWaitTime());
281        return false;
282      }
283      if (e instanceof DoNotRetryIOException) {
284        LOG.warn("{} tells us DoNotRetry due to {}, try={}, give up", serverName,
285          e.toString(), numberOfAttemptsSoFar);
286        return false;
287      }
288      // This exception is thrown in the rpc framework, where we can make sure that the call has not
289      // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
290      // better choose another region server.
291      // Notice that, it is safe to quit only if this is the first time we send request to region
292      // server. Maybe the region server has accepted our request the first time, and then there is
293      // a network error which prevents we receive the response, and the second time we hit a
294      // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
295      // double assign...
296      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
297        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
298          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
299        return false;
300      }
301      // Always retry for other exception types if the region server is not dead yet.
302      if (!master.getServerManager().isServerOnline(serverName)) {
303        LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up",
304          serverName, e.toString(), numberOfAttemptsSoFar);
305        return false;
306      }
307      if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
308        // A better way is to return true here to let the upper layer quit, and then schedule a
309        // background task to check whether the region server is dead. And if it is dead, call
310        // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
311        // result, but waste some resources.
312        LOG.warn("{} is aborted or stopped, for safety we still need to" +
313          " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
314      } else {
315        LOG.warn("request to {} failed due to {}, try={}, retrying...", serverName,
316          e.toString(), numberOfAttemptsSoFar);
317      }
318      numberOfAttemptsSoFar++;
319      // Add some backoff here as the attempts rise otherwise if a stuck condition, will fill logs
320      // with failed attempts. None of our backoff classes -- RetryCounter or ClientBackoffPolicy
321      // -- fit here nicely so just do something simple; increment by rsRpcRetryInterval millis *
322      // retry^2 on each try
323      // up to max of 10 seconds (don't want to back off too much in case of situation change).
324      submitTask(this,
325        Math.min(rsRpcRetryInterval * (this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar),
326          10 * 1000),
327        TimeUnit.MILLISECONDS);
328      return true;
329    }
330
331    private long getMaxWaitTime() {
332      if (this.maxWaitTime < 0) {
333        // This is the max attempts, not retries, so it should be at least 1.
334        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
335      }
336      return this.maxWaitTime;
337    }
338
339    private IOException unwrapException(IOException e) {
340      if (e instanceof RemoteException) {
341        e = ((RemoteException)e).unwrapRemoteException();
342      }
343      return e;
344    }
345
346    @Override
347    public void run() {
348      request = ExecuteProceduresRequest.newBuilder();
349      if (LOG.isTraceEnabled()) {
350        LOG.trace("Building request with operations count=" + remoteProcedures.size());
351      }
352      splitAndResolveOperation(getServerName(), remoteProcedures, this);
353
354      try {
355        sendRequest(getServerName(), request.build());
356      } catch (IOException e) {
357        e = unwrapException(e);
358        // TODO: In the future some operation may want to bail out early.
359        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
360        if (!scheduleForRetry(e)) {
361          remoteCallFailed(procedureEnv, e);
362        }
363      }
364    }
365
366    @Override
367    public void dispatchOpenRequests(final MasterProcedureEnv env,
368        final List<RegionOpenOperation> operations) {
369      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
370    }
371
372    @Override
373    public void dispatchCloseRequests(final MasterProcedureEnv env,
374        final List<RegionCloseOperation> operations) {
375      for (RegionCloseOperation op: operations) {
376        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
377      }
378    }
379
380    @Override
381    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
382      operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
383    }
384
385    // will be overridden in test.
386    @VisibleForTesting
387    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
388        final ExecuteProceduresRequest request) throws IOException {
389      try {
390        return getRsAdmin().executeProcedures(null, request);
391      } catch (ServiceException se) {
392        throw ProtobufUtil.getRemoteException(se);
393      }
394    }
395
396    protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
397      for (RemoteProcedure proc : remoteProcedures) {
398        proc.remoteCallFailed(env, getServerName(), e);
399      }
400    }
401  }
402
403  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
404      final ServerName serverName, final List<RegionOpenOperation> operations) {
405    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
406    builder.setServerStartCode(serverName.getStartcode());
407    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
408    for (RegionOpenOperation op: operations) {
409      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
410    }
411    return builder.build();
412  }
413
414  // ==========================================================================
415  //  RPC Messages
416  //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
417  //  - RegionOperation: open, close, flush, snapshot, ...
418  // ==========================================================================
419
420  public static final class ServerOperation extends RemoteOperation {
421
422    private final long procId;
423
424    private final Class<?> rsProcClass;
425
426    private final byte[] rsProcData;
427
428    public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
429        byte[] rsProcData) {
430      super(remoteProcedure);
431      this.procId = procId;
432      this.rsProcClass = rsProcClass;
433      this.rsProcData = rsProcData;
434    }
435
436    public RemoteProcedureRequest buildRequest() {
437      return RemoteProcedureRequest.newBuilder().setProcId(procId)
438          .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
439    }
440  }
441
442  public static abstract class RegionOperation extends RemoteOperation {
443    protected final RegionInfo regionInfo;
444    protected final long procId;
445
446    protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
447      super(remoteProcedure);
448      this.regionInfo = regionInfo;
449      this.procId = procId;
450    }
451  }
452
453  public static class RegionOpenOperation extends RegionOperation {
454
455    public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
456        long procId) {
457      super(remoteProcedure, regionInfo, procId);
458    }
459
460    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
461        final MasterProcedureEnv env) {
462      return RequestConverter.buildRegionOpenInfo(regionInfo,
463        env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
464    }
465  }
466
467  public static class RegionCloseOperation extends RegionOperation {
468    private final ServerName destinationServer;
469
470    public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
471        ServerName destinationServer) {
472      super(remoteProcedure, regionInfo, procId);
473      this.destinationServer = destinationServer;
474    }
475
476    public ServerName getDestinationServer() {
477      return destinationServer;
478    }
479
480    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
481      return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
482        getDestinationServer(), procId);
483    }
484  }
485}