001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import java.io.IOException;
022
023import org.apache.hadoop.hbase.CellScanner;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.HRegionInfo;
026import org.apache.hadoop.hbase.HRegionLocation;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.TableNotEnabledException;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.hadoop.hbase.ipc.HBaseRpcController;
032import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
033import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
034import org.apache.hadoop.hbase.util.Bytes;
035
036/**
037 * Implementations make a RPC call against a RegionService via a protobuf Service.
038 * Implement rpcCall() and the parent class setClientByServiceName; this latter is where the
039 * RPC stub gets set (the appropriate protobuf 'Service'/Client). Be sure to make use of the
040 * RpcController that this instance is carrying via #getRpcController().
041 *
042 * <p>TODO: this class is actually tied to one region, because most of the paths make use of
043 *       the regioninfo part of location when building requests. The only reason it works for
044 *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
045 *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
046 *       RegionCallable and actual RegionServerCallable with ServerName.
047 *
048 * @param <T> The class that the ServerCallable handles.
049 * @param <S> The protocol to use (Admin or Client or even an Endpoint over in MetaTableAccessor).
050 */
051// TODO: MasterCallable and this Class have a lot in common. UNIFY!
052// Public but should be package private only it is used by MetaTableAccessor. FIX!!
053@InterfaceAudience.Private
054public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> {
055  private final Connection connection;
056  private final TableName tableName;
057  private final byte[] row;
058  /**
059   * Some subclasses want to set their own location. Make it protected.
060   */
061  protected HRegionLocation location;
062  protected S stub;
063
064  /**
065   * This is 99% of the time a HBaseRpcController but also used doing Coprocessor Endpoints and in
066   * this case, it is a ServerRpcControllable which is not a HBaseRpcController.
067   * Can be null!
068   */
069  protected final RpcController rpcController;
070  private int priority = HConstants.NORMAL_QOS;
071
072  /**
073   * @param connection Connection to use.
074   * @param rpcController Controller to use; can be shaded or non-shaded.
075   * @param tableName Table name to which <code>row</code> belongs.
076   * @param row The row we want in <code>tableName</code>.
077   */
078  public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
079      RpcController rpcController) {
080    this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
081  }
082
083  public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
084      RpcController rpcController, int priority) {
085    super();
086    this.connection = connection;
087    this.tableName = tableName;
088    this.row = row;
089    this.rpcController = rpcController;
090    this.priority = priority;
091  }
092
093  protected RpcController getRpcController() {
094    return this.rpcController;
095  }
096
097  protected void setStub(S stub) {
098    this.stub = stub;
099  }
100
101  protected S getStub() {
102    return this.stub;
103  }
104
105  /**
106   * Override that changes call Exception from {@link Exception} to {@link IOException}.
107   * Also does set up of the rpcController.
108   */
109  @Override
110  public T call(int callTimeout) throws IOException {
111    try {
112      // Iff non-null and an instance of a SHADED rpcController, do config! Unshaded -- i.e.
113      // com.google.protobuf.RpcController or null -- will just skip over this config.
114      if (getRpcController() != null) {
115        RpcController shadedRpcController = (RpcController)getRpcController();
116        // Do a reset to clear previous states, such as CellScanner.
117        shadedRpcController.reset();
118        if (shadedRpcController instanceof HBaseRpcController) {
119          HBaseRpcController hrc = (HBaseRpcController)getRpcController();
120          // If it is an instance of HBaseRpcController, we can set priority on the controller based
121          // off the tableName. Set call timeout too.
122          hrc.setPriority(tableName);
123          hrc.setPriority(priority);
124          hrc.setCallTimeout(callTimeout);
125        }
126      }
127      return rpcCall();
128    } catch (Exception e) {
129      throw ProtobufUtil.handleRemoteException(e);
130    }
131  }
132
133  /**
134   * Run the RPC call. Implement this method. To get at the rpcController that has been created
135   * and configured to make this rpc call, use getRpcController(). We are trying to contain
136   * rpcController references so we don't pollute codebase with protobuf references; keep the
137   * protobuf references contained and only present in a few classes rather than all about the
138   * code base.
139   * @throws Exception
140   */
141  protected abstract T rpcCall() throws Exception;
142
143  /**
144   * Get the RpcController CellScanner.
145   * If the RpcController is a HBaseRpcController, which it is in all cases except
146   * when we are processing Coprocessor Endpoint, then this method returns a reference to the
147   * CellScanner that the HBaseRpcController is carrying. Do it up here in this Callable
148   * so we don't have to scatter ugly instanceof tests around the codebase. Will return null
149   * if called in a Coproccessor Endpoint context. Should never happen.
150   */
151  protected CellScanner getRpcControllerCellScanner() {
152    return (getRpcController() != null && getRpcController() instanceof HBaseRpcController)?
153        ((HBaseRpcController)getRpcController()).cellScanner(): null;
154  }
155
156  protected void setRpcControllerCellScanner(CellScanner cellScanner) {
157    if (getRpcController() != null && getRpcController() instanceof HBaseRpcController) {
158      ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner);
159    }
160  }
161
162  /**
163   * @return {@link ClusterConnection} instance used by this Callable.
164   */
165  protected ClusterConnection getConnection() {
166    return (ClusterConnection) this.connection;
167  }
168
169  protected HRegionLocation getLocation() {
170    return this.location;
171  }
172
173  protected void setLocation(final HRegionLocation location) {
174    this.location = location;
175  }
176
177  public TableName getTableName() {
178    return this.tableName;
179  }
180
181  public byte [] getRow() {
182    return this.row;
183  }
184
185  protected int getPriority() { return this.priority;}
186
187  @Override
188  public void throwable(Throwable t, boolean retrying) {
189    if (location != null) {
190      getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),
191          row, t, location.getServerName());
192    }
193  }
194
195  @Override
196  public String getExceptionMessageAdditionalDetail() {
197    return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location;
198  }
199
200  @Override
201  public long sleep(long pause, int tries) {
202    return ConnectionUtils.getPauseTime(pause, tries);
203  }
204
205  /**
206   * @return the HRegionInfo for the current region
207   */
208  public HRegionInfo getHRegionInfo() {
209    if (this.location == null) {
210      return null;
211    }
212    return this.location.getRegionInfo();
213  }
214
215  @Override
216  public void prepare(final boolean reload) throws IOException {
217    // check table state if this is a retry
218    if (reload && tableName != null && !tableName.equals(TableName.META_TABLE_NAME)
219        && getConnection().isTableDisabled(tableName)) {
220      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
221    }
222    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
223      this.location = regionLocator.getRegionLocation(row);
224    }
225    if (this.location == null) {
226      throw new IOException("Failed to find location, tableName=" + tableName +
227          ", row=" + Bytes.toString(row) + ", reload=" + reload);
228    }
229    setStubByServiceName(this.location.getServerName());
230  }
231
232  /**
233   * Set the RCP client stub
234   * @param serviceName to get the rpc stub for
235   * @throws IOException When client could not be created
236   */
237  protected abstract void setStubByServiceName(ServerName serviceName) throws IOException;
238}