/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.ExtendedCellScannable;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

/**
 * Get instances via {@link RpcControllerFactory} on client-side.
 * <p>
 * The completion/cancellation state ({@code done}, {@code cancelled}, the stored exception and the
 * list of cancellation callbacks) is read and written while holding this object's monitor. The
 * remaining fields (priority, call timeout, region info, table name, request attributes) are plain
 * unsynchronized fields.
 * @see RpcControllerFactory
 */
@InterfaceAudience.Private
public class HBaseRpcControllerImpl implements HBaseRpcController {
  /**
   * The time, in ms before the call should expire. {@code null} means no timeout has been set; see
   * {@link #hasCallTimeout()}.
   */
  private Integer callTimeout;

  /** Set once the call has completed, failed or been cancelled; cleared by {@link #reset()}. */
  private boolean done = false;

  /** Set by {@link #startCancel()}; cleared by {@link #reset()}. */
  private boolean cancelled = false;

  /** Callbacks to run when {@link #startCancel()} is invoked. */
  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();

  /** The failure recorded through one of the {@code setFailed} overloads, if any. */
  private IOException exception;

  private TableName tableName;

  /**
   * Rpc target Region's RegionInfo we are going against. May be null.
   * @see #hasRegionInfo()
   */
  private RegionInfo regionInfo;

  /**
   * Priority to set on this request. Set it here in controller so available composing the request.
   * This is the ordained way of setting priorities going forward. We will be undoing the old
   * annotation-based mechanism.
   */
  private int priority = HConstants.PRIORITY_UNSET;

  /**
   * They are optionally set on construction, cleared after we make the call, and then optionally
   * set on response with the result. We use this lowest common denominator access to Cells because
   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
   * block that implements CellScanner.
   */
  private ExtendedCellScanner cellScanner;

  private Map<String, byte[]> requestAttributes = Collections.emptyMap();

  /** Creates a controller with neither a target region nor a cell scanner. */
  public HBaseRpcControllerImpl() {
    this(null, (ExtendedCellScanner) null);
  }

  /**
   * Used server-side. Clients should go via {@link RpcControllerFactory}
   * @param cellScanner the cells that accompany the call; may be null
   */
  public HBaseRpcControllerImpl(final ExtendedCellScanner cellScanner) {
    this(null, cellScanner);
  }

  /**
   * @param regionInfo  the region the call targets; may be null
   * @param cellScanner the cells that accompany the call; may be null
   */
  HBaseRpcControllerImpl(RegionInfo regionInfo, final ExtendedCellScanner cellScanner) {
    this.cellScanner = cellScanner;
    this.regionInfo = regionInfo;
  }

  /**
   * @param regionInfo    the region the call targets; may be null
   * @param cellIterables sources of the cells that accompany the call; when null no cell scanner
   *                      is created
   */
  HBaseRpcControllerImpl(RegionInfo regionInfo, final List<ExtendedCellScannable> cellIterables) {
    this.cellScanner =
      cellIterables == null ? null : PrivateCellUtil.createExtendedCellScanner(cellIterables);
    // Keep the caller-supplied region, exactly like the (RegionInfo, ExtendedCellScanner)
    // constructor does. Previously the argument was silently discarded and the field forced to
    // null, so hasRegionInfo() was always false for controllers built from a cell list.
    this.regionInfo = regionInfo;
  }

  /** Returns true if a target region was supplied and has not been cleared by {@link #reset()}. */
  @Override
  public boolean hasRegionInfo() {
    return this.regionInfo != null;
  }

  /** Returns the target region, or null if none is set. */
  @Override
  public RegionInfo getRegionInfo() {
    return this.regionInfo;
  }

  /** Returns One-shot cell scanner (you cannot back it up and restart) */
  @Override
  public ExtendedCellScanner cellScanner() {
    return cellScanner;
  }

  /**
   * Replaces the cell scanner held by this controller.
   * @param cellScanner the new scanner; may be null
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "The only possible race method is startCancel")
  @Override
  public void setCellScanner(final ExtendedCellScanner cellScanner) {
    this.cellScanner = cellScanner;
  }

  /**
   * Raises the priority of this request. The stored value only ever increases: a request for a
   * lower priority than the current one is ignored.
   * @param priority the priority to apply if it is higher than the current one
   */
  @Override
  public void setPriority(int priority) {
    this.priority = Math.max(this.priority, priority);
  }

