001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET; 021 022import java.io.Closeable; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.Iterator; 028import java.util.List; 029import java.util.NoSuchElementException; 030import java.util.Timer; 031import java.util.TimerTask; 032import java.util.concurrent.ConcurrentLinkedQueue; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.apache.yetus.audience.InterfaceStability; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * <p> 048 * Used to communicate with a single HBase table similar to {@link Table} but meant for batched, 049 * potentially asynchronous puts. Obtain an instance from a {@link Connection} and call 050 * {@link #close()} afterwards. Provide an alternate to this implementation by setting 051 * {@link BufferedMutatorParams#implementationClassName(String)} or by setting alternate classname 052 * via the key {} in Configuration. 053 * </p> 054 * <p> 055 * While this can be used across threads, great care should be used when doing so. Errors are global 056 * to the buffered mutator and the Exceptions can be thrown on any thread that causes the flush for 057 * requests. 058 * </p> 059 * @see ConnectionFactory 060 * @see Connection 061 * @since 1.0.0 062 */ 063@InterfaceAudience.Private 064@InterfaceStability.Evolving 065public class BufferedMutatorImpl implements BufferedMutator { 066 067 private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class); 068 069 private final ExceptionListener listener; 070 071 private final TableName tableName; 072 073 private final Configuration conf; 074 private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>(); 075 private final AtomicLong currentWriteBufferSize = new AtomicLong(0); 076 /** 077 * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. The 078 * {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. 079 */ 080 private final AtomicInteger undealtMutationCount = new AtomicInteger(0); 081 private final long writeBufferSize; 082 083 private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0); 084 private final AtomicLong writeBufferPeriodicFlushTimerTickMs = 085 new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 086 private Timer writeBufferPeriodicFlushTimer = null; 087 088 private final int maxKeyValueSize; 089 private final ExecutorService pool; 090 private final AtomicInteger rpcTimeout; 091 private final AtomicInteger operationTimeout; 092 private final boolean cleanupPoolOnClose; 093 private volatile boolean closed = false; 094 private final AsyncProcess ap; 095 096 BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { 097 if (conn == null || conn.isClosed()) { 098 throw new IllegalArgumentException("Connection is null or closed."); 099 } 100 this.tableName = params.getTableName(); 101 this.conf = conn.getConfiguration(); 102 this.listener = params.getListener(); 103 if (params.getPool() == null) { 104 this.pool = HTable.getDefaultExecutor(conf); 105 cleanupPoolOnClose = true; 106 } else { 107 this.pool = params.getPool(); 108 cleanupPoolOnClose = false; 109 } 110 ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); 111 this.writeBufferSize = params.getWriteBufferSize() != UNSET 112 ? params.getWriteBufferSize() 113 : tableConf.getWriteBufferSize(); 114 115 // Set via the setter because it does value validation and starts/stops the TimerTask 116 long newWriteBufferPeriodicFlushTimeoutMs = 117 params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET 118 ? params.getWriteBufferPeriodicFlushTimeoutMs() 119 : tableConf.getWriteBufferPeriodicFlushTimeoutMs(); 120 long newWriteBufferPeriodicFlushTimerTickMs = 121 params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET 122 ? params.getWriteBufferPeriodicFlushTimerTickMs() 123 : tableConf.getWriteBufferPeriodicFlushTimerTickMs(); 124 this.setWriteBufferPeriodicFlush(newWriteBufferPeriodicFlushTimeoutMs, 125 newWriteBufferPeriodicFlushTimerTickMs); 126 127 this.maxKeyValueSize = params.getMaxKeyValueSize() != UNSET 128 ? params.getMaxKeyValueSize() 129 : tableConf.getMaxKeyValueSize(); 130 131 this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != UNSET 132 ? params.getRpcTimeout() 133 : conn.getConnectionConfiguration().getWriteRpcTimeout()); 134 135 this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != UNSET 136 ? params.getOperationTimeout() 137 : conn.getConnectionConfiguration().getOperationTimeout()); 138 this.ap = ap; 139 } 140 141 BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, 142 RpcControllerFactory rpcFactory, BufferedMutatorParams params) { 143 this(conn, params, 144 // puts need to track errors globally due to how the APIs currently work. 145 new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory)); 146 } 147 148 private void checkClose() { 149 if (closed) { 150 throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); 151 } 152 } 153 154 ExecutorService getPool() { 155 return pool; 156 } 157 158 AsyncProcess getAsyncProcess() { 159 return ap; 160 } 161 162 @Override 163 public TableName getName() { 164 return tableName; 165 } 166 167 @Override 168 public Configuration getConfiguration() { 169 return conf; 170 } 171 172 @Override 173 public void mutate(Mutation m) 174 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 175 mutate(Collections.singletonList(m)); 176 } 177 178 @Override 179 public void mutate(List<? extends Mutation> ms) 180 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 181 checkClose(); 182 183 long toAddSize = 0; 184 int toAddCount = 0; 185 for (Mutation m : ms) { 186 if (m instanceof Put) { 187 ConnectionUtils.validatePut((Put) m, maxKeyValueSize); 188 } 189 toAddSize += m.heapSize(); 190 ++toAddCount; 191 } 192 193 if (currentWriteBufferSize.get() == 0) { 194 firstRecordInBufferTimestamp.set(EnvironmentEdgeManager.currentTime()); 195 } 196 currentWriteBufferSize.addAndGet(toAddSize); 197 writeAsyncBuffer.addAll(ms); 198 undealtMutationCount.addAndGet(toAddCount); 199 doFlush(false); 200 } 201 202 protected long getExecutedWriteBufferPeriodicFlushes() { 203 return executedWriteBufferPeriodicFlushes.get(); 204 } 205 206 private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0); 207 private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0); 208 209 private void timerCallbackForWriteBufferPeriodicFlush() { 210 if (currentWriteBufferSize.get() == 0) { 211 return; // Nothing to flush 212 } 213 long now = EnvironmentEdgeManager.currentTime(); 214 if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) { 215 return; // No need to flush yet 216 } 217 // The first record in the writebuffer has been in there too long --> flush 218 try { 219 executedWriteBufferPeriodicFlushes.incrementAndGet(); 220 flush(); 221 } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { 222 LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); 223 } 224 } 225 226 @Override 227 public synchronized void close() throws IOException { 228 if (closed) { 229 return; 230 } 231 // Stop any running Periodic Flush timer. 232 disableWriteBufferPeriodicFlush(); 233 try { 234 // As we can have an operation in progress even if the buffer is empty, we call 235 // doFlush at least one time. 236 doFlush(true); 237 } finally { 238 if (cleanupPoolOnClose) { 239 this.pool.shutdown(); 240 try { 241 if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { 242 LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); 243 } 244 } catch (InterruptedException e) { 245 LOG.warn("waitForTermination interrupted"); 246 Thread.currentThread().interrupt(); 247 } 248 } 249 closed = true; 250 } 251 } 252 253 private AsyncProcessTask createTask(QueueRowAccess access) { 254 return new AsyncProcessTask(AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName) 255 .setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE).build()) { 256 @Override 257 public int getRpcTimeout() { 258 return rpcTimeout.get(); 259 } 260 261 @Override 262 public int getOperationTimeout() { 263 return operationTimeout.get(); 264 } 265 }; 266 } 267 268 @Override 269 public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { 270 checkClose(); 271 doFlush(true); 272 } 273 274 /** 275 * Send the operations in the buffer to the servers. 276 * @param flushAll - if true, sends all the writes and wait for all of them to finish before 277 * returning. Otherwise, flush until buffer size is smaller than threshold 278 */ 279 private void doFlush(boolean flushAll) 280 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 281 List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>(); 282 while (true) { 283 if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) { 284 // There is the room to accept more mutations. 285 break; 286 } 287 AsyncRequestFuture asf; 288 try (QueueRowAccess access = createQueueRowAccess()) { 289 if (access.isEmpty()) { 290 // It means someone has gotten the ticker to run the flush. 291 break; 292 } 293 asf = ap.submit(createTask(access)); 294 } 295 // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't 296 // be released. 297 asf.waitUntilDone(); 298 if (asf.hasError()) { 299 errors.add(asf.getErrors()); 300 } 301 } 302 303 RetriesExhaustedWithDetailsException exception = makeException(errors); 304 if (exception == null) { 305 return; 306 } else if (listener == null) { 307 throw exception; 308 } else { 309 listener.onException(exception, this); 310 } 311 } 312 313 private static RetriesExhaustedWithDetailsException 314 makeException(List<RetriesExhaustedWithDetailsException> errors) { 315 switch (errors.size()) { 316 case 0: 317 return null; 318 case 1: 319 return errors.get(0); 320 default: 321 List<Throwable> exceptions = new ArrayList<>(); 322 List<Row> actions = new ArrayList<>(); 323 List<String> hostnameAndPort = new ArrayList<>(); 324 errors.forEach(e -> { 325 exceptions.addAll(e.exceptions); 326 actions.addAll(e.actions); 327 hostnameAndPort.addAll(e.hostnameAndPort); 328 }); 329 return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort); 330 } 331 } 332 333 /** 334 * {@inheritDoc} 335 */ 336 @Override 337 public long getWriteBufferSize() { 338 return this.writeBufferSize; 339 } 340 341 @Override 342 public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { 343 long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get(); 344 long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get(); 345 346 // Both parameters have minimal values. 347 writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs)); 348 writeBufferPeriodicFlushTimerTickMs 349 .set(Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs)); 350 351 // If something changed we stop the old Timer. 352 if ( 353 writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs 354 || writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs 355 ) { 356 if (writeBufferPeriodicFlushTimer != null) { 357 writeBufferPeriodicFlushTimer.cancel(); 358 writeBufferPeriodicFlushTimer = null; 359 } 360 } 361 362 // If we have the need for a timer and there is none we start it 363 if (writeBufferPeriodicFlushTimer == null && writeBufferPeriodicFlushTimeoutMs.get() > 0) { 364 writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon. 365 writeBufferPeriodicFlushTimer.schedule(new TimerTask() { 366 @Override 367 public void run() { 368 BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush(); 369 } 370 }, writeBufferPeriodicFlushTimerTickMs.get(), writeBufferPeriodicFlushTimerTickMs.get()); 371 } 372 } 373 374 @Override 375 public long getWriteBufferPeriodicFlushTimeoutMs() { 376 return writeBufferPeriodicFlushTimeoutMs.get(); 377 } 378 379 @Override 380 public long getWriteBufferPeriodicFlushTimerTickMs() { 381 return writeBufferPeriodicFlushTimerTickMs.get(); 382 } 383 384 @Override 385 public void setRpcTimeout(int rpcTimeout) { 386 this.rpcTimeout.set(rpcTimeout); 387 } 388 389 @Override 390 public void setOperationTimeout(int operationTimeout) { 391 this.operationTimeout.set(operationTimeout); 392 } 393 394 long getCurrentWriteBufferSize() { 395 return currentWriteBufferSize.get(); 396 } 397 398 /** 399 * Count the mutations which haven't been processed. 400 * @return count of undealt mutation 401 */ 402 int size() { 403 return undealtMutationCount.get(); 404 } 405 406 /** 407 * Count the mutations which haven't been flushed 408 * @return count of unflushed mutation 409 */ 410 int getUnflushedSize() { 411 return writeAsyncBuffer.size(); 412 } 413 414 QueueRowAccess createQueueRowAccess() { 415 return new QueueRowAccess(); 416 } 417 418 class QueueRowAccess implements RowAccess<Row>, Closeable { 419 private int remainder = undealtMutationCount.getAndSet(0); 420 private Mutation last = null; 421 422 private void restoreLastMutation() { 423 // restore the last mutation since it isn't submitted 424 if (last != null) { 425 writeAsyncBuffer.add(last); 426 currentWriteBufferSize.addAndGet(last.heapSize()); 427 last = null; 428 } 429 } 430 431 @Override 432 public void close() { 433 restoreLastMutation(); 434 if (remainder > 0) { 435 undealtMutationCount.addAndGet(remainder); 436 remainder = 0; 437 } 438 } 439 440 @Override 441 public Iterator<Row> iterator() { 442 return new Iterator<Row>() { 443 private int countDown = remainder; 444 445 @Override 446 public boolean hasNext() { 447 return countDown > 0; 448 } 449 450 @Override 451 public Row next() { 452 restoreLastMutation(); 453 if (!hasNext()) { 454 throw new NoSuchElementException(); 455 } 456 last = writeAsyncBuffer.poll(); 457 if (last == null) { 458 throw new NoSuchElementException(); 459 } 460 currentWriteBufferSize.addAndGet(-last.heapSize()); 461 --countDown; 462 return last; 463 } 464 465 @Override 466 public void remove() { 467 if (last == null) { 468 throw new IllegalStateException(); 469 } 470 --remainder; 471 last = null; 472 } 473 }; 474 } 475 476 @Override 477 public int size() { 478 return remainder; 479 } 480 481 @Override 482 public boolean isEmpty() { 483 return remainder <= 0; 484 } 485 } 486}