001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023
024import org.apache.hadoop.hbase.DoNotRetryIOException;
025import org.apache.hadoop.hbase.HBaseIOException;
026import org.apache.hadoop.hbase.HRegionLocation;
027import org.apache.hadoop.hbase.RegionLocations;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.hbase.ipc.HBaseRpcController;
031import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
034import org.apache.hadoop.hbase.util.Bytes;
035
036/**
037 * Similar to RegionServerCallable but for the AdminService interface. This service callable
038 * assumes a Table and row and thus does region locating similar to RegionServerCallable.
039 * Works against Admin stub rather than Client stub.
040 */
041@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
042  justification="stub used by ipc")
043@InterfaceAudience.Private
044public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
045  protected AdminService.BlockingInterface stub;
046  protected final RpcControllerFactory rpcControllerFactory;
047  private HBaseRpcController controller = null;
048
049  protected final ClusterConnection connection;
050  protected HRegionLocation location;
051  protected final TableName tableName;
052  protected final byte[] row;
053  protected final int replicaId;
054
055  public RegionAdminServiceCallable(ClusterConnection connection,
056      RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) {
057    this(connection, rpcControllerFactory, null, tableName, row);
058  }
059
060  public RegionAdminServiceCallable(ClusterConnection connection,
061      RpcControllerFactory rpcControllerFactory, HRegionLocation location,
062      TableName tableName, byte[] row) {
063    this(connection, rpcControllerFactory, location,
064      tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
065  }
066
067  public RegionAdminServiceCallable(ClusterConnection connection,
068      RpcControllerFactory rpcControllerFactory, HRegionLocation location,
069      TableName tableName, byte[] row, int replicaId) {
070    this.connection = connection;
071    this.rpcControllerFactory = rpcControllerFactory;
072    this.location = location;
073    this.tableName = tableName;
074    this.row = row;
075    this.replicaId = replicaId;
076  }
077
078  @Override
079  public void prepare(boolean reload) throws IOException {
080    if (Thread.interrupted()) {
081      throw new InterruptedIOException();
082    }
083    if (reload || location == null) {
084      location = getLocation(!reload);
085    }
086    if (location == null) {
087      // With this exception, there will be a retry.
088      throw new HBaseIOException(getExceptionMessage());
089    }
090    this.setStub(connection.getAdmin(location.getServerName()));
091  }
092
093  protected void setStub(AdminService.BlockingInterface stub) {
094    this.stub = stub;
095  }
096
097  public HRegionLocation getLocation(boolean useCache) throws IOException {
098    RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
099    if (rl == null) {
100      throw new HBaseIOException(getExceptionMessage());
101    }
102    HRegionLocation location = rl.getRegionLocation(replicaId);
103    if (location == null) {
104      throw new HBaseIOException(getExceptionMessage());
105    }
106
107    return location;
108  }
109
110  @Override
111  public void throwable(Throwable t, boolean retrying) {
112    if (location != null) {
113      connection.updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), row,
114          t, location.getServerName());
115    }
116  }
117
118  /**
119   * @return {@link Connection} instance used by this Callable.
120   */
121  Connection getConnection() {
122    return this.connection;
123  }
124
125  //subclasses can override this.
126  protected String getExceptionMessage() {
127    return "There is no location" + " table=" + tableName
128        + " ,replica=" + replicaId + ", row=" + Bytes.toStringBinary(row);
129  }
130
131  @Override
132  public String getExceptionMessageAdditionalDetail() {
133    return null;
134  }
135
136  @Override
137  public long sleep(long pause, int tries) {
138    return ConnectionUtils.getPauseTime(pause, tries);
139  }
140
141  public static RegionLocations getRegionLocations(
142      ClusterConnection connection, TableName tableName, byte[] row,
143      boolean useCache, int replicaId)
144      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
145    RegionLocations rl;
146    try {
147      rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
148    } catch (DoNotRetryIOException e) {
149      throw e;
150    } catch (RetriesExhaustedException e) {
151      throw e;
152    } catch (InterruptedIOException e) {
153      throw e;
154    } catch (IOException e) {
155      throw new RetriesExhaustedException("Can't get the location", e);
156    }
157    if (rl == null) {
158      throw new RetriesExhaustedException("Can't get the locations");
159    }
160    return rl;
161  }
162
163  /**
164   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
165   * setup of an rpcController and calls through to the unimplemented
166   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
167   */
168  @Override
169  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
170  // and so we contain references to protobuf. We can't set priority on the rpcController as
171  // we do in RegionServerCallable because we don't always have a Table when we call.
172  public T call(int callTimeout) throws IOException {
173    this.controller = rpcControllerFactory.newController();
174    this.controller.setPriority(this.tableName);
175    this.controller.setCallTimeout(callTimeout);
176    try {
177      return call(this.controller);
178    } catch (Exception e) {
179      throw ProtobufUtil.handleRemoteException(e);
180    }
181  }
182
183  HBaseRpcController getCurrentPayloadCarryingRpcController() {
184    return this.controller;
185  }
186
187  /**
188   * Run RPC call.
189   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
190   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
191   * class.
192   * @throws Exception
193   */
194  protected abstract T call(HBaseRpcController rpcController) throws Exception;
195}