001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.CallQueueTooBigException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
031import org.apache.hadoop.hbase.master.MasterServices;
032import org.apache.hadoop.hbase.master.ServerListener;
033import org.apache.hadoop.hbase.master.ServerManager;
034import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
035import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
036import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
037import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.hbase.util.FutureUtils;
040import org.apache.hadoop.ipc.RemoteException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
046import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
054
055/**
 * A remote procedure dispatcher for regionservers.
057 */
058@InterfaceAudience.Private
059public class RSProcedureDispatcher
060    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
061    implements ServerListener {
062  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
063
064  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
065      "hbase.regionserver.rpc.startup.waittime";
066  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
067
068  protected final MasterServices master;
069  private final long rsStartupWaitTime;
070  private MasterProcedureEnv procedureEnv;
071
072  public RSProcedureDispatcher(final MasterServices master) {
073    super(master.getConfiguration());
074
075    this.master = master;
076    this.rsStartupWaitTime = master.getConfiguration().getLong(
077      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
078  }
079
080  @Override
081  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
082    return new UncaughtExceptionHandler() {
083
084      @Override
085      public void uncaughtException(Thread t, Throwable e) {
086        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
087      }
088    };
089  }
090
091  @Override
092  public boolean start() {
093    if (!super.start()) {
094      return false;
095    }
096    if (master.isStopped()) {
097      LOG.debug("Stopped");
098      return false;
099    }
100    // Around startup, if failed, some of the below may be set back to null so NPE is possible.
101    ServerManager sm = master.getServerManager();
102    if (sm == null) {
103      LOG.debug("ServerManager is null");
104      return false;
105    }
106    sm.registerListener(this);
107    ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor();
108    if (pe == null) {
109      LOG.debug("ProcedureExecutor is null");
110      return false;
111    }
112    this.procedureEnv = pe.getEnvironment();
113    if (this.procedureEnv == null) {
114      LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping());
115      return false;
116    }
117    try {
118      for (ServerName serverName : sm.getOnlineServersList()) {
119        addNode(serverName);
120      }
121    } catch (Exception e) {
122      LOG.info("Failed start", e);
123      return false;
124    }
125    return true;
126  }
127
128  @Override
129  public boolean stop() {
130    if (!super.stop()) {
131      return false;
132    }
133
134    master.getServerManager().unregisterListener(this);
135    return true;
136  }
137
138  @Override
139  protected void remoteDispatch(final ServerName serverName,
140      final Set<RemoteProcedure> remoteProcedures) {
141    if (!master.getServerManager().isServerOnline(serverName)) {
142      // fail fast
143      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
144    } else {
145      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
146    }
147  }
148
149  @Override
150  protected void abortPendingOperations(final ServerName serverName,
151      final Set<RemoteProcedure> operations) {
152    // TODO: Replace with a ServerNotOnlineException()
153    final IOException e = new DoNotRetryIOException("server not online " + serverName);
154    for (RemoteProcedure proc: operations) {
155      proc.remoteCallFailed(procedureEnv, serverName, e);
156    }
157  }
158
159  @Override
160  public void serverAdded(final ServerName serverName) {
161    addNode(serverName);
162  }
163
164  @Override
165  public void serverRemoved(final ServerName serverName) {
166    removeNode(serverName);
167  }
168
169  private interface RemoteProcedureResolver {
170    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
171
172    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
173
174    void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
175  }
176
177  /**
178   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
179   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
180   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
181   * {@link RegionCloseOperation}s.
182   * @param serverName RegionServer to which the remote operations are sent
183   * @param operations Remote procedures which are dispatched to the given server
184   * @param resolver Used to dispatch remote procedures to given server.
185   */
186  public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
187      RemoteProcedureResolver resolver) {
188    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
189    ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
190      buildAndGroupRequestByType(env, serverName, operations);
191
192    List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
193    if (!openOps.isEmpty()) {
194      resolver.dispatchOpenRequests(env, openOps);
195    }
196
197    List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
198    if (!closeOps.isEmpty()) {
199      resolver.dispatchCloseRequests(env, closeOps);
200    }
201
202    List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
203    if (!refreshOps.isEmpty()) {
204      resolver.dispatchServerOperations(env, refreshOps);
205    }
206
207    if (!reqsByType.isEmpty()) {
208      LOG.warn("unknown request type in the queue: " + reqsByType);
209    }
210  }
211
212  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
213
214    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
215      super(serverName, remoteProcedures);
216    }
217
218    @Override
219    public void run() {
220      remoteCallFailed(procedureEnv,
221        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
222    }
223  }
224
225  // ==========================================================================
226  //  Compatibility calls
227  // ==========================================================================
228  protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable {
229
230    private final ServerName serverName;
231
232    private final Set<RemoteProcedure> remoteProcedures;
233
234    private int numberOfAttemptsSoFar = 0;
235    private long maxWaitTime = -1;
236
237    private ExecuteProceduresRequest.Builder request = null;
238
239    public ExecuteProceduresRemoteCall(final ServerName serverName,
240        final Set<RemoteProcedure> remoteProcedures) {
241      this.serverName = serverName;
242      this.remoteProcedures = remoteProcedures;
243    }
244
245    private AsyncRegionServerAdmin  getRsAdmin() throws IOException {
246      return master.getAsyncClusterConnection().getRegionServerAdmin(serverName);
247    }
248
249    protected final ServerName getServerName() {
250      return serverName;
251    }
252
253    private boolean scheduleForRetry(IOException e) {
254      LOG.debug("Request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
255      // Should we wait a little before retrying? If the server is starting it's yes.
256      if (e instanceof ServerNotRunningYetException) {
257        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
258        if (remainingTime > 0) {
259          LOG.warn("Waiting a little before retrying {}, try={}, can wait up to {}ms",
260            serverName, numberOfAttemptsSoFar, remainingTime);
261          numberOfAttemptsSoFar++;
262          // Retry every 100ms up to maximum wait time.
263          submitTask(this, 100, TimeUnit.MILLISECONDS);
264          return true;
265        }
266        LOG.warn("{} is throwing ServerNotRunningYetException for {}ms; trying another server",
267          serverName, getMaxWaitTime());
268        return false;
269      }
270      if (e instanceof DoNotRetryIOException) {
271        LOG.warn("{} tells us DoNotRetry due to {}, try={}, give up", serverName,
272          e.toString(), numberOfAttemptsSoFar);
273        return false;
274      }
275      // This exception is thrown in the rpc framework, where we can make sure that the call has not
276      // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
277      // better choose another region server.
278      // Notice that, it is safe to quit only if this is the first time we send request to region
279      // server. Maybe the region server has accepted our request the first time, and then there is
280      // a network error which prevents we receive the response, and the second time we hit a
281      // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
282      // double assign...
283      if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) {
284        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
285          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
286        return false;
287      }
288      // Always retry for other exception types if the region server is not dead yet.
289      if (!master.getServerManager().isServerOnline(serverName)) {
290        LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up",
291          serverName, e.toString(), numberOfAttemptsSoFar);
292        return false;
293      }
294      if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) {
295        // A better way is to return true here to let the upper layer quit, and then schedule a
296        // background task to check whether the region server is dead. And if it is dead, call
297        // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect
298        // result, but waste some resources.
299        LOG.warn("{} is aborted or stopped, for safety we still need to" +
300          " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar);
301      } else {
302        LOG.warn("request to {} failed due to {}, try={}, retrying...", serverName,
303          e.toString(), numberOfAttemptsSoFar);
304      }
305      numberOfAttemptsSoFar++;
306      // Add some backoff here as the attempts rise otherwise if a stuck condition, will fill logs
307      // with failed attempts. None of our backoff classes -- RetryCounter or ClientBackoffPolicy
308      // -- fit here nicely so just do something simple; increment by 100ms * retry^2 on each try
309      // up to max of 10 seconds (don't want to back off too much in case of situation change).
310      submitTask(this,
311        Math.min(100 * (this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar), 10 * 1000),
312        TimeUnit.MILLISECONDS);
313      return true;
314    }
315
316    private long getMaxWaitTime() {
317      if (this.maxWaitTime < 0) {
318        // This is the max attempts, not retries, so it should be at least 1.
319        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
320      }
321      return this.maxWaitTime;
322    }
323
324    private IOException unwrapException(IOException e) {
325      if (e instanceof RemoteException) {
326        e = ((RemoteException)e).unwrapRemoteException();
327      }
328      return e;
329    }
330
331    @Override
332    public void run() {
333      request = ExecuteProceduresRequest.newBuilder();
334      if (LOG.isTraceEnabled()) {
335        LOG.trace("Building request with operations count=" + remoteProcedures.size());
336      }
337      splitAndResolveOperation(getServerName(), remoteProcedures, this);
338
339      try {
340        sendRequest(getServerName(), request.build());
341      } catch (IOException e) {
342        e = unwrapException(e);
343        // TODO: In the future some operation may want to bail out early.
344        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
345        if (!scheduleForRetry(e)) {
346          remoteCallFailed(procedureEnv, e);
347        }
348      }
349    }
350
351    @Override
352    public void dispatchOpenRequests(final MasterProcedureEnv env,
353        final List<RegionOpenOperation> operations) {
354      request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
355    }
356
357    @Override
358    public void dispatchCloseRequests(final MasterProcedureEnv env,
359        final List<RegionCloseOperation> operations) {
360      for (RegionCloseOperation op: operations) {
361        request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
362      }
363    }
364
365    @Override
366    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
367      operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
368    }
369
370    // will be overridden in test.
371    @VisibleForTesting
372    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
373        final ExecuteProceduresRequest request) throws IOException {
374      return FutureUtils.get(getRsAdmin().executeProcedures(request));
375    }
376
377    protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
378      for (RemoteProcedure proc : remoteProcedures) {
379        proc.remoteCallFailed(env, getServerName(), e);
380      }
381    }
382  }
383
384  private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
385      final ServerName serverName, final List<RegionOpenOperation> operations) {
386    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
387    builder.setServerStartCode(serverName.getStartcode());
388    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
389    for (RegionOpenOperation op: operations) {
390      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
391    }
392    return builder.build();
393  }
394
395  // ==========================================================================
396  //  RPC Messages
397  //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
398  //  - RegionOperation: open, close, flush, snapshot, ...
399  // ==========================================================================
400
401  public static final class ServerOperation extends RemoteOperation {
402
403    private final long procId;
404
405    private final Class<?> rsProcClass;
406
407    private final byte[] rsProcData;
408
409    public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
410        byte[] rsProcData) {
411      super(remoteProcedure);
412      this.procId = procId;
413      this.rsProcClass = rsProcClass;
414      this.rsProcData = rsProcData;
415    }
416
417    public RemoteProcedureRequest buildRequest() {
418      return RemoteProcedureRequest.newBuilder().setProcId(procId)
419          .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
420    }
421  }
422
423  public static abstract class RegionOperation extends RemoteOperation {
424    protected final RegionInfo regionInfo;
425    protected final long procId;
426
427    protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
428      super(remoteProcedure);
429      this.regionInfo = regionInfo;
430      this.procId = procId;
431    }
432  }
433
434  public static class RegionOpenOperation extends RegionOperation {
435
436    public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
437        long procId) {
438      super(remoteProcedure, regionInfo, procId);
439    }
440
441    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
442        final MasterProcedureEnv env) {
443      return RequestConverter.buildRegionOpenInfo(regionInfo,
444        env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
445    }
446  }
447
448  public static class RegionCloseOperation extends RegionOperation {
449    private final ServerName destinationServer;
450
451    public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
452        ServerName destinationServer) {
453      super(remoteProcedure, regionInfo, procId);
454      this.destinationServer = destinationServer;
455    }
456
457    public ServerName getDestinationServer() {
458      return destinationServer;
459    }
460
461    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
462      return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
463        getDestinationServer(), procId);
464    }
465  }
466}