001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.CallQueueTooBigException;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
031import org.apache.hadoop.hbase.master.MasterServices;
032import org.apache.hadoop.hbase.master.ServerListener;
033import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
034import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.ipc.RemoteException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
042import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
043import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
044import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
045import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
057
058/**
 * A remote procedure dispatcher for regionservers.
060 */
061@InterfaceAudience.Private
062public class RSProcedureDispatcher
063    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
064    implements ServerListener {
  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);

  /** Configuration key the constructor reads to initialize {@code rsStartupWaitTime}. */
  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
      "hbase.regionserver.rpc.startup.waittime";
  /** Value used when the key above is not set (logged with an "ms" suffix by the retry code). */
  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;

  // NOTE(review): not referenced anywhere in the visible part of this file -- confirm before
  // relying on it or removing it.
  private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0

  protected final MasterServices master;
  /** How long a call keeps retrying against a server that reports it is not running yet. */
  private final long rsStartupWaitTime;
  /** Assigned in {@link #start()} from the master's procedure executor; null before that. */
  private MasterProcedureEnv procedureEnv;
076
077  public RSProcedureDispatcher(final MasterServices master) {
078    super(master.getConfiguration());
079
080    this.master = master;
081    this.rsStartupWaitTime = master.getConfiguration().getLong(
082      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
083  }
084
085  @Override
086  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
087    return new UncaughtExceptionHandler() {
088
089      @Override
090      public void uncaughtException(Thread t, Throwable e) {
091        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
092      }
093    };
094  }
095
096  @Override
097  public boolean start() {
098    if (!super.start()) {
099      return false;
100    }
101
102    master.getServerManager().registerListener(this);
103    procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
104    for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
105      addNode(serverName);
106    }
107    return true;
108  }
109
110  @Override
111  public boolean stop() {
112    if (!super.stop()) {
113      return false;
114    }
115
116    master.getServerManager().unregisterListener(this);
117    return true;
118  }
119
120  @Override
121  protected void remoteDispatch(final ServerName serverName,
122      final Set<RemoteProcedure> remoteProcedures) {
123    final int rsVersion = master.getServerManager().getVersionNumber(serverName);
124    if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) {
125      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
126    } else {
127      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
128    }
129  }
130
131  @Override
132  protected void abortPendingOperations(final ServerName serverName,
133      final Set<RemoteProcedure> operations) {
134    // TODO: Replace with a ServerNotOnlineException()
135    final IOException e = new DoNotRetryIOException("server not online " + serverName);
136    for (RemoteProcedure proc: operations) {
137      proc.remoteCallFailed(procedureEnv, serverName, e);
138    }
139  }
140
141  @Override
142  public void serverAdded(final ServerName serverName) {
143    addNode(serverName);
144  }
145
146  @Override
147  public void serverRemoved(final ServerName serverName) {
148    removeNode(serverName);
149  }
150
151  /**
152   * Base remote call
153   */
154  protected abstract class AbstractRSRemoteCall implements Runnable {
155
156    private final ServerName serverName;
157
158    private int numberOfAttemptsSoFar = 0;
159    private long maxWaitTime = -1;
160
161    public AbstractRSRemoteCall(final ServerName serverName) {
162      this.serverName = serverName;
163    }
164
165    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
166      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
167      if (admin == null) {
168        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
169          " failed because no RPC connection found to this server");
170      }
171      return admin;
172    }
173
174    protected ServerName getServerName() {
175      return serverName;
176    }
177
178    protected boolean scheduleForRetry(final IOException e) {
179      LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e);
180      // Should we wait a little before retrying? If the server is starting it's yes.
181      if (e instanceof ServerNotRunningYetException) {
182        long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime();
183        if (remainingTime > 0) {
184          LOG.warn("waiting a little before trying on the same server={}," +
185            " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime);
186          numberOfAttemptsSoFar++;
187          submitTask(this, 100, TimeUnit.MILLISECONDS);
188          return true;
189        }
190        LOG.warn("server {} is not up for a while; try a new one", serverName);
191        return false;
192      }
193
194      boolean queueFull = e instanceof CallQueueTooBigException;
195      // this exception is thrown in the rpc framework, where we can make sure that the call has not
196      // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd
197      // better choose another region server
198      // notice that, it is safe to quit only if this is the first time we send request to region
199      // server. Maybe the region server has accept our request the first time, and then there is a
200      // network error which prevents we receive the response, and the second time we hit a
201      // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a
202      // double assign..
203      if (queueFull && numberOfAttemptsSoFar == 0) {
204        LOG.warn("request to {} failed due to {}, try={}, this usually because" +
205          " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar);
206        return false;
207      }
208      // In case it is a connection exception and the region server is still online, the openRegion
209      // RPC could have been accepted by the server and just the response didn't go through. So we
210      // will retry to open the region on the same server.
211      if ((queueFull || ClientExceptionsUtil.isConnectionException(e)) &&
212        master.getServerManager().isServerOnline(serverName)) {
213        // we want to retry as many times as needed as long as the RS is not dead.
214        LOG.debug("Retrying to same RegionServer {} because: {}", serverName, e.getMessage());
215        numberOfAttemptsSoFar++;
216        submitTask(this, 100, TimeUnit.MILLISECONDS);
217        return true;
218      }
219      // trying to send the request elsewhere instead
220      LOG.warn("Failed dispatch to server={} try={}", serverName, numberOfAttemptsSoFar, e);
221      return false;
222    }
223
224    private long getMaxWaitTime() {
225      if (this.maxWaitTime < 0) {
226        // This is the max attempts, not retries, so it should be at least 1.
227        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
228      }
229      return this.maxWaitTime;
230    }
231
232    protected IOException unwrapException(IOException e) {
233      if (e instanceof RemoteException) {
234        e = ((RemoteException)e).unwrapRemoteException();
235      }
236      return e;
237    }
238  }
239
  /**
   * Callback used by {@link #splitAndResolveOperation} to hand over the remote operations of one
   * server, grouped by operation type. Each method is only invoked with a non-empty list.
   */
  private interface RemoteProcedureResolver {
    /** Receives the {@link RegionOpenOperation}s found among the remote procedures. */
    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);

    /** Receives the {@link RegionCloseOperation}s found among the remote procedures. */
    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);

    /** Receives the {@link ServerOperation}s found among the remote procedures. */
    void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
  }