  /**
   * Derives the priority from the table: {@link HConstants#SYSTEMTABLE_QOS} for a non-null system
   * table, {@link HConstants#NORMAL_QOS} otherwise, then applies it through
   * {@link #setPriority(int)}.
   * @param tn the table the request targets; may be null
   */
  @Override
  public void setPriority(final TableName tn) {
    setPriority(
      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
  }

  /** Returns the stored priority, or {@link HConstants#NORMAL_QOS} if the stored value is negative. */
  @Override
  public int getPriority() {
    return priority < 0 ? HConstants.NORMAL_QOS : priority;
  }

  /**
   * Returns this controller to its freshly-constructed state so it can be reused for another call.
   * Request attributes are not touched.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "The only possible race method is startCancel")
  @Override
  public void reset() {
    // Restore the same "unset" sentinel the field is initialised with, rather than a literal 0, so
    // that a reset controller is indistinguishable from a new one.
    priority = HConstants.PRIORITY_UNSET;
    cellScanner = null;
    exception = null;
    callTimeout = null;
    regionInfo = null;
    tableName = null;
    // In the implementations of some callable with replicas, rpc calls are executed in a executor
    // and we could cancel the operation from outside which means there could be a race between
    // reset and startCancel. Although I think the race should be handled by the callable since the
    // reset may clear the cancel state...
    synchronized (this) {
      done = false;
      cancelled = false;
      cancellationCbs.clear();
    }
  }

  /** Returns the call timeout in ms, or 0 if none has been set. */
  @Override
  public int getCallTimeout() {
    return callTimeout != null ? callTimeout : 0;
  }

  /**
   * Sets the call timeout.
   * @param callTimeout the timeout in ms
   */
  @Override
  public void setCallTimeout(int callTimeout) {
    this.callTimeout = callTimeout;
  }

  /** Returns true if a call timeout has been set since construction or the last {@link #reset()}. */
  @Override
  public boolean hasCallTimeout() {
    return callTimeout != null;
  }

  /** Returns the request attributes map exactly as stored (no defensive copy is made). */
  @Override
  public Map<String, byte[]> getRequestAttributes() {
    return requestAttributes;
  }

  /**
   * Replaces the request attributes. The supplied map is stored as-is.
   * @param requestAttributes the attributes to attach to the request
   */
  @Override
  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
    this.requestAttributes = requestAttributes;
  }

  /** Returns the message of the recorded failure, or null if the call is not done or did not fail. */
  @Override
  public synchronized String errorText() {
    if (!done || exception == null) {
      return null;
    }
    return exception.getMessage();
  }

  /** Returns true if the call is done and a failure has been recorded. */
  @Override
  public synchronized boolean failed() {
    return done && this.exception != null;
  }

  /** Returns true if {@link #startCancel()} has taken effect since the last {@link #reset()}. */
  @Override
  public synchronized boolean isCanceled() {
    return cancelled;
  }

  /**
   * Registers a callback to be run when the call is cancelled. Nothing happens if the call is
   * already done.
   * @param callback the callback to register
   */
  @Override
  public void notifyOnCancel(RpcCallback<Object> callback) {
    synchronized (this) {
      if (done) {
        return;
      }
      if (!cancelled) {
        cancellationCbs.add(callback);
        return;
      }
    }
    // run it directly as we have already been cancelled.
    // Deliberately outside the synchronized block so the callback is not invoked under our lock.
    // NOTE(review): within this class `cancelled` is only ever set to true together with `done`,
    // so this path looks unreachable - confirm whether an already-cancelled controller is expected
    // to fire the callback here.
    callback.run(null);
  }

  /**
   * Marks the call as failed with a new {@link IOException} carrying {@code reason}. Ignored if the
   * call is already done.
   * @param reason the failure message
   */
  @Override
  public synchronized void setFailed(String reason) {
    if (done) {
      return;
    }
    done = true;
    exception = new IOException(reason);
  }

  /**
   * Marks the call as failed with the given exception. Ignored if the call is already done.
   * @param e the failure to record
   */
  @Override
  public synchronized void setFailed(IOException e) {
    if (done) {
      return;
    }
    done = true;
    exception = e;
  }

  /** Returns the recorded failure if the call is done, otherwise null. */
  @Override
  public synchronized IOException getFailed() {
    return done ? exception : null;
  }

  /**
   * Marks the call as successfully completed and stores the cells that came back with the
   * response. Ignored if the call is already done.
   * @param cellScanner the response cells; may be null
   */
  @Override
  public synchronized void setDone(ExtendedCellScanner cellScanner) {
    if (done) {
      return;
    }
    done = true;
    this.cellScanner = cellScanner;
  }

  /**
   * Cancels the call: marks it done and cancelled, then runs every registered cancellation
   * callback. Ignored if the call is already done.
   */
  @Override
  public void startCancel() {
    // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need
    // to copy it.
    List<RpcCallback<Object>> cbs;
    synchronized (this) {
      if (done) {
        return;
      }
      done = true;
      cancelled = true;
      cbs = new ArrayList<>(cancellationCbs);
    }
    // Callbacks run on the snapshot, outside the lock.
    for (RpcCallback<?> cb : cbs) {
      cb.run(null);
    }
  }

  /**
   * Atomically checks the cancellation state and runs {@code action} with the result while still
   * holding this controller's lock. If the call is not cancelled, {@code callback} is registered
   * before {@code action} runs.
   * @param callback the callback to register when the call has not been cancelled
   * @param action   invoked with {@code true} if already cancelled, {@code false} otherwise
   * @throws IOException if {@code action} throws
   */
  @Override
  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
    throws IOException {
    if (cancelled) {
      action.run(true);
    } else {
      cancellationCbs.add(callback);
      action.run(false);
    }
  }

  /**
   * Sets the table this call is associated with.
   * @param tableName the table name; may be null
   */
  @Override
  public void setTableName(TableName tableName) {
    this.tableName = tableName;
  }

  /** Returns the table this call is associated with, or null if none is set. */
  @Override
  public TableName getTableName() {
    return tableName;
  }
}