001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import org.apache.hadoop.hbase.ExtendedCellScannable;
026import org.apache.hadoop.hbase.ExtendedCellScanner;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.checkerframework.checker.nullness.qual.Nullable;
033
034import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
035
036/**
037 * Get instances via {@link RpcControllerFactory} on client-side.
038 * @see RpcControllerFactory
039 */
040@InterfaceAudience.Private
041public class HBaseRpcControllerImpl implements HBaseRpcController {
042  /**
043   * The time, in ms before the call should expire.
044   */
045  private Integer callTimeout;
046
047  private boolean done = false;
048
049  private boolean cancelled = false;
050
051  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
052
053  private IOException exception;
054
055  private TableName tableName;
056
057  /**
058   * Rpc target Region's RegionInfo we are going against. May be null.
059   * @see #hasRegionInfo()
060   */
061  private RegionInfo regionInfo;
062
063  /**
064   * Priority to set on this request. Set it here in controller so available composing the request.
065   * This is the ordained way of setting priorities going forward. We will be undoing the old
066   * annotation-based mechanism.
067   */
068  private int priority = HConstants.PRIORITY_UNSET;
069
070  /**
071   * They are optionally set on construction, cleared after we make the call, and then optionally
072   * set on response with the result. We use this lowest common denominator access to Cells because
073   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
074   * block that implements CellScanner.
075   */
076  private ExtendedCellScanner cellScanner;
077
078  private Map<String, byte[]> requestAttributes = Collections.emptyMap();
079
080  public HBaseRpcControllerImpl() {
081    this(null, (ExtendedCellScanner) null);
082  }
083
084  /**
085   * Used server-side. Clients should go via {@link RpcControllerFactory}
086   */
087  public HBaseRpcControllerImpl(final ExtendedCellScanner cellScanner) {
088    this(null, cellScanner);
089  }
090
091  HBaseRpcControllerImpl(RegionInfo regionInfo, final ExtendedCellScanner cellScanner) {
092    this.cellScanner = cellScanner;
093    this.regionInfo = regionInfo;
094  }
095
096  HBaseRpcControllerImpl(RegionInfo regionInfo, final List<ExtendedCellScannable> cellIterables) {
097    this.cellScanner =
098      cellIterables == null ? null : PrivateCellUtil.createExtendedCellScanner(cellIterables);
099    this.regionInfo = null;
100  }
101
102  @Override
103  public boolean hasRegionInfo() {
104    return this.regionInfo != null;
105  }
106
107  @Override
108  public RegionInfo getRegionInfo() {
109    return this.regionInfo;
110  }
111
112  /** Returns One-shot cell scanner (you cannot back it up and restart) */
113  @Override
114  public ExtendedCellScanner cellScanner() {
115    return cellScanner;
116  }
117
118  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
119      justification = "The only possible race method is startCancel")
120  @Override
121  public void setCellScanner(final ExtendedCellScanner cellScanner) {
122    this.cellScanner = cellScanner;
123  }
124
125  @Override
126  public void setPriority(int priority) {
127    this.priority = Math.max(this.priority, priority);
128
129  }
130
131  @Override
132  public void setPriority(final TableName tn) {
133    setPriority(
134      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
135  }
136
137  @Override
138  public void setPriority(int priority, @Nullable TableName tableName) {
139    if (priority != HConstants.PRIORITY_UNSET) {
140      this.priority = priority;
141    } else if (tableName != null && tableName.isSystemTable()) {
142      this.priority = HConstants.SYSTEMTABLE_QOS;
143    } else {
144      this.priority = HConstants.NORMAL_QOS;
145    }
146  }
147
148  @Override
149  public int getPriority() {
150    return priority < 0 ? HConstants.NORMAL_QOS : priority;
151  }
152
153  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
154      justification = "The only possible race method is startCancel")
155  @Override
156  public void reset() {
157    priority = 0;
158    cellScanner = null;
159    exception = null;
160    callTimeout = null;
161    regionInfo = null;
162    tableName = null;
163    // In the implementations of some callable with replicas, rpc calls are executed in a executor
164    // and we could cancel the operation from outside which means there could be a race between
165    // reset and startCancel. Although I think the race should be handled by the callable since the
166    // reset may clear the cancel state...
167    synchronized (this) {
168      done = false;
169      cancelled = false;
170      cancellationCbs.clear();
171    }
172  }
173
174  @Override
175  public int getCallTimeout() {
176    return callTimeout != null ? callTimeout : 0;
177  }
178
179  @Override
180  public void setCallTimeout(int callTimeout) {
181    this.callTimeout = callTimeout;
182  }
183
184  @Override
185  public boolean hasCallTimeout() {
186    return callTimeout != null;
187  }
188
189  @Override
190  public Map<String, byte[]> getRequestAttributes() {
191    return requestAttributes;
192  }
193
194  @Override
195  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
196    this.requestAttributes = requestAttributes;
197  }
198
199  @Override
200  public synchronized String errorText() {
201    if (!done || exception == null) {
202      return null;
203    }
204    return exception.getMessage();
205  }
206
207  @Override
208  public synchronized boolean failed() {
209    return done && this.exception != null;
210  }
211
212  @Override
213  public synchronized boolean isCanceled() {
214    return cancelled;
215  }
216
217  @Override
218  public void notifyOnCancel(RpcCallback<Object> callback) {
219    synchronized (this) {
220      if (done) {
221        return;
222      }
223      if (!cancelled) {
224        cancellationCbs.add(callback);
225        return;
226      }
227    }
228    // run it directly as we have already been cancelled.
229    callback.run(null);
230  }
231
232  @Override
233  public synchronized void setFailed(String reason) {
234    if (done) {
235      return;
236    }
237    done = true;
238    exception = new IOException(reason);
239  }
240
241  @Override
242  public synchronized void setFailed(IOException e) {
243    if (done) {
244      return;
245    }
246    done = true;
247    exception = e;
248  }
249
250  @Override
251  public synchronized IOException getFailed() {
252    return done ? exception : null;
253  }
254
255  @Override
256  public synchronized void setDone(ExtendedCellScanner cellScanner) {
257    if (done) {
258      return;
259    }
260    done = true;
261    this.cellScanner = cellScanner;
262  }
263
264  @Override
265  public void startCancel() {
266    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
267    // to copy it.
268    List<RpcCallback<Object>> cbs;
269    synchronized (this) {
270      if (done) {
271        return;
272      }
273      done = true;
274      cancelled = true;
275      cbs = new ArrayList<>(cancellationCbs);
276    }
277    for (RpcCallback<?> cb : cbs) {
278      cb.run(null);
279    }
280  }
281
282  @Override
283  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
284    throws IOException {
285    if (cancelled) {
286      action.run(true);
287    } else {
288      cancellationCbs.add(callback);
289      action.run(false);
290    }
291  }
292
293  @Override
294  public void setTableName(TableName tableName) {
295    this.tableName = tableName;
296  }
297
298  @Override
299  public TableName getTableName() {
300    return tableName;
301  }
302}