001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.hbase.CellScannable; 027import org.apache.hadoop.hbase.CellScanner; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.yetus.audience.InterfaceAudience; 032 033/** 034 * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it 035 * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid 036 * having to protobuf them (for performance reasons). This class is used ferrying data across the 037 * proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing. 038 */ 039@InterfaceAudience.Private 040public class HBaseRpcControllerImpl implements HBaseRpcController { 041 /** 042 * The time, in ms before the call should expire. 043 */ 044 private Integer callTimeout; 045 046 private boolean done = false; 047 048 private boolean cancelled = false; 049 050 private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>(); 051 052 private IOException exception; 053 054 /** 055 * Priority to set on this request. Set it here in controller so available composing the request. 056 * This is the ordained way of setting priorities going forward. We will be undoing the old 057 * annotation-based mechanism. 058 */ 059 private int priority = HConstants.PRIORITY_UNSET; 060 061 /** 062 * They are optionally set on construction, cleared after we make the call, and then optionally 063 * set on response with the result. We use this lowest common denominator access to Cells because 064 * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded 065 * block that implements CellScanner. 066 */ 067 private CellScanner cellScanner; 068 069 public HBaseRpcControllerImpl() { 070 this((CellScanner) null); 071 } 072 073 public HBaseRpcControllerImpl(final CellScanner cellScanner) { 074 this.cellScanner = cellScanner; 075 } 076 077 public HBaseRpcControllerImpl(final List<CellScannable> cellIterables) { 078 this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables); 079 } 080 081 /** 082 * @return One-shot cell scanner (you cannot back it up and restart) 083 */ 084 @Override 085 public CellScanner cellScanner() { 086 return cellScanner; 087 } 088 089 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 090 justification = "The only possible race method is startCancel") 091 @Override 092 public void setCellScanner(final CellScanner cellScanner) { 093 this.cellScanner = cellScanner; 094 } 095 096 @Override 097 public void setPriority(int priority) { 098 this.priority = Math.max(this.priority, priority); 099 100 } 101 102 @Override 103 public void setPriority(final TableName tn) { 104 setPriority( 105 tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS); 106 } 107 108 @Override 109 public int getPriority() { 110 return priority < 0 ? HConstants.NORMAL_QOS : priority; 111 } 112 113 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 114 justification = "The only possible race method is startCancel") 115 @Override 116 public void reset() { 117 priority = 0; 118 cellScanner = null; 119 exception = null; 120 callTimeout = null; 121 // In the implementations of some callable with replicas, rpc calls are executed in a executor 122 // and we could cancel the operation from outside which means there could be a race between 123 // reset and startCancel. Although I think the race should be handled by the callable since the 124 // reset may clear the cancel state... 125 synchronized (this) { 126 done = false; 127 cancelled = false; 128 cancellationCbs.clear(); 129 } 130 } 131 132 @Override 133 public int getCallTimeout() { 134 if (callTimeout != null) { 135 return callTimeout.intValue(); 136 } else { 137 return 0; 138 } 139 } 140 141 @Override 142 public void setCallTimeout(int callTimeout) { 143 this.callTimeout = callTimeout; 144 } 145 146 @Override 147 public boolean hasCallTimeout() { 148 return callTimeout != null; 149 } 150 151 @Override 152 public synchronized String errorText() { 153 if (!done || exception == null) { 154 return null; 155 } 156 return exception.getMessage(); 157 } 158 159 @Override 160 public synchronized boolean failed() { 161 return done && this.exception != null; 162 } 163 164 @Override 165 public synchronized boolean isCanceled() { 166 return cancelled; 167 } 168 169 @Override 170 public void notifyOnCancel(RpcCallback<Object> callback) { 171 synchronized (this) { 172 if (done) { 173 return; 174 } 175 if (!cancelled) { 176 cancellationCbs.add(callback); 177 return; 178 } 179 } 180 // run it directly as we have already been cancelled. 181 callback.run(null); 182 } 183 184 @Override 185 public synchronized void setFailed(String reason) { 186 if (done) { 187 return; 188 } 189 done = true; 190 exception = new IOException(reason); 191 } 192 193 @Override 194 public synchronized void setFailed(IOException e) { 195 if (done) { 196 return; 197 } 198 done = true; 199 exception = e; 200 } 201 202 @Override 203 public synchronized IOException getFailed() { 204 return done ? exception : null; 205 } 206 207 @Override 208 public synchronized void setDone(CellScanner cellScanner) { 209 if (done) { 210 return; 211 } 212 done = true; 213 this.cellScanner = cellScanner; 214 } 215 216 @Override 217 public void startCancel() { 218 // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need 219 // to copy it. 220 List<RpcCallback<Object>> cbs; 221 synchronized (this) { 222 if (done) { 223 return; 224 } 225 done = true; 226 cancelled = true; 227 cbs = new ArrayList<>(cancellationCbs); 228 } 229 for (RpcCallback<?> cb : cbs) { 230 cb.run(null); 231 } 232 } 233 234 @Override 235 public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) 236 throws IOException { 237 if (cancelled) { 238 action.run(true); 239 } else { 240 cancellationCbs.add(callback); 241 action.run(false); 242 } 243 } 244 245}