001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import org.apache.hadoop.hbase.CellScannable; 024import org.apache.hadoop.hbase.CellScanner; 025import org.apache.hadoop.hbase.CellUtil; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 032 033/** 034 * Get instances via {@link RpcControllerFactory} on client-side. 035 * @see RpcControllerFactory 036 */ 037@InterfaceAudience.Private 038public class HBaseRpcControllerImpl implements HBaseRpcController { 039 /** 040 * The time, in ms before the call should expire. 041 */ 042 private Integer callTimeout; 043 044 private boolean done = false; 045 046 private boolean cancelled = false; 047 048 private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>(); 049 050 private IOException exception; 051 052 /** 053 * Rpc target Region's RegionInfo we are going against. May be null. 054 * @see #hasRegionInfo() 055 */ 056 private RegionInfo regionInfo; 057 058 /** 059 * Priority to set on this request. Set it here in controller so available composing the request. 060 * This is the ordained way of setting priorities going forward. We will be undoing the old 061 * annotation-based mechanism. 062 */ 063 private int priority = HConstants.PRIORITY_UNSET; 064 065 /** 066 * They are optionally set on construction, cleared after we make the call, and then optionally 067 * set on response with the result. We use this lowest common denominator access to Cells because 068 * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded 069 * block that implements CellScanner. 070 */ 071 private CellScanner cellScanner; 072 073 public HBaseRpcControllerImpl() { 074 this(null, (CellScanner) null); 075 } 076 077 /** 078 * Used server-side. Clients should go via {@link RpcControllerFactory} 079 */ 080 public HBaseRpcControllerImpl(final CellScanner cellScanner) { 081 this(null, cellScanner); 082 } 083 084 HBaseRpcControllerImpl(RegionInfo regionInfo, final CellScanner cellScanner) { 085 this.cellScanner = cellScanner; 086 this.regionInfo = regionInfo; 087 } 088 089 HBaseRpcControllerImpl(RegionInfo regionInfo, final List<CellScannable> cellIterables) { 090 this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables); 091 this.regionInfo = null; 092 } 093 094 @Override 095 public boolean hasRegionInfo() { 096 return this.regionInfo != null; 097 } 098 099 @Override 100 public RegionInfo getRegionInfo() { 101 return this.regionInfo; 102 } 103 104 /** Returns One-shot cell scanner (you cannot back it up and restart) */ 105 @Override 106 public CellScanner cellScanner() { 107 return cellScanner; 108 } 109 110 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 111 justification = "The only possible race method is startCancel") 112 @Override 113 public void setCellScanner(final CellScanner cellScanner) { 114 this.cellScanner = cellScanner; 115 } 116 117 @Override 118 public void setPriority(int priority) { 119 this.priority = Math.max(this.priority, priority); 120 121 } 122 123 @Override 124 public void setPriority(final TableName tn) { 125 setPriority( 126 tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS); 127 } 128 129 @Override 130 public int getPriority() { 131 return priority < 0 ? HConstants.NORMAL_QOS : priority; 132 } 133 134 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 135 justification = "The only possible race method is startCancel") 136 @Override 137 public void reset() { 138 priority = 0; 139 cellScanner = null; 140 exception = null; 141 callTimeout = null; 142 regionInfo = null; 143 // In the implementations of some callable with replicas, rpc calls are executed in a executor 144 // and we could cancel the operation from outside which means there could be a race between 145 // reset and startCancel. Although I think the race should be handled by the callable since the 146 // reset may clear the cancel state... 147 synchronized (this) { 148 done = false; 149 cancelled = false; 150 cancellationCbs.clear(); 151 } 152 } 153 154 @Override 155 public int getCallTimeout() { 156 return callTimeout != null ? callTimeout : 0; 157 } 158 159 @Override 160 public void setCallTimeout(int callTimeout) { 161 this.callTimeout = callTimeout; 162 } 163 164 @Override 165 public boolean hasCallTimeout() { 166 return callTimeout != null; 167 } 168 169 @Override 170 public synchronized String errorText() { 171 if (!done || exception == null) { 172 return null; 173 } 174 return exception.getMessage(); 175 } 176 177 @Override 178 public synchronized boolean failed() { 179 return done && this.exception != null; 180 } 181 182 @Override 183 public synchronized boolean isCanceled() { 184 return cancelled; 185 } 186 187 @Override 188 public void notifyOnCancel(RpcCallback<Object> callback) { 189 synchronized (this) { 190 if (done) { 191 return; 192 } 193 if (!cancelled) { 194 cancellationCbs.add(callback); 195 return; 196 } 197 } 198 // run it directly as we have already been cancelled. 199 callback.run(null); 200 } 201 202 @Override 203 public synchronized void setFailed(String reason) { 204 if (done) { 205 return; 206 } 207 done = true; 208 exception = new IOException(reason); 209 } 210 211 @Override 212 public synchronized void setFailed(IOException e) { 213 if (done) { 214 return; 215 } 216 done = true; 217 exception = e; 218 } 219 220 @Override 221 public synchronized IOException getFailed() { 222 return done ? exception : null; 223 } 224 225 @Override 226 public synchronized void setDone(CellScanner cellScanner) { 227 if (done) { 228 return; 229 } 230 done = true; 231 this.cellScanner = cellScanner; 232 } 233 234 @Override 235 public void startCancel() { 236 // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need 237 // to copy it. 238 List<RpcCallback<Object>> cbs; 239 synchronized (this) { 240 if (done) { 241 return; 242 } 243 done = true; 244 cancelled = true; 245 cbs = new ArrayList<>(cancellationCbs); 246 } 247 for (RpcCallback<?> cb : cbs) { 248 cb.run(null); 249 } 250 } 251 252 @Override 253 public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) 254 throws IOException { 255 if (cancelled) { 256 action.run(true); 257 } else { 258 cancellationCbs.add(callback); 259 action.run(false); 260 } 261 } 262 263 @Override 264 public String toString() { 265 return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done 266 + ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception=" 267 + exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner=" 268 + cellScanner + '}'; 269 } 270}