001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.hbase.CellScannable;
027import org.apache.hadoop.hbase.CellScanner;
028import org.apache.hadoop.hbase.CellUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.yetus.audience.InterfaceAudience;
032
033/**
034 * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
035 * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
036 * having to protobuf them (for performance reasons). This class is used ferrying data across the
037 * proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing.
038 */
039@InterfaceAudience.Private
040public class HBaseRpcControllerImpl implements HBaseRpcController {
041  /**
042   * The time, in ms before the call should expire.
043   */
044  private Integer callTimeout;
045
046  private boolean done = false;
047
048  private boolean cancelled = false;
049
050  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
051
052  private IOException exception;
053
054  /**
055   * Priority to set on this request. Set it here in controller so available composing the request.
056   * This is the ordained way of setting priorities going forward. We will be undoing the old
057   * annotation-based mechanism.
058   */
059  private int priority = HConstants.PRIORITY_UNSET;
060
061  /**
062   * They are optionally set on construction, cleared after we make the call, and then optionally
063   * set on response with the result. We use this lowest common denominator access to Cells because
064   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
065   * block that implements CellScanner.
066   */
067  private CellScanner cellScanner;
068
069  public HBaseRpcControllerImpl() {
070    this((CellScanner) null);
071  }
072
073  public HBaseRpcControllerImpl(final CellScanner cellScanner) {
074    this.cellScanner = cellScanner;
075  }
076
077  public HBaseRpcControllerImpl(final List<CellScannable> cellIterables) {
078    this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
079  }
080
081  /**
082   * @return One-shot cell scanner (you cannot back it up and restart)
083   */
084  @Override
085  public CellScanner cellScanner() {
086    return cellScanner;
087  }
088
089  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
090      justification = "The only possible race method is startCancel")
091  @Override
092  public void setCellScanner(final CellScanner cellScanner) {
093    this.cellScanner = cellScanner;
094  }
095
096  @Override
097  public void setPriority(int priority) {
098    this.priority = Math.max(this.priority, priority);
099
100  }
101
102  @Override
103  public void setPriority(final TableName tn) {
104    setPriority(
105      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
106  }
107
108  @Override
109  public int getPriority() {
110    return priority < 0 ? HConstants.NORMAL_QOS : priority;
111  }
112
113  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
114      justification = "The only possible race method is startCancel")
115  @Override
116  public void reset() {
117    priority = 0;
118    cellScanner = null;
119    exception = null;
120    callTimeout = null;
121    // In the implementations of some callable with replicas, rpc calls are executed in a executor
122    // and we could cancel the operation from outside which means there could be a race between
123    // reset and startCancel. Although I think the race should be handled by the callable since the
124    // reset may clear the cancel state...
125    synchronized (this) {
126      done = false;
127      cancelled = false;
128      cancellationCbs.clear();
129    }
130  }
131
132  @Override
133  public int getCallTimeout() {
134    if (callTimeout != null) {
135      return callTimeout.intValue();
136    } else {
137      return 0;
138    }
139  }
140
141  @Override
142  public void setCallTimeout(int callTimeout) {
143    this.callTimeout = callTimeout;
144  }
145
146  @Override
147  public boolean hasCallTimeout() {
148    return callTimeout != null;
149  }
150
151  @Override
152  public synchronized String errorText() {
153    if (!done || exception == null) {
154      return null;
155    }
156    return exception.getMessage();
157  }
158
159  @Override
160  public synchronized boolean failed() {
161    return done && this.exception != null;
162  }
163
164  @Override
165  public synchronized boolean isCanceled() {
166    return cancelled;
167  }
168
169  @Override
170  public void notifyOnCancel(RpcCallback<Object> callback) {
171    synchronized (this) {
172      if (done) {
173        return;
174      }
175      if (!cancelled) {
176        cancellationCbs.add(callback);
177        return;
178      }
179    }
180    // run it directly as we have already been cancelled.
181    callback.run(null);
182  }
183
184  @Override
185  public synchronized void setFailed(String reason) {
186    if (done) {
187      return;
188    }
189    done = true;
190    exception = new IOException(reason);
191  }
192
193  @Override
194  public synchronized void setFailed(IOException e) {
195    if (done) {
196      return;
197    }
198    done = true;
199    exception = e;
200  }
201
202  @Override
203  public synchronized IOException getFailed() {
204    return done ? exception : null;
205  }
206
207  @Override
208  public synchronized void setDone(CellScanner cellScanner) {
209    if (done) {
210      return;
211    }
212    done = true;
213    this.cellScanner = cellScanner;
214  }
215
216  @Override
217  public void startCancel() {
218    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
219    // to copy it.
220    List<RpcCallback<Object>> cbs;
221    synchronized (this) {
222      if (done) {
223        return;
224      }
225      done = true;
226      cancelled = true;
227      cbs = new ArrayList<>(cancellationCbs);
228    }
229    for (RpcCallback<?> cb : cbs) {
230      cb.run(null);
231    }
232  }
233
234  @Override
235  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
236      throws IOException {
237    if (cancelled) {
238      action.run(true);
239    } else {
240      cancellationCbs.add(callback);
241      action.run(false);
242    }
243  }
244
245}