View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.concurrent.atomic.AtomicReference;
23
24  import org.apache.hadoop.hbase.CellScannable;
25  import org.apache.hadoop.hbase.CellScanner;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30
31  import com.google.protobuf.RpcCallback;
32  import com.google.protobuf.RpcController;
33
34  /**
35   * Optionally carries Cells across the proxy/service interface down into ipc. On its
36   * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
37   * to avoid having to protobuf them (for performance reasons). This class is used ferrying data
38   * across the proxy/protobuf service chasm. Also does call timeout. Used by client and server
39   * ipc'ing.
40   */
41  @InterfaceAudience.Private
42  public class PayloadCarryingRpcController implements RpcController, CellScannable {
43    /**
44     * The time, in ms before the call should expire.
45     */
46    protected volatile Integer callTimeout;
47    protected volatile boolean cancelled = false;
48    protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
49    protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
50    private IOException exception;
51
52    public static final int PRIORITY_UNSET = -1;
53    /**
54     * Priority to set on this request.  Set it here in controller so available composing the
55     * request.  This is the ordained way of setting priorities going forward.  We will be
56     * undoing the old annotation-based mechanism.
57     */
58    private int priority = PRIORITY_UNSET;
59
60    /**
61     * They are optionally set on construction, cleared after we make the call, and then optionally
62     * set on response with the result. We use this lowest common denominator access to Cells because
63     * sometimes the scanner is backed by a List of Cells and other times, it is backed by an
64     * encoded block that implements CellScanner.
65     */
66    private CellScanner cellScanner;
67
68    public PayloadCarryingRpcController() {
69      this((CellScanner)null);
70    }
71
72    public PayloadCarryingRpcController(final CellScanner cellScanner) {
73      this.cellScanner = cellScanner;
74    }
75
76    public PayloadCarryingRpcController(final List<CellScannable> cellIterables) {
77      this.cellScanner = cellIterables == null? null: CellUtil.createCellScanner(cellIterables);
78    }
79
80    /**
81     * @return One-shot cell scanner (you cannot back it up and restart)
82     */
83    @Override
84    public CellScanner cellScanner() {
85      return cellScanner;
86    }
87
88    public void setCellScanner(final CellScanner cellScanner) {
89      this.cellScanner = cellScanner;
90    }
91
92    /**
93     * @param priority Priority for this request; should fall roughly in the range
94     * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
95     */
96    public void setPriority(int priority) {
97      this.priority = priority;
98    }
99
100   /**
101    * @param tn Set priority based off the table we are going against.
102    */
103   public void setPriority(final TableName tn) {
104     setPriority(tn != null && tn.isSystemTable()? HConstants.SYSTEMTABLE_QOS:
105       HConstants.NORMAL_QOS);
106   }
107
108   /**
109    * @return The priority of this request
110    */
111   public int getPriority() {
112     return priority;
113   }
114
115   @Override
116   public void reset() {
117     priority = 0;
118     cellScanner = null;
119     exception = null;
120     cancelled = false;
121     failureCb.set(null);
122     cancellationCb.set(null);
123     callTimeout = null;
124   }
125
126   public int getCallTimeout() {
127     if (callTimeout != null) {
128       return callTimeout;
129     } else {
130       return 0;
131     }
132   }
133
134   public void setCallTimeout(int callTimeout) {
135     this.callTimeout = callTimeout;
136   }
137
138   public boolean hasCallTimeout(){
139     return callTimeout != null;
140   }
141
142   @Override
143   public String errorText() {
144     if (exception != null) {
145       return exception.getMessage();
146     } else {
147       return null;
148     }
149   }
150
151   /**
152    * For use in async rpc clients
153    * @return true if failed
154    */
155   @Override
156   public boolean failed() {
157     return this.exception != null;
158   }
159
160   @Override
161   public boolean isCanceled() {
162     return cancelled;
163   }
164
165   @Override
166   public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
167     this.cancellationCb.set(cancellationCb);
168     if (this.cancelled) {
169       cancellationCb.run(null);
170     }
171   }
172
173   /**
174    * Notify a callback on error.
175    * For use in async rpc clients
176    *
177    * @param failureCb the callback to call on error
178    */
179   public void notifyOnFail(RpcCallback<IOException> failureCb) {
180     this.failureCb.set(failureCb);
181     if (this.exception != null) {
182       failureCb.run(this.exception);
183     }
184   }
185
186   @Override
187   public void setFailed(String reason) {
188     this.exception = new IOException(reason);
189     if (this.failureCb.get() != null) {
190       this.failureCb.get().run(this.exception);
191     }
192   }
193
194   /**
195    * Set failed with an exception to pass on.
196    * For use in async rpc clients
197    *
198    * @param e exception to set with
199    */
200   public void setFailed(IOException e) {
201     this.exception = e;
202     if (this.failureCb.get() != null) {
203       this.failureCb.get().run(this.exception);
204     }
205   }
206
207   @Override
208   public void startCancel() {
209     cancelled = true;
210     if (cancellationCb.get() != null) {
211       cancellationCb.get().run(null);
212     }
213   }
214 }