001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import org.apache.hadoop.hbase.CellScanner;
022import org.apache.hadoop.hbase.HConstants;
023import org.apache.hadoop.hbase.HRegionInfo;
024import org.apache.hadoop.hbase.HRegionLocation;
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.TableNotEnabledException;
028import org.apache.hadoop.hbase.ipc.HBaseRpcController;
029import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
034
035/**
036 * Implementations make a RPC call against a RegionService via a protobuf Service. Implement
037 * rpcCall() and the parent class setClientByServiceName; this latter is where the RPC stub gets set
038 * (the appropriate protobuf 'Service'/Client). Be sure to make use of the RpcController that this
039 * instance is carrying via #getRpcController().
040 * <p>
041 * TODO: this class is actually tied to one region, because most of the paths make use of the
042 * regioninfo part of location when building requests. The only reason it works for multi-region
043 * requests (e.g. batch) is that they happen to not use the region parts. This could be done cleaner
044 * (e.g. having a generic parameter and 2 derived classes, RegionCallable and actual
045 * RegionServerCallable with ServerName.
046 * @param <T> The class that the ServerCallable handles.
047 * @param <S> The protocol to use (Admin or Client or even an Endpoint over in MetaTableAccessor).
048 */
049// TODO: MasterCallable and this Class have a lot in common. UNIFY!
050// Public but should be package private only it is used by MetaTableAccessor. FIX!!
051@InterfaceAudience.Private
052public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> {
053  private final Connection connection;
054  private final TableName tableName;
055  private final byte[] row;
056  /**
057   * Some subclasses want to set their own location. Make it protected.
058   */
059  protected HRegionLocation location;
060  protected S stub;
061
062  /**
063   * This is 99% of the time a HBaseRpcController but also used doing Coprocessor Endpoints and in
064   * this case, it is a ServerRpcControllable which is not a HBaseRpcController. Can be null!
065   */
066  protected final RpcController rpcController;
067  private int priority = HConstants.NORMAL_QOS;
068
069  /**
070   * @param connection    Connection to use.
071   * @param rpcController Controller to use; can be shaded or non-shaded.
072   * @param tableName     Table name to which <code>row</code> belongs.
073   * @param row           The row we want in <code>tableName</code>.
074   */
075  public RegionServerCallable(Connection connection, TableName tableName, byte[] row,
076    RpcController rpcController) {
077    this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
078  }
079
080  public RegionServerCallable(Connection connection, TableName tableName, byte[] row,
081    RpcController rpcController, int priority) {
082    super();
083    this.connection = connection;
084    this.tableName = tableName;
085    this.row = row;
086    this.rpcController = rpcController;
087    this.priority = priority;
088  }
089
090  protected RpcController getRpcController() {
091    return this.rpcController;
092  }
093
094  protected void setStub(S stub) {
095    this.stub = stub;
096  }
097
098  protected S getStub() {
099    return this.stub;
100  }
101
102  /**
103   * Override that changes call Exception from {@link Exception} to {@link IOException}. Also does
104   * set up of the rpcController.
105   */
106  @Override
107  public T call(int callTimeout) throws IOException {
108    try {
109      // Iff non-null and an instance of a SHADED rpcController, do config! Unshaded -- i.e.
110      // com.google.protobuf.RpcController or null -- will just skip over this config.
111      if (getRpcController() != null) {
112        RpcController shadedRpcController = (RpcController) getRpcController();
113        // Do a reset to clear previous states, such as CellScanner.
114        shadedRpcController.reset();
115        if (shadedRpcController instanceof HBaseRpcController) {
116          HBaseRpcController hrc = (HBaseRpcController) getRpcController();
117          // If it is an instance of HBaseRpcController, we can set priority on the controller based
118          // off the tableName. Set call timeout too.
119          hrc.setPriority(tableName);
120          hrc.setPriority(priority);
121          hrc.setCallTimeout(callTimeout);
122        }
123      }
124      return rpcCall();
125    } catch (Exception e) {
126      throw ProtobufUtil.handleRemoteException(e);
127    }
128  }
129
130  /**
131   * Run the RPC call. Implement this method. To get at the rpcController that has been created and
132   * configured to make this rpc call, use getRpcController(). We are trying to contain
133   * rpcController references so we don't pollute codebase with protobuf references; keep the
134   * protobuf references contained and only present in a few classes rather than all about the code
135   * base. n
136   */
137  protected abstract T rpcCall() throws Exception;
138
139  /**
140   * Get the RpcController CellScanner. If the RpcController is a HBaseRpcController, which it is in
141   * all cases except when we are processing Coprocessor Endpoint, then this method returns a
142   * reference to the CellScanner that the HBaseRpcController is carrying. Do it up here in this
143   * Callable so we don't have to scatter ugly instanceof tests around the codebase. Will return
144   * null if called in a Coproccessor Endpoint context. Should never happen.
145   */
146  protected CellScanner getRpcControllerCellScanner() {
147    return (getRpcController() != null && getRpcController() instanceof HBaseRpcController)
148      ? ((HBaseRpcController) getRpcController()).cellScanner()
149      : null;
150  }
151
152  protected void setRpcControllerCellScanner(CellScanner cellScanner) {
153    if (getRpcController() != null && getRpcController() instanceof HBaseRpcController) {
154      ((HBaseRpcController) this.rpcController).setCellScanner(cellScanner);
155    }
156  }
157
158  /** Returns {@link ClusterConnection} instance used by this Callable. */
159  protected ClusterConnection getConnection() {
160    return (ClusterConnection) this.connection;
161  }
162
163  protected HRegionLocation getLocation() {
164    return this.location;
165  }
166
167  protected void setLocation(final HRegionLocation location) {
168    this.location = location;
169  }
170
171  public TableName getTableName() {
172    return this.tableName;
173  }
174
175  public byte[] getRow() {
176    return this.row;
177  }
178
179  protected int getPriority() {
180    return this.priority;
181  }
182
183  @Override
184  public void throwable(Throwable t, boolean retrying) {
185    if (location != null) {
186      getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),
187        row, t, location.getServerName());
188    }
189  }
190
191  @Override
192  public String getExceptionMessageAdditionalDetail() {
193    return "row '" + Bytes.toStringBinary(row) + "' on table '" + tableName + "' at " + location;
194  }
195
196  @Override
197  public long sleep(long pause, int tries) {
198    return ConnectionUtils.getPauseTime(pause, tries);
199  }
200
201  /** Returns the HRegionInfo for the current region */
202  public HRegionInfo getHRegionInfo() {
203    if (this.location == null) {
204      return null;
205    }
206    return this.location.getRegionInfo();
207  }
208
209  @Override
210  public void prepare(final boolean reload) throws IOException {
211    // check table state if this is a retry
212    if (
213      reload && tableName != null && !tableName.equals(TableName.META_TABLE_NAME)
214        && getConnection().isTableDisabled(tableName)
215    ) {
216      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
217    }
218    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
219      this.location = regionLocator.getRegionLocation(row);
220    }
221    if (this.location == null) {
222      throw new IOException("Failed to find location, tableName=" + tableName + ", row="
223        + Bytes.toString(row) + ", reload=" + reload);
224    }
225    setStubByServiceName(this.location.getServerName());
226  }
227
228  /**
229   * Set the RCP client stub
230   * @param serviceName to get the rpc stub for
231   * @throws IOException When client could not be created
232   */
233  protected abstract void setStubByServiceName(ServerName serviceName) throws IOException;
234}