001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import org.apache.hadoop.hbase.CellScannable;
026import org.apache.hadoop.hbase.CellScanner;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
034
035/**
036 * Get instances via {@link RpcControllerFactory} on client-side.
037 * @see RpcControllerFactory
038 */
039@InterfaceAudience.Private
040public class HBaseRpcControllerImpl implements HBaseRpcController {
041  /**
042   * The time, in ms before the call should expire.
043   */
044  private Integer callTimeout;
045
046  private boolean done = false;
047
048  private boolean cancelled = false;
049
050  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
051
052  private IOException exception;
053
054  private TableName tableName;
055
056  /**
057   * Rpc target Region's RegionInfo we are going against. May be null.
058   * @see #hasRegionInfo()
059   */
060  private RegionInfo regionInfo;
061
062  /**
063   * Priority to set on this request. Set it here in controller so available composing the request.
064   * This is the ordained way of setting priorities going forward. We will be undoing the old
065   * annotation-based mechanism.
066   */
067  private int priority = HConstants.PRIORITY_UNSET;
068
069  /**
070   * They are optionally set on construction, cleared after we make the call, and then optionally
071   * set on response with the result. We use this lowest common denominator access to Cells because
072   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
073   * block that implements CellScanner.
074   */
075  private CellScanner cellScanner;
076
077  private Map<String, byte[]> requestAttributes = Collections.emptyMap();
078
079  public HBaseRpcControllerImpl() {
080    this(null, (CellScanner) null);
081  }
082
083  /**
084   * Used server-side. Clients should go via {@link RpcControllerFactory}
085   */
086  public HBaseRpcControllerImpl(final CellScanner cellScanner) {
087    this(null, cellScanner);
088  }
089
090  HBaseRpcControllerImpl(RegionInfo regionInfo, final CellScanner cellScanner) {
091    this.cellScanner = cellScanner;
092    this.regionInfo = regionInfo;
093  }
094
095  HBaseRpcControllerImpl(RegionInfo regionInfo, final List<CellScannable> cellIterables) {
096    this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
097    this.regionInfo = null;
098  }
099
100  @Override
101  public boolean hasRegionInfo() {
102    return this.regionInfo != null;
103  }
104
105  @Override
106  public RegionInfo getRegionInfo() {
107    return this.regionInfo;
108  }
109
110  /** Returns One-shot cell scanner (you cannot back it up and restart) */
111  @Override
112  public CellScanner cellScanner() {
113    return cellScanner;
114  }
115
116  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
117      justification = "The only possible race method is startCancel")
118  @Override
119  public void setCellScanner(final CellScanner cellScanner) {
120    this.cellScanner = cellScanner;
121  }
122
123  @Override
124  public void setPriority(int priority) {
125    this.priority = Math.max(this.priority, priority);
126
127  }
128
129  @Override
130  public void setPriority(final TableName tn) {
131    setPriority(
132      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
133  }
134
135  @Override
136  public int getPriority() {
137    return priority < 0 ? HConstants.NORMAL_QOS : priority;
138  }
139
140  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
141      justification = "The only possible race method is startCancel")
142  @Override
143  public void reset() {
144    priority = 0;
145    cellScanner = null;
146    exception = null;
147    callTimeout = null;
148    regionInfo = null;
149    tableName = null;
150    // In the implementations of some callable with replicas, rpc calls are executed in a executor
151    // and we could cancel the operation from outside which means there could be a race between
152    // reset and startCancel. Although I think the race should be handled by the callable since the
153    // reset may clear the cancel state...
154    synchronized (this) {
155      done = false;
156      cancelled = false;
157      cancellationCbs.clear();
158    }
159  }
160
161  @Override
162  public int getCallTimeout() {
163    return callTimeout != null ? callTimeout : 0;
164  }
165
166  @Override
167  public void setCallTimeout(int callTimeout) {
168    this.callTimeout = callTimeout;
169  }
170
171  @Override
172  public boolean hasCallTimeout() {
173    return callTimeout != null;
174  }
175
176  @Override
177  public Map<String, byte[]> getRequestAttributes() {
178    return requestAttributes;
179  }
180
181  @Override
182  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
183    this.requestAttributes = requestAttributes;
184  }
185
186  @Override
187  public synchronized String errorText() {
188    if (!done || exception == null) {
189      return null;
190    }
191    return exception.getMessage();
192  }
193
194  @Override
195  public synchronized boolean failed() {
196    return done && this.exception != null;
197  }
198
199  @Override
200  public synchronized boolean isCanceled() {
201    return cancelled;
202  }
203
204  @Override
205  public void notifyOnCancel(RpcCallback<Object> callback) {
206    synchronized (this) {
207      if (done) {
208        return;
209      }
210      if (!cancelled) {
211        cancellationCbs.add(callback);
212        return;
213      }
214    }
215    // run it directly as we have already been cancelled.
216    callback.run(null);
217  }
218
219  @Override
220  public synchronized void setFailed(String reason) {
221    if (done) {
222      return;
223    }
224    done = true;
225    exception = new IOException(reason);
226  }
227
228  @Override
229  public synchronized void setFailed(IOException e) {
230    if (done) {
231      return;
232    }
233    done = true;
234    exception = e;
235  }
236
237  @Override
238  public synchronized IOException getFailed() {
239    return done ? exception : null;
240  }
241
242  @Override
243  public synchronized void setDone(CellScanner cellScanner) {
244    if (done) {
245      return;
246    }
247    done = true;
248    this.cellScanner = cellScanner;
249  }
250
251  @Override
252  public void startCancel() {
253    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
254    // to copy it.
255    List<RpcCallback<Object>> cbs;
256    synchronized (this) {
257      if (done) {
258        return;
259      }
260      done = true;
261      cancelled = true;
262      cbs = new ArrayList<>(cancellationCbs);
263    }
264    for (RpcCallback<?> cb : cbs) {
265      cb.run(null);
266    }
267  }
268
269  @Override
270  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
271    throws IOException {
272    if (cancelled) {
273      action.run(true);
274    } else {
275      cancellationCbs.add(callback);
276      action.run(false);
277    }
278  }
279
280  @Override
281  public void setTableName(TableName tableName) {
282    this.tableName = tableName;
283  }
284
285  @Override
286  public TableName getTableName() {
287    return tableName;
288  }
289}