001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import org.apache.hadoop.hbase.CellScannable;
024import org.apache.hadoop.hbase.CellScanner;
025import org.apache.hadoop.hbase.CellUtil;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
032
033/**
034 * Get instances via {@link RpcControllerFactory} on client-side.
035 * @see RpcControllerFactory
036 */
037@InterfaceAudience.Private
038public class HBaseRpcControllerImpl implements HBaseRpcController {
039  /**
040   * The time, in ms before the call should expire.
041   */
042  private Integer callTimeout;
043
044  private boolean done = false;
045
046  private boolean cancelled = false;
047
048  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
049
050  private IOException exception;
051
052  /**
053   * Rpc target Region's RegionInfo we are going against. May be null.
054   * @see #hasRegionInfo()
055   */
056  private RegionInfo regionInfo;
057
058  /**
059   * Priority to set on this request. Set it here in controller so available composing the request.
060   * This is the ordained way of setting priorities going forward. We will be undoing the old
061   * annotation-based mechanism.
062   */
063  private int priority = HConstants.PRIORITY_UNSET;
064
065  /**
066   * They are optionally set on construction, cleared after we make the call, and then optionally
067   * set on response with the result. We use this lowest common denominator access to Cells because
068   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
069   * block that implements CellScanner.
070   */
071  private CellScanner cellScanner;
072
073  public HBaseRpcControllerImpl() {
074    this(null, (CellScanner) null);
075  }
076
077  /**
078   * Used server-side. Clients should go via {@link RpcControllerFactory}
079   */
080  public HBaseRpcControllerImpl(final CellScanner cellScanner) {
081    this(null, cellScanner);
082  }
083
084  HBaseRpcControllerImpl(RegionInfo regionInfo, final CellScanner cellScanner) {
085    this.cellScanner = cellScanner;
086    this.regionInfo = regionInfo;
087  }
088
089  HBaseRpcControllerImpl(RegionInfo regionInfo, final List<CellScannable> cellIterables) {
090    this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
091    this.regionInfo = null;
092  }
093
094  @Override
095  public boolean hasRegionInfo() {
096    return this.regionInfo != null;
097  }
098
099  @Override
100  public RegionInfo getRegionInfo() {
101    return this.regionInfo;
102  }
103
104  /** Returns One-shot cell scanner (you cannot back it up and restart) */
105  @Override
106  public CellScanner cellScanner() {
107    return cellScanner;
108  }
109
110  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
111      justification = "The only possible race method is startCancel")
112  @Override
113  public void setCellScanner(final CellScanner cellScanner) {
114    this.cellScanner = cellScanner;
115  }
116
117  @Override
118  public void setPriority(int priority) {
119    this.priority = Math.max(this.priority, priority);
120
121  }
122
123  @Override
124  public void setPriority(final TableName tn) {
125    setPriority(
126      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
127  }
128
129  @Override
130  public int getPriority() {
131    return priority < 0 ? HConstants.NORMAL_QOS : priority;
132  }
133
134  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
135      justification = "The only possible race method is startCancel")
136  @Override
137  public void reset() {
138    priority = 0;
139    cellScanner = null;
140    exception = null;
141    callTimeout = null;
142    regionInfo = null;
143    // In the implementations of some callable with replicas, rpc calls are executed in a executor
144    // and we could cancel the operation from outside which means there could be a race between
145    // reset and startCancel. Although I think the race should be handled by the callable since the
146    // reset may clear the cancel state...
147    synchronized (this) {
148      done = false;
149      cancelled = false;
150      cancellationCbs.clear();
151    }
152  }
153
154  @Override
155  public int getCallTimeout() {
156    return callTimeout != null ? callTimeout : 0;
157  }
158
159  @Override
160  public void setCallTimeout(int callTimeout) {
161    this.callTimeout = callTimeout;
162  }
163
164  @Override
165  public boolean hasCallTimeout() {
166    return callTimeout != null;
167  }
168
169  @Override
170  public synchronized String errorText() {
171    if (!done || exception == null) {
172      return null;
173    }
174    return exception.getMessage();
175  }
176
177  @Override
178  public synchronized boolean failed() {
179    return done && this.exception != null;
180  }
181
182  @Override
183  public synchronized boolean isCanceled() {
184    return cancelled;
185  }
186
187  @Override
188  public void notifyOnCancel(RpcCallback<Object> callback) {
189    synchronized (this) {
190      if (done) {
191        return;
192      }
193      if (!cancelled) {
194        cancellationCbs.add(callback);
195        return;
196      }
197    }
198    // run it directly as we have already been cancelled.
199    callback.run(null);
200  }
201
202  @Override
203  public synchronized void setFailed(String reason) {
204    if (done) {
205      return;
206    }
207    done = true;
208    exception = new IOException(reason);
209  }
210
211  @Override
212  public synchronized void setFailed(IOException e) {
213    if (done) {
214      return;
215    }
216    done = true;
217    exception = e;
218  }
219
220  @Override
221  public synchronized IOException getFailed() {
222    return done ? exception : null;
223  }
224
225  @Override
226  public synchronized void setDone(CellScanner cellScanner) {
227    if (done) {
228      return;
229    }
230    done = true;
231    this.cellScanner = cellScanner;
232  }
233
234  @Override
235  public void startCancel() {
236    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
237    // to copy it.
238    List<RpcCallback<Object>> cbs;
239    synchronized (this) {
240      if (done) {
241        return;
242      }
243      done = true;
244      cancelled = true;
245      cbs = new ArrayList<>(cancellationCbs);
246    }
247    for (RpcCallback<?> cb : cbs) {
248      cb.run(null);
249    }
250  }
251
252  @Override
253  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
254    throws IOException {
255    if (cancelled) {
256      action.run(true);
257    } else {
258      cancellationCbs.add(callback);
259      action.run(false);
260    }
261  }
262
263  @Override
264  public String toString() {
265    return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done
266      + ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception="
267      + exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
268      + cellScanner + '}';
269  }
270}