001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.client; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023 024import org.apache.hadoop.hbase.DoNotRetryIOException; 025import org.apache.hadoop.hbase.HBaseIOException; 026import org.apache.hadoop.hbase.HRegionLocation; 027import org.apache.hadoop.hbase.RegionLocations; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.hadoop.hbase.ipc.HBaseRpcController; 031import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 034import org.apache.hadoop.hbase.util.Bytes; 035 036/** 037 * Similar to RegionServerCallable but for the AdminService interface. This service callable 038 * assumes a Table and row and thus does region locating similar to RegionServerCallable. 039 * Works against Admin stub rather than Client stub. 040 */ 041@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", 042 justification="stub used by ipc") 043@InterfaceAudience.Private 044public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { 045 protected AdminService.BlockingInterface stub; 046 protected final RpcControllerFactory rpcControllerFactory; 047 private HBaseRpcController controller = null; 048 049 protected final ClusterConnection connection; 050 protected HRegionLocation location; 051 protected final TableName tableName; 052 protected final byte[] row; 053 protected final int replicaId; 054 055 public RegionAdminServiceCallable(ClusterConnection connection, 056 RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) { 057 this(connection, rpcControllerFactory, null, tableName, row); 058 } 059 060 public RegionAdminServiceCallable(ClusterConnection connection, 061 RpcControllerFactory rpcControllerFactory, HRegionLocation location, 062 TableName tableName, byte[] row) { 063 this(connection, rpcControllerFactory, location, 064 tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 065 } 066 067 public RegionAdminServiceCallable(ClusterConnection connection, 068 RpcControllerFactory rpcControllerFactory, HRegionLocation location, 069 TableName tableName, byte[] row, int replicaId) { 070 this.connection = connection; 071 this.rpcControllerFactory = rpcControllerFactory; 072 this.location = location; 073 this.tableName = tableName; 074 this.row = row; 075 this.replicaId = replicaId; 076 } 077 078 @Override 079 public void prepare(boolean reload) throws IOException { 080 if (Thread.interrupted()) { 081 throw new InterruptedIOException(); 082 } 083 if (reload || location == null) { 084 location = getLocation(!reload); 085 } 086 if (location == null) { 087 // With this exception, there will be a retry. 088 throw new HBaseIOException(getExceptionMessage()); 089 } 090 this.setStub(connection.getAdmin(location.getServerName())); 091 } 092 093 protected void setStub(AdminService.BlockingInterface stub) { 094 this.stub = stub; 095 } 096 097 public HRegionLocation getLocation(boolean useCache) throws IOException { 098 RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId); 099 if (rl == null) { 100 throw new HBaseIOException(getExceptionMessage()); 101 } 102 HRegionLocation location = rl.getRegionLocation(replicaId); 103 if (location == null) { 104 throw new HBaseIOException(getExceptionMessage()); 105 } 106 107 return location; 108 } 109 110 @Override 111 public void throwable(Throwable t, boolean retrying) { 112 if (location != null) { 113 connection.updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), row, 114 t, location.getServerName()); 115 } 116 } 117 118 /** 119 * @return {@link Connection} instance used by this Callable. 120 */ 121 Connection getConnection() { 122 return this.connection; 123 } 124 125 //subclasses can override this. 126 protected String getExceptionMessage() { 127 return "There is no location" + " table=" + tableName 128 + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row); 129 } 130 131 @Override 132 public String getExceptionMessageAdditionalDetail() { 133 return null; 134 } 135 136 @Override 137 public long sleep(long pause, int tries) { 138 return ConnectionUtils.getPauseTime(pause, tries); 139 } 140 141 public static RegionLocations getRegionLocations( 142 ClusterConnection connection, TableName tableName, byte[] row, 143 boolean useCache, int replicaId) 144 throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException { 145 RegionLocations rl; 146 try { 147 rl = connection.locateRegion(tableName, row, useCache, true, replicaId); 148 } catch (DoNotRetryIOException e) { 149 throw e; 150 } catch (RetriesExhaustedException e) { 151 throw e; 152 } catch (InterruptedIOException e) { 153 throw e; 154 } catch (IOException e) { 155 throw new RetriesExhaustedException("Can't get the location", e); 156 } 157 if (rl == null) { 158 throw new RetriesExhaustedException("Can't get the locations"); 159 } 160 return rl; 161 } 162 163 /** 164 * Override that changes Exception from {@link Exception} to {@link IOException}. It also does 165 * setup of an rpcController and calls through to the unimplemented 166 * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. 167 */ 168 @Override 169 // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate 170 // and so we contain references to protobuf. We can't set priority on the rpcController as 171 // we do in RegionServerCallable because we don't always have a Table when we call. 172 public T call(int callTimeout) throws IOException { 173 this.controller = rpcControllerFactory.newController(); 174 this.controller.setPriority(this.tableName); 175 this.controller.setCallTimeout(callTimeout); 176 try { 177 return call(this.controller); 178 } catch (Exception e) { 179 throw ProtobufUtil.handleRemoteException(e); 180 } 181 } 182 183 HBaseRpcController getCurrentPayloadCarryingRpcController() { 184 return this.controller; 185 } 186 187 /** 188 * Run RPC call. 189 * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a 190 * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this 191 * class. 192 * @throws Exception 193 */ 194 protected abstract T call(HBaseRpcController rpcController) throws Exception; 195}