001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.net.ConnectException;
022import java.net.UnknownHostException;
023import java.util.Arrays;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
029import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
030import org.apache.hadoop.hbase.master.MasterServices;
031import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
032import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
037
038/**
039 * Test implementation of RSProcedureDispatcher that throws desired errors for testing purpose.
040 */
041public class RSProcDispatcher extends RSProcedureDispatcher {
042
043  private static final Logger LOG = LoggerFactory.getLogger(RSProcDispatcher.class);
044
045  private static final AtomicInteger I = new AtomicInteger();
046
047  private static final List<IOException> ERRORS =
048    Arrays.asList(new ConnectionClosedException("test connection closed error..."),
049      new UnknownHostException("test unknown host error..."),
050      new ConnectException("test connect error..."));
051
052  private static final AtomicInteger ERROR_IDX = new AtomicInteger();
053
054  public RSProcDispatcher(MasterServices master) {
055    super(master);
056  }
057
058  @Override
059  protected void remoteDispatch(final ServerName serverName,
060    final Set<RemoteProcedure> remoteProcedures) {
061    if (!master.getServerManager().isServerOnline(serverName)) {
062      // fail fast
063      submitTask(new DeadRSRemoteCall(serverName, remoteProcedures));
064    } else {
065      submitTask(new TestExecuteProceduresRemoteCall(serverName, remoteProcedures));
066    }
067  }
068
069  class TestExecuteProceduresRemoteCall extends ExecuteProceduresRemoteCall {
070
071    public TestExecuteProceduresRemoteCall(ServerName serverName,
072      Set<RemoteProcedure> remoteProcedures) {
073      super(serverName, remoteProcedures);
074    }
075
076    @Override
077    public AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
078      final AdminProtos.ExecuteProceduresRequest request) throws IOException {
079      int j = I.addAndGet(1);
080      LOG.info("sendRequest() req: {} , j: {}", request, j);
081      if (j == 12 || j == 22) {
082        // Execute the remote close and open region requests in the last (5th) retry before
083        // throwing ConnectionClosedException. This is to ensure even if the region open/close
084        // is successfully completed by regionserver, master still schedules SCP because
085        // sendRequest() throws error which has retry-limit exhausted.
086        FutureUtils.get(getRsAdmin().executeProcedures(request));
087      }
088      // For one of the close region requests and one of the open region requests,
089      // throw ConnectionClosedException until retry limit is exhausted and master
090      // schedules recoveries for the server.
091      // We will have ABNORMALLY_CLOSED regions, and they are expected to recover on their own.
092      if (j >= 8 && j <= 13 || j >= 18 && j <= 23) {
093        throw ERRORS.get(ERROR_IDX.getAndIncrement() % ERRORS.size());
094      }
095      return FutureUtils.get(getRsAdmin().executeProcedures(request));
096    }
097
098    private AsyncRegionServerAdmin getRsAdmin() {
099      return master.getAsyncClusterConnection().getRegionServerAdmin(getServerName());
100    }
101  }
102
103  private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall {
104
105    public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
106      super(serverName, remoteProcedures);
107    }
108
109    @Override
110    public void run() {
111      remoteCallFailed(master.getMasterProcedureExecutor().getEnvironment(),
112        new RegionServerStoppedException("Server " + getServerName() + " is not online"));
113    }
114  }
115
116}