247
248  /**
249   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
250   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
251   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
252   * {@link RegionCloseOperation}s.
253   * @param serverName RegionServer to which the remote operations are sent
254   * @param operations Remote procedures which are dispatched to the given server
255   * @param resolver Used to dispatch remote procedures to given server.
256   */
257  public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
258      RemoteProcedureResolver resolver) {
259    MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
260    ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
261      buildAndGroupRequestByType(env, serverName, operations);
262
263    List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
264    if (!openOps.isEmpty()) {
265      resolver.dispatchOpenRequests(env, openOps);
266    }
267
268    List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
269    if (!closeOps.isEmpty()) {
270      resolver.dispatchCloseRequests(env, closeOps);
271    }
272
273    List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
274    if (!refreshOps.isEmpty()) {
275      resolver.dispatchServerOperations(env, refreshOps);
276    }
277
278    if (!reqsByType.isEmpty()) {
279      LOG.warn("unknown request type in the queue: " + reqsByType);
280    }
281  }
282
283  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
284
285    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
286      super(serverName, remoteProcedures);
287    }
288
289    @Override
290    public void run() {
291      remoteCallFailed(procedureEnv,
292        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
293    }
294  }
295
296  // ==========================================================================
297  //  Compatibility calls
298  // ==========================================================================
299  protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
300      implements RemoteProcedureResolver {
301    protected final Set<RemoteProcedure> remoteProcedures;
302
303    protected ExecuteProceduresRequest.Builder request = null;
304
305    public ExecuteProceduresRemoteCall(final ServerName serverName,
306        final Set<RemoteProcedure> remoteProcedures) {
307      super(serverName);
308      this.remoteProcedures = remoteProcedures;
309    }
310
311    @Override
312    public void run() {
313      request = ExecuteProceduresRequest.newBuilder();
314      if (LOG.isTraceEnabled()) {
315        LOG.trace("Building request with operations count=" + remoteProcedures.size());
316      }
317      splitAndResolveOperation(getServerName(), remoteProcedures, this);
318
319      try {
320        sendRequest(getServerName(), request.build());
321      } catch (IOException e) {
322        e = unwrapException(e);
323        // TODO: In the future some operation may want to bail out early.
324        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
325        if (!scheduleForRetry(e)) {
326          remoteCallFailed(procedureEnv, e);
327        }
328      }
329    }
330
331    @Override
332    public void dispatchOpenRequests(final MasterProcedureEnv env,
333        final List<RegionOpenOperation> operations) {
334      submitTask(new OpenRegionRemoteCall(getServerName(), operations));
335    }
336
337    @Override
338    public void dispatchCloseRequests(final MasterProcedureEnv env,
339        final List<RegionCloseOperation> operations) {
340      for (RegionCloseOperation op: operations) {
341        submitTask(new CloseRegionRemoteCall(getServerName(), op));
342      }
343    }
344
345    @Override
346    public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
347      operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
348    }
349
350    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
351        final ExecuteProceduresRequest request) throws IOException {
352      try {
353        return getRsAdmin().executeProcedures(null, request);
354      } catch (ServiceException se) {
355        throw ProtobufUtil.getRemoteException(se);
356      }
357    }
358
359    protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
360      for (RemoteProcedure proc : remoteProcedures) {
361        proc.remoteCallFailed(env, getServerName(), e);
362      }
363    }
364  }
365
366  protected static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
367      final ServerName serverName, final List<RegionOpenOperation> operations) {
368    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
369    builder.setServerStartCode(serverName.getStartcode());
370    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
371    for (RegionOpenOperation op: operations) {
372      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
373    }
374    return builder.build();
375  }
376
377  // ==========================================================================
378  //  Compatibility calls
379  //  Since we don't have a "batch proc-exec" request on the target RS
380  //  we have to chunk the requests by type and dispatch the specific request.
381  // ==========================================================================
382  /**
383   * Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old
384   * {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc.
385   */
386  private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
387    private final List<RegionOpenOperation> operations;
388
389    public OpenRegionRemoteCall(final ServerName serverName,
390        final List<RegionOpenOperation> operations) {
391      super(serverName);
392      this.operations = operations;
393    }
394
395    @Override
396    public void run() {
397      final OpenRegionRequest request =
398          buildOpenRegionRequest(procedureEnv, getServerName(), operations);
399
400      try {
401        sendRequest(getServerName(), request);
402      } catch (IOException e) {
403        e = unwrapException(e);
404        // TODO: In the future some operation may want to bail out early.
405        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
406        if (!scheduleForRetry(e)) {
407          remoteCallFailed(procedureEnv, e);
408        }
409      }
410    }
411
412    private OpenRegionResponse sendRequest(final ServerName serverName,
413        final OpenRegionRequest request) throws IOException {
414      try {
415        return getRsAdmin().openRegion(null, request);
416      } catch (ServiceException se) {
417        throw ProtobufUtil.getRemoteException(se);
418      }
419    }
420
421    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
422      for (RegionOpenOperation op: operations) {
423        op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
424      }
425    }
426  }
427
428  /**
429   * Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old
430   * {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc.
431   */
432  private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
433    private final RegionCloseOperation operation;
434
435    public CloseRegionRemoteCall(final ServerName serverName,
436        final RegionCloseOperation operation) {
437      super(serverName);
438      this.operation = operation;
439    }
440
441    @Override
442    public void run() {
443      final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
444      try {
445        CloseRegionResponse response = sendRequest(getServerName(), request);
446        remoteCallCompleted(procedureEnv, response);
447      } catch (IOException e) {
448        e = unwrapException(e);
449        // TODO: In the future some operation may want to bail out early.
450        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
451        if (!scheduleForRetry(e)) {
452          remoteCallFailed(procedureEnv, e);
453        }
454      }
455    }
456
457    private CloseRegionResponse sendRequest(final ServerName serverName,
458        final CloseRegionRequest request) throws IOException {
459      try {
460        return getRsAdmin().closeRegion(null, request);
461      } catch (ServiceException se) {
462        throw ProtobufUtil.getRemoteException(se);
463      }
464    }
465
466    private void remoteCallCompleted(final MasterProcedureEnv env,
467        final CloseRegionResponse response) {
468      operation.setClosed(response.getClosed());
469    }
470
471    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
472      operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
473    }
474  }
475
476  // ==========================================================================
477  //  RPC Messages
478  //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
479  //  - RegionOperation: open, close, flush, snapshot, ...
480  // ==========================================================================
481
482  public static final class ServerOperation extends RemoteOperation {
483
484    private final long procId;
485
486    private final Class<?> rsProcClass;
487
488    private final byte[] rsProcData;
489
490    public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
491        byte[] rsProcData) {
492      super(remoteProcedure);
493      this.procId = procId;
494      this.rsProcClass = rsProcClass;
495      this.rsProcData = rsProcData;
496    }
497
498    public RemoteProcedureRequest buildRequest() {
499      return RemoteProcedureRequest.newBuilder().setProcId(procId)
500          .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
501    }
502  }
503
504  public static abstract class RegionOperation extends RemoteOperation {
505    private final RegionInfo regionInfo;
506
507    protected RegionOperation(final RemoteProcedure remoteProcedure,
508        final RegionInfo regionInfo) {
509      super(remoteProcedure);
510      this.regionInfo = regionInfo;
511    }
512
513    public RegionInfo getRegionInfo() {
514      return this.regionInfo;
515    }
516  }
517
518  public static class RegionOpenOperation extends RegionOperation {
519    private final List<ServerName> favoredNodes;
520    private final boolean openForReplay;
521    private boolean failedOpen;
522
523    public RegionOpenOperation(final RemoteProcedure remoteProcedure,
524        final RegionInfo regionInfo, final List<ServerName> favoredNodes,
525        final boolean openForReplay) {
526      super(remoteProcedure, regionInfo);
527      this.favoredNodes = favoredNodes;
528      this.openForReplay = openForReplay;
529    }
530
531    protected void setFailedOpen(final boolean failedOpen) {
532      this.failedOpen = failedOpen;
533    }
534
535    public boolean isFailedOpen() {
536      return failedOpen;
537    }
538
539    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
540        final MasterProcedureEnv env) {
541      return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
542        env.getAssignmentManager().getFavoredNodes(getRegionInfo()));
543    }
544  }
545
546  public static class RegionCloseOperation extends RegionOperation {
547    private final ServerName destinationServer;
548    private boolean closed = false;
549
550    public RegionCloseOperation(final RemoteProcedure remoteProcedure,
551        final RegionInfo regionInfo, final ServerName destinationServer) {
552      super(remoteProcedure, regionInfo);
553      this.destinationServer = destinationServer;
554    }
555
556    public ServerName getDestinationServer() {
557      return destinationServer;
558    }
559
560    protected void setClosed(final boolean closed) {
561      this.closed = closed;
562    }
563
564    public boolean isClosed() {
565      return closed;
566    }
567
568    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
569      return ProtobufUtil.buildCloseRegionRequest(serverName,
570        getRegionInfo().getRegionName(), getDestinationServer());
571    }
572  }
573}