001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
031import org.apache.hadoop.hbase.master.MasterServices;
032import org.apache.hadoop.hbase.master.ServerListener;
033import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
034import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.ipc.RemoteException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
042import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
043import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
044import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
045
046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
047import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
055
056/**
 * A remote procedure dispatcher for regionservers.
058 */
059@InterfaceAudience.Private
060public class RSProcedureDispatcher
061    extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
062    implements ServerListener {
063  private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class);
064
065  public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
066      "hbase.regionserver.rpc.startup.waittime";
067  private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
068
069  private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0
070
071  protected final MasterServices master;
072  private final long rsStartupWaitTime;
073  private MasterProcedureEnv procedureEnv;
074
075  public RSProcedureDispatcher(final MasterServices master) {
076    super(master.getConfiguration());
077
078    this.master = master;
079    this.rsStartupWaitTime = master.getConfiguration().getLong(
080      RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
081  }
082
083  @Override
084  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
085    return new UncaughtExceptionHandler() {
086
087      @Override
088      public void uncaughtException(Thread t, Throwable e) {
089        LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e);
090      }
091    };
092  }
093
094  @Override
095  public boolean start() {
096    if (!super.start()) {
097      return false;
098    }
099
100    master.getServerManager().registerListener(this);
101    procedureEnv = master.getMasterProcedureExecutor().getEnvironment();
102    for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
103      addNode(serverName);
104    }
105    return true;
106  }
107
108  @Override
109  public boolean stop() {
110    if (!super.stop()) {
111      return false;
112    }
113
114    master.getServerManager().unregisterListener(this);
115    return true;
116  }
117
118  @Override
119  protected void remoteDispatch(final ServerName serverName,
120      final Set<RemoteProcedure> remoteProcedures) {
121    final int rsVersion = master.getServerManager().getServerVersion(serverName);
122    if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) {
123      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
124    } else {
125      submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures));
126    }
127  }
128
129  @Override
130  protected void abortPendingOperations(final ServerName serverName,
131      final Set<RemoteProcedure> operations) {
132    // TODO: Replace with a ServerNotOnlineException()
133    final IOException e = new DoNotRetryIOException("server not online " + serverName);
134    for (RemoteProcedure proc: operations) {
135      proc.remoteCallFailed(procedureEnv, serverName, e);
136    }
137  }
138
139  @Override
140  public void serverAdded(final ServerName serverName) {
141    addNode(serverName);
142  }
143
144  @Override
145  public void serverRemoved(final ServerName serverName) {
146    removeNode(serverName);
147  }
148
149  /**
150   * Base remote call
151   */
152  protected abstract class AbstractRSRemoteCall implements Runnable {
153
154    private final ServerName serverName;
155
156    private int numberOfAttemptsSoFar = 0;
157    private long maxWaitTime = -1;
158
159    public AbstractRSRemoteCall(final ServerName serverName) {
160      this.serverName = serverName;
161    }
162
163    protected AdminService.BlockingInterface getRsAdmin() throws IOException {
164      final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
165      if (admin == null) {
166        throw new IOException("Attempting to send OPEN RPC to server " + getServerName() +
167          " failed because no RPC connection found to this server");
168      }
169      return admin;
170    }
171
172    protected ServerName getServerName() {
173      return serverName;
174    }
175
176    protected boolean scheduleForRetry(final IOException e) {
177      // Should we wait a little before retrying? If the server is starting it's yes.
178      final boolean hold = (e instanceof ServerNotRunningYetException);
179      if (hold) {
180        LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
181            serverName, numberOfAttemptsSoFar), e);
182        long now = EnvironmentEdgeManager.currentTime();
183        if (now < getMaxWaitTime()) {
184          if (LOG.isDebugEnabled()) {
185            LOG.debug(String.format("server is not yet up; waiting up to %dms",
186              (getMaxWaitTime() - now)), e);
187          }
188          submitTask(this, 100, TimeUnit.MILLISECONDS);
189          return true;
190        }
191
192        LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
193        return false;
194      }
195
196      // In case it is a connection exception and the region server is still online,
197      // the openRegion RPC could have been accepted by the server and
198      // just the response didn't go through. So we will retry to
199      // open the region on the same server.
200      final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e)
201          && master.getServerManager().isServerOnline(serverName));
202      if (retry) {
203        // we want to retry as many times as needed as long as the RS is not dead.
204        if (LOG.isDebugEnabled()) {
205          LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
206              serverName, e.getMessage()), e);
207        }
208        submitTask(this, 100, TimeUnit.MILLISECONDS);
209        return true;
210      }
211      // trying to send the request elsewhere instead
212      LOG.warn(String.format("Failed dispatch to server=%s try=%d",
213                  serverName, numberOfAttemptsSoFar), e);
214      return false;
215    }
216
217    private long getMaxWaitTime() {
218      if (this.maxWaitTime < 0) {
219        // This is the max attempts, not retries, so it should be at least 1.
220        this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
221      }
222      return this.maxWaitTime;
223    }
224
225    protected IOException unwrapException(IOException e) {
226      if (e instanceof RemoteException) {
227        e = ((RemoteException)e).unwrapRemoteException();
228      }
229      return e;
230    }
231  }
232
233  private interface RemoteProcedureResolver {
234    void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
235    void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
236  }
237
238  /**
239   * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s
240   * from the given {@code remoteProcedures} and groups them by class of the returned operation.
241   * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and
242   * {@link RegionCloseOperation}s.
243   * @param serverName RegionServer to which the remote operations are sent
244   * @param remoteProcedures Remote procedures which are dispatched to the given server
245   * @param resolver Used to dispatch remote procedures to given server.
246   */
247  public void splitAndResolveOperation(final ServerName serverName,
248      final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) {
249    final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
250      buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures);
251
252    final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
253    if (!openOps.isEmpty()) {
254      resolver.dispatchOpenRequests(procedureEnv, openOps);
255    }
256
257    final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
258    if (!closeOps.isEmpty()) {
259      resolver.dispatchCloseRequests(procedureEnv, closeOps);
260    }
261
262    if (!reqsByType.isEmpty()) {
263      LOG.warn("unknown request type in the queue: " + reqsByType);
264    }
265  }
266
267  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
268
269    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
270      super(serverName, remoteProcedures);
271    }
272
273    @Override
274    public void run() {
275      remoteCallFailed(procedureEnv,
276        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
277    }
278  }
279
280  // ==========================================================================
281  //  Compatibility calls
282  // ==========================================================================
283  protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
284      implements RemoteProcedureResolver {
285    protected final Set<RemoteProcedure> remoteProcedures;
286
287    protected ExecuteProceduresRequest.Builder request = null;
288
289    public ExecuteProceduresRemoteCall(final ServerName serverName,
290        final Set<RemoteProcedure> remoteProcedures) {
291      super(serverName);
292      this.remoteProcedures = remoteProcedures;
293    }
294
295    @Override
296    public void run() {
297      request = ExecuteProceduresRequest.newBuilder();
298      if (LOG.isTraceEnabled()) {
299        LOG.trace("Building request with operations count=" + remoteProcedures.size());
300      }
301      splitAndResolveOperation(getServerName(), remoteProcedures, this);
302
303      try {
304        final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
305        remoteCallCompleted(procedureEnv, response);
306      } catch (IOException e) {
307        e = unwrapException(e);
308        // TODO: In the future some operation may want to bail out early.
309        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
310        if (!scheduleForRetry(e)) {
311          remoteCallFailed(procedureEnv, e);
312        }
313      }
314    }
315
316    @Override
317    public void dispatchOpenRequests(final MasterProcedureEnv env,
318        final List<RegionOpenOperation> operations) {
319      submitTask(new OpenRegionRemoteCall(getServerName(), operations));
320    }
321
322    @Override
323    public void dispatchCloseRequests(final MasterProcedureEnv env,
324        final List<RegionCloseOperation> operations) {
325      for (RegionCloseOperation op: operations) {
326        submitTask(new CloseRegionRemoteCall(getServerName(), op));
327      }
328    }
329
330    protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
331        final ExecuteProceduresRequest request) throws IOException {
332      try {
333        return getRsAdmin().executeProcedures(null, request);
334      } catch (ServiceException se) {
335        throw ProtobufUtil.getRemoteException(se);
336      }
337    }
338
339
340    private void remoteCallCompleted(final MasterProcedureEnv env,
341        final ExecuteProceduresResponse response) {
342      /*
343      for (RemoteProcedure proc: operations) {
344        proc.remoteCallCompleted(env, getServerName(), response);
345      }*/
346    }
347
348    protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
349      for (RemoteProcedure proc: remoteProcedures) {
350        proc.remoteCallFailed(env, getServerName(), e);
351      }
352    }
353  }
354
355  protected static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
356      final ServerName serverName, final List<RegionOpenOperation> operations) {
357    final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
358    builder.setServerStartCode(serverName.getStartcode());
359    builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
360    for (RegionOpenOperation op: operations) {
361      builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
362    }
363    return builder.build();
364  }
365
366  // ==========================================================================
367  //  Compatibility calls
368  //  Since we don't have a "batch proc-exec" request on the target RS
369  //  we have to chunk the requests by type and dispatch the specific request.
370  // ==========================================================================
371  /**
372   * Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old
373   * {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc.
374   */
375  private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
376    private final List<RegionOpenOperation> operations;
377
378    public OpenRegionRemoteCall(final ServerName serverName,
379        final List<RegionOpenOperation> operations) {
380      super(serverName);
381      this.operations = operations;
382    }
383
384    @Override
385    public void run() {
386      final OpenRegionRequest request =
387          buildOpenRegionRequest(procedureEnv, getServerName(), operations);
388
389      try {
390        OpenRegionResponse response = sendRequest(getServerName(), request);
391        remoteCallCompleted(procedureEnv, response);
392      } catch (IOException e) {
393        e = unwrapException(e);
394        // TODO: In the future some operation may want to bail out early.
395        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
396        if (!scheduleForRetry(e)) {
397          remoteCallFailed(procedureEnv, e);
398        }
399      }
400    }
401
402    private OpenRegionResponse sendRequest(final ServerName serverName,
403        final OpenRegionRequest request) throws IOException {
404      try {
405        return getRsAdmin().openRegion(null, request);
406      } catch (ServiceException se) {
407        throw ProtobufUtil.getRemoteException(se);
408      }
409    }
410
411    private void remoteCallCompleted(final MasterProcedureEnv env,
412        final OpenRegionResponse response) {
413      int index = 0;
414      for (RegionOpenOperation op: operations) {
415        OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
416        op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
417        op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
418      }
419    }
420
421    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
422      for (RegionOpenOperation op: operations) {
423        op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
424      }
425    }
426  }
427
428  /**
429   * Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old
430   * {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc.
431   */
432  private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
433    private final RegionCloseOperation operation;
434
435    public CloseRegionRemoteCall(final ServerName serverName,
436        final RegionCloseOperation operation) {
437      super(serverName);
438      this.operation = operation;
439    }
440
441    @Override
442    public void run() {
443      final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
444      try {
445        CloseRegionResponse response = sendRequest(getServerName(), request);
446        remoteCallCompleted(procedureEnv, response);
447      } catch (IOException e) {
448        e = unwrapException(e);
449        // TODO: In the future some operation may want to bail out early.
450        // TODO: How many times should we retry (use numberOfAttemptsSoFar)
451        if (!scheduleForRetry(e)) {
452          remoteCallFailed(procedureEnv, e);
453        }
454      }
455    }
456
457    private CloseRegionResponse sendRequest(final ServerName serverName,
458        final CloseRegionRequest request) throws IOException {
459      try {
460        return getRsAdmin().closeRegion(null, request);
461      } catch (ServiceException se) {
462        throw ProtobufUtil.getRemoteException(se);
463      }
464    }
465
466    private void remoteCallCompleted(final MasterProcedureEnv env,
467        final CloseRegionResponse response) {
468      operation.setClosed(response.getClosed());
469      operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
470    }
471
472    private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
473      operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
474    }
475  }
476
477  // ==========================================================================
478  //  RPC Messages
479  //  - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
480  //  - RegionOperation: open, close, flush, snapshot, ...
481  // ==========================================================================
482  /* Currently unused
483  public static abstract class ServerOperation extends RemoteOperation {
484    protected ServerOperation(final RemoteProcedure remoteProcedure) {
485      super(remoteProcedure);
486    }
487  }
488  */
489
490  public static abstract class RegionOperation extends RemoteOperation {
491    private final RegionInfo regionInfo;
492
493    protected RegionOperation(final RemoteProcedure remoteProcedure,
494        final RegionInfo regionInfo) {
495      super(remoteProcedure);
496      this.regionInfo = regionInfo;
497    }
498
499    public RegionInfo getRegionInfo() {
500      return this.regionInfo;
501    }
502  }
503
504  public static class RegionOpenOperation extends RegionOperation {
505    private final List<ServerName> favoredNodes;
506    private final boolean openForReplay;
507    private boolean failedOpen;
508
509    public RegionOpenOperation(final RemoteProcedure remoteProcedure,
510        final RegionInfo regionInfo, final List<ServerName> favoredNodes,
511        final boolean openForReplay) {
512      super(remoteProcedure, regionInfo);
513      this.favoredNodes = favoredNodes;
514      this.openForReplay = openForReplay;
515    }
516
517    protected void setFailedOpen(final boolean failedOpen) {
518      this.failedOpen = failedOpen;
519    }
520
521    public boolean isFailedOpen() {
522      return failedOpen;
523    }
524
525    public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
526        final MasterProcedureEnv env) {
527      return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
528        env.getAssignmentManager().getFavoredNodes(getRegionInfo()));
529    }
530  }
531
532  public static class RegionCloseOperation extends RegionOperation {
533    private final ServerName destinationServer;
534    private boolean closed = false;
535
536    public RegionCloseOperation(final RemoteProcedure remoteProcedure,
537        final RegionInfo regionInfo, final ServerName destinationServer) {
538      super(remoteProcedure, regionInfo);
539      this.destinationServer = destinationServer;
540    }
541
542    public ServerName getDestinationServer() {
543      return destinationServer;
544    }
545
546    protected void setClosed(final boolean closed) {
547      this.closed = closed;
548    }
549
550    public boolean isClosed() {
551      return closed;
552    }
553
554    public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
555      return ProtobufUtil.buildCloseRegionRequest(serverName,
556        getRegionInfo().getRegionName(), getDestinationServer());
557    }
558  }
559}