001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.nio.channels.CancelledKeyException; 023import java.nio.channels.ClosedChannelException; 024import java.nio.channels.SelectionKey; 025import java.nio.channels.Selector; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Iterator; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import org.apache.hadoop.hbase.HBaseIOException; 032import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.util.StringUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038 039/** 040 * Sends responses of RPC back to clients. 041 */ 042@Deprecated 043@InterfaceAudience.Private 044class SimpleRpcServerResponder extends Thread { 045 046 private final SimpleRpcServer simpleRpcServer; 047 private final Selector writeSelector; 048 private final Set<SimpleServerRpcConnection> writingCons = 049 Collections.newSetFromMap(new ConcurrentHashMap<>()); 050 051 SimpleRpcServerResponder(SimpleRpcServer simpleRpcServer) throws IOException { 052 this.simpleRpcServer = simpleRpcServer; 053 this.setName("RpcServer.responder"); 054 this.setDaemon(true); 055 this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); 056 writeSelector = Selector.open(); // create a selector 057 } 058 059 @Override 060 public void run() { 061 SimpleRpcServer.LOG.debug(getName() + ": starting"); 062 try { 063 doRunLoop(); 064 } finally { 065 SimpleRpcServer.LOG.info(getName() + ": stopping"); 066 try { 067 writeSelector.close(); 068 } catch (IOException ioe) { 069 SimpleRpcServer.LOG.error(getName() + ": couldn't close write selector", ioe); 070 } 071 } 072 } 073 074 /** 075 * Take the list of the connections that want to write, and register them in the selector. 076 */ 077 private void registerWrites() { 078 Iterator<SimpleServerRpcConnection> it = writingCons.iterator(); 079 while (it.hasNext()) { 080 SimpleServerRpcConnection c = it.next(); 081 it.remove(); 082 SelectionKey sk = c.channel.keyFor(writeSelector); 083 try { 084 if (sk == null) { 085 try { 086 c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); 087 } catch (ClosedChannelException e) { 088 // ignore: the client went away. 089 if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e); 090 } 091 } else { 092 sk.interestOps(SelectionKey.OP_WRITE); 093 } 094 } catch (CancelledKeyException e) { 095 // ignore: the client went away. 096 if (SimpleRpcServer.LOG.isTraceEnabled()) SimpleRpcServer.LOG.trace("ignored", e); 097 } 098 } 099 } 100 101 /** 102 * Add a connection to the list that want to write, 103 */ 104 public void registerForWrite(SimpleServerRpcConnection c) { 105 if (writingCons.add(c)) { 106 writeSelector.wakeup(); 107 } 108 } 109 110 private void doRunLoop() { 111 long lastPurgeTime = 0; // last check for old calls. 112 while (this.simpleRpcServer.running) { 113 try { 114 registerWrites(); 115 int keyCt = writeSelector.select(this.simpleRpcServer.purgeTimeout); 116 if (keyCt == 0) { 117 continue; 118 } 119 120 Set<SelectionKey> keys = writeSelector.selectedKeys(); 121 Iterator<SelectionKey> iter = keys.iterator(); 122 while (iter.hasNext()) { 123 SelectionKey key = iter.next(); 124 iter.remove(); 125 try { 126 if (key.isValid() && key.isWritable()) { 127 doAsyncWrite(key); 128 } 129 } catch (IOException e) { 130 SimpleRpcServer.LOG.debug(getName() + ": asyncWrite", e); 131 } 132 } 133 134 lastPurgeTime = purge(lastPurgeTime); 135 136 } catch (OutOfMemoryError e) { 137 if (this.simpleRpcServer.errorHandler != null) { 138 if (this.simpleRpcServer.errorHandler.checkOOME(e)) { 139 SimpleRpcServer.LOG.info(getName() + ": exiting on OutOfMemoryError"); 140 return; 141 } 142 } else { 143 // 144 // we can run out of memory if we have too many threads 145 // log the event and sleep for a minute and give 146 // some thread(s) a chance to finish 147 // 148 SimpleRpcServer.LOG.warn(getName() + ": OutOfMemoryError in server select", e); 149 try { 150 Thread.sleep(60000); 151 } catch (InterruptedException ex) { 152 SimpleRpcServer.LOG.debug("Interrupted while sleeping"); 153 return; 154 } 155 } 156 } catch (Exception e) { 157 SimpleRpcServer.LOG 158 .warn(getName() + ": exception in Responder " + StringUtils.stringifyException(e), e); 159 } 160 } 161 SimpleRpcServer.LOG.info(getName() + ": stopped"); 162 } 163 164 /** 165 * If there were some calls that have not been sent out for a long time, we close the connection. 166 * @return the time of the purge. 167 */ 168 private long purge(long lastPurgeTime) { 169 long now = EnvironmentEdgeManager.currentTime(); 170 if (now < lastPurgeTime + this.simpleRpcServer.purgeTimeout) { 171 return lastPurgeTime; 172 } 173 174 ArrayList<SimpleServerRpcConnection> conWithOldCalls = new ArrayList<>(); 175 // get the list of channels from list of keys. 176 synchronized (writeSelector.keys()) { 177 for (SelectionKey key : writeSelector.keys()) { 178 SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment(); 179 if (connection == null) { 180 throw new IllegalStateException("Coding error: SelectionKey key without attachment."); 181 } 182 if ( 183 connection.lastSentTime > 0 184 && now > connection.lastSentTime + this.simpleRpcServer.purgeTimeout 185 ) { 186 conWithOldCalls.add(connection); 187 } 188 } 189 } 190 191 // Seems safer to close the connection outside of the synchronized loop... 192 for (SimpleServerRpcConnection connection : conWithOldCalls) { 193 this.simpleRpcServer.closeConnection(connection); 194 } 195 196 return now; 197 } 198 199 private void doAsyncWrite(SelectionKey key) throws IOException { 200 SimpleServerRpcConnection connection = (SimpleServerRpcConnection) key.attachment(); 201 if (connection == null) { 202 throw new IOException("doAsyncWrite: no connection"); 203 } 204 if (key.channel() != connection.channel) { 205 throw new IOException("doAsyncWrite: bad channel"); 206 } 207 208 if (processAllResponses(connection)) { 209 try { 210 // We wrote everything, so we don't need to be told when the socket is ready for 211 // write anymore. 212 key.interestOps(0); 213 } catch (CancelledKeyException e) { 214 /* 215 * The Listener/reader might have closed the socket. We don't explicitly cancel the key, so 216 * not sure if this will ever fire. This warning could be removed. 217 */ 218 SimpleRpcServer.LOG.warn("Exception while changing ops : " + e); 219 } 220 } 221 } 222 223 private BufferChain wrapWithSasl(HBaseSaslRpcServer saslServer, BufferChain bc) 224 throws IOException { 225 // Looks like no way around this; saslserver wants a byte array. I have to make it one. 226 // THIS IS A BIG UGLY COPY. 227 byte[] responseBytes = bc.getBytes(); 228 byte[] token; 229 // synchronization may be needed since there can be multiple Handler 230 // threads using saslServer or Crypto AES to wrap responses. 231 synchronized (saslServer) { 232 token = saslServer.wrap(responseBytes, 0, responseBytes.length); 233 } 234 if (SimpleRpcServer.LOG.isTraceEnabled()) { 235 SimpleRpcServer.LOG 236 .trace("Adding saslServer wrapped token of size " + token.length + " as call response."); 237 } 238 239 ByteBuffer[] responseBufs = new ByteBuffer[2]; 240 responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length)); 241 responseBufs[1] = ByteBuffer.wrap(token); 242 return new BufferChain(responseBufs); 243 } 244 245 /** 246 * Process the response for this call. You need to have the lock on 247 * {@link org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock} 248 * @return true if we proceed the call fully, false otherwise. 249 */ 250 private boolean processResponse(SimpleServerRpcConnection conn, RpcResponse resp) 251 throws IOException { 252 boolean error = true; 253 BufferChain buf = resp.getResponse(); 254 if (conn.useWrap) { 255 buf = wrapWithSasl(conn.saslServer, buf); 256 } 257 try { 258 // Send as much data as we can in the non-blocking fashion 259 long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf); 260 if (numBytes < 0) { 261 throw new HBaseIOException("Error writing on the socket " + conn); 262 } 263 error = false; 264 } finally { 265 if (error) { 266 SimpleRpcServer.LOG.debug(conn + ": output error -- closing"); 267 // We will be closing this connection itself. Mark this call as done so that all the 268 // buffer(s) it got from pool can get released 269 resp.done(); 270 this.simpleRpcServer.closeConnection(conn); 271 } 272 } 273 274 if (!buf.hasRemaining()) { 275 resp.done(); 276 return true; 277 } else { 278 // set the serve time when the response has to be sent later 279 conn.lastSentTime = EnvironmentEdgeManager.currentTime(); 280 return false; // Socket can't take more, we will have to come back. 281 } 282 } 283 284 /** 285 * Process all the responses for this connection 286 * @return true if all the calls were processed or that someone else is doing it. false if there 287 * is still some work to do. In this case, we expect the caller to delay us. 288 */ 289 private boolean processAllResponses(final SimpleServerRpcConnection connection) 290 throws IOException { 291 // We want only one writer on the channel for a connection at a time. 292 connection.responseWriteLock.lock(); 293 try { 294 for (int i = 0; i < 20; i++) { 295 // protection if some handlers manage to need all the responder 296 RpcResponse resp = connection.responseQueue.pollFirst(); 297 if (resp == null) { 298 return true; 299 } 300 if (!processResponse(connection, resp)) { 301 connection.responseQueue.addFirst(resp); 302 return false; 303 } 304 } 305 } finally { 306 connection.responseWriteLock.unlock(); 307 } 308 309 return connection.responseQueue.isEmpty(); 310 } 311 312 // 313 // Enqueue a response from the application. 314 // 315 void doRespond(SimpleServerRpcConnection conn, RpcResponse resp) throws IOException { 316 boolean added = false; 317 // If there is already a write in progress, we don't wait. This allows to free the handlers 318 // immediately for other tasks. 319 if (conn.responseQueue.isEmpty() && conn.responseWriteLock.tryLock()) { 320 try { 321 if (conn.responseQueue.isEmpty()) { 322 // If we're alone, we can try to do a direct call to the socket. It's 323 // an optimization to save on context switches and data transfer between cores.. 324 if (processResponse(conn, resp)) { 325 return; // we're done. 326 } 327 // Too big to fit, putting ahead. 328 conn.responseQueue.addFirst(resp); 329 added = true; // We will register to the selector later, outside of the lock. 330 } 331 } finally { 332 conn.responseWriteLock.unlock(); 333 } 334 } 335 336 if (!added) { 337 conn.responseQueue.addLast(resp); 338 } 339 registerForWrite(conn); 340 } 341}