001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import org.apache.hadoop.hbase.DoNotRetryIOException;
023import org.apache.hadoop.hbase.HBaseIOException;
024import org.apache.hadoop.hbase.HRegionLocation;
025import org.apache.hadoop.hbase.RegionLocations;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.ipc.HBaseRpcController;
028import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
034
035/**
036 * Similar to RegionServerCallable but for the AdminService interface. This service callable assumes
037 * a Table and row and thus does region locating similar to RegionServerCallable. Works against
038 * Admin stub rather than Client stub.
039 */
040@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
041    justification = "stub used by ipc")
042@InterfaceAudience.Private
043public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
044  protected AdminService.BlockingInterface stub;
045  protected final RpcControllerFactory rpcControllerFactory;
046  private HBaseRpcController controller = null;
047
048  protected final ClusterConnection connection;
049  protected HRegionLocation location;
050  protected final TableName tableName;
051  protected final byte[] row;
052  protected final int replicaId;
053
054  public RegionAdminServiceCallable(ClusterConnection connection,
055    RpcControllerFactory rpcControllerFactory, TableName tableName, byte[] row) {
056    this(connection, rpcControllerFactory, null, tableName, row);
057  }
058
059  public RegionAdminServiceCallable(ClusterConnection connection,
060    RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName,
061    byte[] row) {
062    this(connection, rpcControllerFactory, location, tableName, row,
063      RegionReplicaUtil.DEFAULT_REPLICA_ID);
064  }
065
066  public RegionAdminServiceCallable(ClusterConnection connection,
067    RpcControllerFactory rpcControllerFactory, HRegionLocation location, TableName tableName,
068    byte[] row, int replicaId) {
069    this.connection = connection;
070    this.rpcControllerFactory = rpcControllerFactory;
071    this.location = location;
072    this.tableName = tableName;
073    this.row = row;
074    this.replicaId = replicaId;
075  }
076
077  @Override
078  public void prepare(boolean reload) throws IOException {
079    if (Thread.interrupted()) {
080      throw new InterruptedIOException();
081    }
082    if (reload || location == null) {
083      location = getLocation(!reload);
084    }
085    if (location == null) {
086      // With this exception, there will be a retry.
087      throw new HBaseIOException(getExceptionMessage());
088    }
089    this.setStub(connection.getAdmin(location.getServerName()));
090  }
091
092  protected void setStub(AdminService.BlockingInterface stub) {
093    this.stub = stub;
094  }
095
096  public HRegionLocation getLocation(boolean useCache) throws IOException {
097    RegionLocations rl = getRegionLocations(connection, tableName, row, useCache, replicaId);
098    if (rl == null) {
099      throw new HBaseIOException(getExceptionMessage());
100    }
101    HRegionLocation location = rl.getRegionLocation(replicaId);
102    if (location == null) {
103      throw new HBaseIOException(getExceptionMessage());
104    }
105
106    return location;
107  }
108
109  @Override
110  public void throwable(Throwable t, boolean retrying) {
111    if (location != null) {
112      connection.updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), row, t,
113        location.getServerName());
114    }
115  }
116
117  /** Returns {@link Connection} instance used by this Callable. */
118  Connection getConnection() {
119    return this.connection;
120  }
121
122  // subclasses can override this.
123  protected String getExceptionMessage() {
124    return "There is no location" + " table=" + tableName + " ,replica=" + replicaId + ", row="
125      + Bytes.toStringBinary(row);
126  }
127
128  @Override
129  public String getExceptionMessageAdditionalDetail() {
130    return null;
131  }
132
133  @Override
134  public long sleep(long pause, int tries) {
135    return ConnectionUtils.getPauseTime(pause, tries);
136  }
137
138  public static RegionLocations getRegionLocations(ClusterConnection connection,
139    TableName tableName, byte[] row, boolean useCache, int replicaId)
140    throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
141    RegionLocations rl;
142    try {
143      rl = connection.locateRegion(tableName, row, useCache, true, replicaId);
144    } catch (DoNotRetryIOException e) {
145      throw e;
146    } catch (RetriesExhaustedException e) {
147      throw e;
148    } catch (InterruptedIOException e) {
149      throw e;
150    } catch (IOException e) {
151      throw new RetriesExhaustedException("Can't get the location", e);
152    }
153    if (rl == null) {
154      throw new RetriesExhaustedException("Can't get the locations");
155    }
156    return rl;
157  }
158
159  /**
160   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
161   * setup of an rpcController and calls through to the unimplemented
162   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
163   */
164  @Override
165  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
166  // and so we contain references to protobuf. We can't set priority on the rpcController as
167  // we do in RegionServerCallable because we don't always have a Table when we call.
168  public T call(int callTimeout) throws IOException {
169    this.controller = rpcControllerFactory.newController();
170    this.controller.setPriority(this.tableName);
171    this.controller.setCallTimeout(callTimeout);
172    try {
173      return call(this.controller);
174    } catch (Exception e) {
175      throw ProtobufUtil.handleRemoteException(e);
176    }
177  }
178
179  HBaseRpcController getCurrentPayloadCarryingRpcController() {
180    return this.controller;
181  }
182
183  /**
184   * Run RPC call.
185   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a facade
186   *                      on protobuf so we don't have to put protobuf everywhere; we can keep it
187   *                      behind this class. n
188   */
189  protected abstract T call(HBaseRpcController rpcController) throws Exception;
190}