001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.List; 024import java.util.Map; 025import org.apache.hadoop.hbase.ExtendedCellScannable; 026import org.apache.hadoop.hbase.ExtendedCellScanner; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.PrivateCellUtil; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.RegionInfo; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.checkerframework.checker.nullness.qual.Nullable; 033 034import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 035 036/** 037 * Get instances via {@link RpcControllerFactory} on client-side. 038 * @see RpcControllerFactory 039 */ 040@InterfaceAudience.Private 041public class HBaseRpcControllerImpl implements HBaseRpcController { 042 /** 043 * The time, in ms before the call should expire. 044 */ 045 private Integer callTimeout; 046 047 private boolean done = false; 048 049 private boolean cancelled = false; 050 051 private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>(); 052 053 private IOException exception; 054 055 private TableName tableName; 056 057 /** 058 * Rpc target Region's RegionInfo we are going against. May be null. 059 * @see #hasRegionInfo() 060 */ 061 private RegionInfo regionInfo; 062 063 /** 064 * Priority to set on this request. Set it here in controller so available composing the request. 065 * This is the ordained way of setting priorities going forward. We will be undoing the old 066 * annotation-based mechanism. 067 */ 068 private int priority = HConstants.PRIORITY_UNSET; 069 070 /** 071 * They are optionally set on construction, cleared after we make the call, and then optionally 072 * set on response with the result. We use this lowest common denominator access to Cells because 073 * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded 074 * block that implements CellScanner. 075 */ 076 private ExtendedCellScanner cellScanner; 077 078 private Map<String, byte[]> requestAttributes = Collections.emptyMap(); 079 080 public HBaseRpcControllerImpl() { 081 this(null, (ExtendedCellScanner) null); 082 } 083 084 /** 085 * Used server-side. Clients should go via {@link RpcControllerFactory} 086 */ 087 public HBaseRpcControllerImpl(final ExtendedCellScanner cellScanner) { 088 this(null, cellScanner); 089 } 090 091 HBaseRpcControllerImpl(RegionInfo regionInfo, final ExtendedCellScanner cellScanner) { 092 this.cellScanner = cellScanner; 093 this.regionInfo = regionInfo; 094 } 095 096 HBaseRpcControllerImpl(RegionInfo regionInfo, final List<ExtendedCellScannable> cellIterables) { 097 this.cellScanner = 098 cellIterables == null ? null : PrivateCellUtil.createExtendedCellScanner(cellIterables); 099 this.regionInfo = null; 100 } 101 102 @Override 103 public boolean hasRegionInfo() { 104 return this.regionInfo != null; 105 } 106 107 @Override 108 public RegionInfo getRegionInfo() { 109 return this.regionInfo; 110 } 111 112 /** Returns One-shot cell scanner (you cannot back it up and restart) */ 113 @Override 114 public ExtendedCellScanner cellScanner() { 115 return cellScanner; 116 } 117 118 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 119 justification = "The only possible race method is startCancel") 120 @Override 121 public void setCellScanner(final ExtendedCellScanner cellScanner) { 122 this.cellScanner = cellScanner; 123 } 124 125 @Override 126 public void setPriority(int priority) { 127 this.priority = Math.max(this.priority, priority); 128 129 } 130 131 @Override 132 public void setPriority(final TableName tn) { 133 setPriority( 134 tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS); 135 } 136 137 @Override 138 public void setPriority(int priority, @Nullable TableName tableName) { 139 if (priority != HConstants.PRIORITY_UNSET) { 140 this.priority = priority; 141 } else if (tableName != null && tableName.isSystemTable()) { 142 this.priority = HConstants.SYSTEMTABLE_QOS; 143 } else { 144 this.priority = HConstants.NORMAL_QOS; 145 } 146 } 147 148 @Override 149 public int getPriority() { 150 return priority < 0 ? HConstants.NORMAL_QOS : priority; 151 } 152 153 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 154 justification = "The only possible race method is startCancel") 155 @Override 156 public void reset() { 157 priority = 0; 158 cellScanner = null; 159 exception = null; 160 callTimeout = null; 161 regionInfo = null; 162 tableName = null; 163 // In the implementations of some callable with replicas, rpc calls are executed in a executor 164 // and we could cancel the operation from outside which means there could be a race between 165 // reset and startCancel. Although I think the race should be handled by the callable since the 166 // reset may clear the cancel state... 167 synchronized (this) { 168 done = false; 169 cancelled = false; 170 cancellationCbs.clear(); 171 } 172 } 173 174 @Override 175 public int getCallTimeout() { 176 return callTimeout != null ? callTimeout : 0; 177 } 178 179 @Override 180 public void setCallTimeout(int callTimeout) { 181 this.callTimeout = callTimeout; 182 } 183 184 @Override 185 public boolean hasCallTimeout() { 186 return callTimeout != null; 187 } 188 189 @Override 190 public Map<String, byte[]> getRequestAttributes() { 191 return requestAttributes; 192 } 193 194 @Override 195 public void setRequestAttributes(Map<String, byte[]> requestAttributes) { 196 this.requestAttributes = requestAttributes; 197 } 198 199 @Override 200 public synchronized String errorText() { 201 if (!done || exception == null) { 202 return null; 203 } 204 return exception.getMessage(); 205 } 206 207 @Override 208 public synchronized boolean failed() { 209 return done && this.exception != null; 210 } 211 212 @Override 213 public synchronized boolean isCanceled() { 214 return cancelled; 215 } 216 217 @Override 218 public void notifyOnCancel(RpcCallback<Object> callback) { 219 synchronized (this) { 220 if (done) { 221 return; 222 } 223 if (!cancelled) { 224 cancellationCbs.add(callback); 225 return; 226 } 227 } 228 // run it directly as we have already been cancelled. 229 callback.run(null); 230 } 231 232 @Override 233 public synchronized void setFailed(String reason) { 234 if (done) { 235 return; 236 } 237 done = true; 238 exception = new IOException(reason); 239 } 240 241 @Override 242 public synchronized void setFailed(IOException e) { 243 if (done) { 244 return; 245 } 246 done = true; 247 exception = e; 248 } 249 250 @Override 251 public synchronized IOException getFailed() { 252 return done ? exception : null; 253 } 254 255 @Override 256 public synchronized void setDone(ExtendedCellScanner cellScanner) { 257 if (done) { 258 return; 259 } 260 done = true; 261 this.cellScanner = cellScanner; 262 } 263 264 @Override 265 public void startCancel() { 266 // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need 267 // to copy it. 268 List<RpcCallback<Object>> cbs; 269 synchronized (this) { 270 if (done) { 271 return; 272 } 273 done = true; 274 cancelled = true; 275 cbs = new ArrayList<>(cancellationCbs); 276 } 277 for (RpcCallback<?> cb : cbs) { 278 cb.run(null); 279 } 280 } 281 282 @Override 283 public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) 284 throws IOException { 285 if (cancelled) { 286 action.run(true); 287 } else { 288 cancellationCbs.add(callback); 289 action.run(false); 290 } 291 } 292 293 @Override 294 public void setTableName(TableName tableName) { 295 this.tableName = tableName; 296 } 297 298 @Override 299 public TableName getTableName() { 300 return tableName; 301 } 302}