001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import org.apache.hadoop.hbase.DoNotRetryIOException; 023import org.apache.hadoop.hbase.HBaseIOException; 024import org.apache.hadoop.hbase.HRegionLocation; 025import org.apache.hadoop.hbase.RegionLocations; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.ipc.HBaseRpcController; 028import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.yetus.audience.InterfaceAudience; 031 032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 034 035/** 036 * Similar to RegionServerCallable but for the AdminService interface. This service callable assumes 037 * a Table and row and thus does region locating similar to RegionServerCallable. Works against 038 * Admin stub rather than Client stub. 039 */ 040@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", 041 justification = "stub used by ipc") 042@InterfaceAudience.Private 043public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { 044 protected AdminService.BlockingInterface stub; 045 protected final RpcControllerFactory rpcControllerFactory; 046 private HBaseRpcController controller = null; 047 048 protected final ClusterConnection connection; 049 protected HRegionLocation location; 050 protected final TableName tableName; 051 protected final byte[] row; 052 protected final int replicaId; 053 054 public RegionAdminServiceCallable(ClusterConnection connection, 055 RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) { 056 this(connection, rpcControllerFactory, null, tableName, row); 057 } 058 059 public RegionAdminServiceCallable(ClusterConnection connection, 060 RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName, 061 byte[] row) { 062 this(connection, rpcControllerFactory, location, tableName, row, 063 RegionReplicaUtil.DEFAULT_REPLICA_ID); 064 } 065 066 public RegionAdminServiceCallable(ClusterConnection connection, 067 RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName, 068 byte[] row, int replicaId) { 069 this.connection = connection; 070 this.rpcControllerFactory = rpcControllerFactory; 071 this.location = location; 072 this.tableName = tableName; 073 this.row = row; 074 this.replicaId = replicaId; 075 } 076 077 @Override 078 public void prepare(boolean reload) throws IOException { 079 if (Thread.interrupted()) { 080 throw new InterruptedIOException(); 081 } 082 if (reload || location == null) { 083 location = getLocation(!reload); 084 } 085 if (location == null) { 086 // With this exception, there will be a retry. 087 throw new HBaseIOException(getExceptionMessage()); 088 } 089 this.setStub(connection.getAdmin(location.getServerName())); 090 } 091 092 protected void setStub(AdminService.BlockingInterface stub) { 093 this.stub = stub; 094 } 095 096 public HRegionLocation getLocation(boolean useCache) throws IOException { 097 RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId); 098 if (rl == null) { 099 throw new HBaseIOException(getExceptionMessage()); 100 } 101 HRegionLocation location = rl.getRegionLocation(replicaId); 102 if (location == null) { 103 throw new HBaseIOException(getExceptionMessage()); 104 } 105 106 return location; 107 } 108 109 @Override 110 public void throwable(Throwable t, boolean retrying) { 111 if (location != null) { 112 connection.updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), row, t, 113 location.getServerName()); 114 } 115 } 116 117 /** Returns {@link Connection} instance used by this Callable. */ 118 Connection getConnection() { 119 return this.connection; 120 } 121 122 // subclasses can override this. 123 protected String getExceptionMessage() { 124 return "There is no location" + " table=" + tableName + " ,replica=" + replicaId + ", row=" 125 + Bytes.toStringBinary(row); 126 } 127 128 @Override 129 public String getExceptionMessageAdditionalDetail() { 130 return null; 131 } 132 133 @Override 134 public long sleep(long pause, int tries) { 135 return ConnectionUtils.getPauseTime(pause, tries); 136 } 137 138 public static RegionLocations getRegionLocations(ClusterConnection connection, 139 TableName tableName, byte[] row, boolean useCache, int replicaId) 140 throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { 141 RegionLocations rl; 142 try { 143 rl = connection.locateRegion(tableName, row, useCache, true, replicaId); 144 } catch (DoNotRetryIOException e) { 145 throw e; 146 } catch (RetriesExhaustedException e) { 147 throw e; 148 } catch (InterruptedIOException e) { 149 throw e; 150 } catch (IOException e) { 151 throw new RetriesExhaustedException("Can't get the location", e); 152 } 153 if (rl == null) { 154 throw new RetriesExhaustedException("Can't get the locations"); 155 } 156 return rl; 157 } 158 159 /** 160 * Override that changes Exception from {@link Exception} to {@link IOException}. It also does 161 * setup of an rpcController and calls through to the unimplemented 162 * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. 163 */ 164 @Override 165 // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate 166 // and so we contain references to protobuf. We can't set priority on the rpcController as 167 // we do in RegionServerCallable because we don't always have a Table when we call. 168 public T call(int callTimeout) throws IOException { 169 this.controller = rpcControllerFactory.newController(); 170 this.controller.setPriority(this.tableName); 171 this.controller.setCallTimeout(callTimeout); 172 try { 173 return call(this.controller); 174 } catch (Exception e) { 175 throw ProtobufUtil.handleRemoteException(e); 176 } 177 } 178 179 HBaseRpcController getCurrentPayloadCarryingRpcController() { 180 return this.controller; 181 } 182 183 /** 184 * Run RPC call. 185 * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a facade 186 * on protobuf so we don't have to put protobuf everywhere; we can keep it 187 * behind this class. n 188 */ 189 protected abstract T call(HBaseRpcController rpcController) throws Exception; 190}