/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

/**
 * Implementations make a RPC call against a RegionService via a protobuf Service. Implement
 * {@link #rpcCall()} and {@link #setStubByServiceName(ServerName)}; the latter is where the RPC
 * stub gets set (the appropriate protobuf 'Service'/Client). Be sure to make use of the
 * RpcController that this instance is carrying via {@link #getRpcController()}.
 * <p>
 * TODO: this class is actually tied to one region, because most of the paths make use of the
 * regioninfo part of location when building requests. The only reason it works for multi-region
 * requests (e.g. batch) is that they happen to not use the region parts. This could be done cleaner
 * (e.g. having a generic parameter and 2 derived classes, RegionCallable and actual
 * RegionServerCallable with ServerName).
 * @param <T> The class that the ServerCallable handles.
 * @param <S> The protocol to use (Admin or Client or even an Endpoint over in MetaTableAccessor).
 */
// TODO: MasterCallable and this Class have a lot in common. UNIFY!
// Public but should be package private only it is used by MetaTableAccessor. FIX!!
@InterfaceAudience.Private
public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> {
  private final Connection connection;
  private final TableName tableName;
  private final byte[] row;
  /**
   * Some subclasses want to set their own location. Make it protected.
   */
  protected HRegionLocation location;
  protected S stub;

  /**
   * This is 99% of the time a HBaseRpcController but also used doing Coprocessor Endpoints and in
   * this case, it is a ServerRpcControllable which is not a HBaseRpcController. Can be null!
   */
  protected final RpcController rpcController;
  /** Priority handed to the controller on every {@link #call(int)}; fixed at construction. */
  private final int priority;

  /**
   * Creates a callable that runs at {@link HConstants#NORMAL_QOS} priority.
   * @param connection    Connection to use.
   * @param tableName     Table name to which <code>row</code> belongs.
   * @param row           The row we want in <code>tableName</code>.
   * @param rpcController Controller to use; can be shaded or non-shaded. May be null.
   */
  public RegionServerCallable(Connection connection, TableName tableName, byte[] row,
    RpcController rpcController) {
    this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
  }

  /**
   * Creates a callable with an explicit priority.
   * @param connection    Connection to use.
   * @param tableName     Table name to which <code>row</code> belongs.
   * @param row           The row we want in <code>tableName</code>.
   * @param rpcController Controller to use; can be shaded or non-shaded. May be null.
   * @param priority      Priority to set on the controller when the call is made.
   */
  public RegionServerCallable(Connection connection, TableName tableName, byte[] row,
    RpcController rpcController, int priority) {
    this.connection = connection;
    this.tableName = tableName;
    this.row = row;
    this.rpcController = rpcController;
    this.priority = priority;
  }

  /** Returns the controller this callable was constructed with; may be null. */
  protected RpcController getRpcController() {
    return this.rpcController;
  }

  /** Sets the RPC stub used to talk to the server. */
  protected void setStub(S stub) {
    this.stub = stub;
  }

  /** Returns the RPC stub currently set on this callable; null until one has been set. */
  protected S getStub() {
    return this.stub;
  }

  /**
   * Override that changes call Exception from {@link Exception} to {@link IOException}. Also does
   * set up of the rpcController before delegating to {@link #rpcCall()}.
   * @param callTimeout timeout to set on the controller when it is a {@link HBaseRpcController}
   * @return whatever {@link #rpcCall()} returns
   * @throws IOException any exception raised by the set up or by {@link #rpcCall()}, converted via
   *                     {@link ProtobufUtil#handleRemoteException(Exception)}
   */
  @Override
  public T call(int callTimeout) throws IOException {
    try {
      RpcController controller = getRpcController();
      // A null controller skips all configuration.
      if (controller != null) {
        // Do a reset to clear previous states, such as CellScanner.
        controller.reset();
        if (controller instanceof HBaseRpcController) {
          HBaseRpcController hrc = (HBaseRpcController) controller;
          // Both the table-derived and the explicit priority are handed to the controller, in
          // this order. How they are combined is decided by the controller implementation, so
          // keep the order as is.
          hrc.setPriority(tableName);
          hrc.setPriority(priority);
          hrc.setCallTimeout(callTimeout);
        }
      }
      return rpcCall();
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }

  /**
   * Run the RPC call. Implement this method. To get at the rpcController that has been created and
   * configured to make this rpc call, use getRpcController(). We are trying to contain
   * rpcController references so we don't pollute codebase with protobuf references; keep the
   * protobuf references contained and only present in a few classes rather than all about the code
   * base.
   * @return the result of the RPC
   * @throws Exception on any failure; {@link #call(int)} converts it to an {@link IOException}
   */
  protected abstract T rpcCall() throws Exception;

  /**
   * Get the RpcController CellScanner. If the RpcController is a HBaseRpcController, which it is in
   * all cases except when we are processing Coprocessor Endpoint, then this method returns a
   * reference to the CellScanner that the HBaseRpcController is carrying. Do it up here in this
   * Callable so we don't have to scatter ugly instanceof tests around the codebase. Will return
   * null if called in a Coprocessor Endpoint context. Should never happen.
   * @return the controller's CellScanner, or null when the controller is null or is not a
   *         {@link HBaseRpcController}
   */
  protected CellScanner getRpcControllerCellScanner() {
    RpcController controller = getRpcController();
    // instanceof is false for null, so no separate null check is needed.
    if (controller instanceof HBaseRpcController) {
      return ((HBaseRpcController) controller).cellScanner();
    }
    return null;
  }

  /**
   * Sets the CellScanner on the controller. Does nothing when the controller is null or is not a
   * {@link HBaseRpcController}.
   * @param cellScanner the scanner to hand to the controller
   */
  protected void setRpcControllerCellScanner(CellScanner cellScanner) {
    RpcController controller = getRpcController();
    // Test and cast the same reference so the check always guards the cast.
    if (controller instanceof HBaseRpcController) {
      ((HBaseRpcController) controller).setCellScanner(cellScanner);
    }
  }

  /** Returns {@link ClusterConnection} instance used by this Callable. */
  protected ClusterConnection getConnection() {
    return (ClusterConnection) this.connection;
  }

  /** Returns the region location last set on this callable; null if none has been set. */
  protected HRegionLocation getLocation() {
    return this.location;
  }

  /** Overrides the region location this callable targets. */
  protected void setLocation(final HRegionLocation location) {
    this.location = location;
  }

  /** Returns the table this callable was constructed with. */
  public TableName getTableName() {
    return this.tableName;
  }

  /** Returns the row this callable was constructed with (the internal array, not a copy). */
  public byte[] getRow() {
    return this.row;
  }

  /** Returns the priority this callable was constructed with. */
  protected int getPriority() {
    return this.priority;
  }

  /**
   * Hands the failure to the connection via
   * {@code updateCachedLocations} for the current region and server. Does nothing when no location
   * has been set yet.
   * @param t        the failure that was observed
   * @param retrying not used by this implementation
   */
  @Override
  public void throwable(Throwable t, boolean retrying) {
    if (location == null) {
      return;
    }
    getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), row,
      t, location.getServerName());
  }

  /** Returns a description of the row, table and location for use in exception messages. */
  @Override
  public String getExceptionMessageAdditionalDetail() {
    return "row '" + Bytes.toStringBinary(row) + "' on table '" + tableName + "' at " + location;
  }

  /** Returns the pause computed by {@link ConnectionUtils#getPauseTime(long, int)}. */
  @Override
  public long sleep(long pause, int tries) {
    return ConnectionUtils.getPauseTime(pause, tries);
  }

  /** Returns the HRegionInfo for the current region, or null when no location is set. */
  public HRegionInfo getHRegionInfo() {
    return this.location == null ? null : this.location.getRegionInfo();
  }

  /**
   * Looks up the region location for the row and sets the stub for the server hosting it.
   * @param reload when true, the table is first checked for being disabled (skipped for a null
   *               table name and for the meta table). The location lookup itself does not take
   *               this flag.
   * @throws TableNotEnabledException when {@code reload} is set and the table is disabled
   * @throws IOException              when no location can be found or the stub cannot be created
   */
  @Override
  public void prepare(final boolean reload) throws IOException {
    // check table state if this is a retry
    if (
      reload && tableName != null && !tableName.equals(TableName.META_TABLE_NAME)
        && getConnection().isTableDisabled(tableName)
    ) {
      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
    }
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      this.location = regionLocator.getRegionLocation(row);
    }
    if (this.location == null) {
      // Row keys are arbitrary bytes; print them in the same binary-safe form used by
      // getExceptionMessageAdditionalDetail().
      throw new IOException("Failed to find location, tableName=" + tableName + ", row="
        + Bytes.toStringBinary(row) + ", reload=" + reload);
    }
    setStubByServiceName(this.location.getServerName());
  }

  /**
   * Set the RPC client stub
   * @param serviceName to get the rpc stub for
   * @throws IOException When client could not be created
   */
  protected abstract void setStubByServiceName(ServerName serviceName) throws IOException;
}