001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license 004 * agreements. See the NOTICE file distributed with this work for additional information regarding 005 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. You may obtain a 007 * copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software distributed under the 012 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 013 * express or implied. See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.apache.hadoop.hbase.client; 017 018import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.Iterator; 026import java.util.List; 027import java.util.NoSuchElementException; 028import java.util.Timer; 029import java.util.TimerTask; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.yetus.audience.InterfaceStability; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 044 045/** 046 * <p> 047 * Used to communicate with a single HBase table similar to {@link Table} 048 * but meant for batched, potentially asynchronous puts. Obtain an instance from 049 * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate 050 * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)} 051 * or by setting alternate classname via the key {} in Configuration. 052 * </p> 053 * 054 * <p> 055 * While this can be used across threads, great care should be used when doing so. 056 * Errors are global to the buffered mutator and the Exceptions can be thrown on any 057 * thread that causes the flush for requests. 058 * </p> 059 * 060 * @see ConnectionFactory 061 * @see Connection 062 * @since 1.0.0 063 */ 064@InterfaceAudience.Private 065@InterfaceStability.Evolving 066public class BufferedMutatorImpl implements BufferedMutator { 067 068 private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorImpl.class); 069 070 private final ExceptionListener listener; 071 072 private final TableName tableName; 073 074 private final Configuration conf; 075 private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>(); 076 private final AtomicLong currentWriteBufferSize = new AtomicLong(0); 077 /** 078 * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. 079 * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. 080 */ 081 private final AtomicInteger undealtMutationCount = new AtomicInteger(0); 082 private final long writeBufferSize; 083 084 private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(0); 085 private final AtomicLong writeBufferPeriodicFlushTimerTickMs = 086 new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS); 087 private Timer writeBufferPeriodicFlushTimer = null; 088 089 private final int maxKeyValueSize; 090 private final ExecutorService pool; 091 private final AtomicInteger rpcTimeout; 092 private final AtomicInteger operationTimeout; 093 private final boolean cleanupPoolOnClose; 094 private volatile boolean closed = false; 095 private final AsyncProcess ap; 096 097 @VisibleForTesting 098 BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { 099 if (conn == null || conn.isClosed()) { 100 throw new IllegalArgumentException("Connection is null or closed."); 101 } 102 this.tableName = params.getTableName(); 103 this.conf = conn.getConfiguration(); 104 this.listener = params.getListener(); 105 if (params.getPool() == null) { 106 this.pool = HTable.getDefaultExecutor(conf); 107 cleanupPoolOnClose = true; 108 } else { 109 this.pool = params.getPool(); 110 cleanupPoolOnClose = false; 111 } 112 ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); 113 this.writeBufferSize = 114 params.getWriteBufferSize() != UNSET ? 115 params.getWriteBufferSize() : tableConf.getWriteBufferSize(); 116 117 // Set via the setter because it does value validation and starts/stops the TimerTask 118 long newWriteBufferPeriodicFlushTimeoutMs = 119 params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET 120 ? params.getWriteBufferPeriodicFlushTimeoutMs() 121 : tableConf.getWriteBufferPeriodicFlushTimeoutMs(); 122 long newWriteBufferPeriodicFlushTimerTickMs = 123 params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET 124 ? params.getWriteBufferPeriodicFlushTimerTickMs() 125 : tableConf.getWriteBufferPeriodicFlushTimerTickMs(); 126 this.setWriteBufferPeriodicFlush( 127 newWriteBufferPeriodicFlushTimeoutMs, 128 newWriteBufferPeriodicFlushTimerTickMs); 129 130 this.maxKeyValueSize = 131 params.getMaxKeyValueSize() != UNSET ? 132 params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); 133 134 this.rpcTimeout = new AtomicInteger( 135 params.getRpcTimeout() != UNSET ? 136 params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout()); 137 138 this.operationTimeout = new AtomicInteger( 139 params.getOperationTimeout() != UNSET ? 140 params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout()); 141 this.ap = ap; 142 } 143 BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, 144 RpcControllerFactory rpcFactory, BufferedMutatorParams params) { 145 this(conn, params, 146 // puts need to track errors globally due to how the APIs currently work. 147 new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory)); 148 } 149 150 private void checkClose() { 151 if (closed) { 152 throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); 153 } 154 } 155 156 @VisibleForTesting 157 ExecutorService getPool() { 158 return pool; 159 } 160 161 @VisibleForTesting 162 AsyncProcess getAsyncProcess() { 163 return ap; 164 } 165 166 @Override 167 public TableName getName() { 168 return tableName; 169 } 170 171 @Override 172 public Configuration getConfiguration() { 173 return conf; 174 } 175 176 @Override 177 public void mutate(Mutation m) throws InterruptedIOException, 178 RetriesExhaustedWithDetailsException { 179 mutate(Collections.singletonList(m)); 180 } 181 182 @Override 183 public void mutate(List<? extends Mutation> ms) throws InterruptedIOException, 184 RetriesExhaustedWithDetailsException { 185 checkClose(); 186 187 long toAddSize = 0; 188 int toAddCount = 0; 189 for (Mutation m : ms) { 190 if (m instanceof Put) { 191 ConnectionUtils.validatePut((Put) m, maxKeyValueSize); 192 } 193 toAddSize += m.heapSize(); 194 ++toAddCount; 195 } 196 197 if (currentWriteBufferSize.get() == 0) { 198 firstRecordInBufferTimestamp.set(System.currentTimeMillis()); 199 } 200 currentWriteBufferSize.addAndGet(toAddSize); 201 writeAsyncBuffer.addAll(ms); 202 undealtMutationCount.addAndGet(toAddCount); 203 doFlush(false); 204 } 205 206 @VisibleForTesting 207 protected long getExecutedWriteBufferPeriodicFlushes() { 208 return executedWriteBufferPeriodicFlushes.get(); 209 } 210 211 private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0); 212 private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0); 213 214 private void timerCallbackForWriteBufferPeriodicFlush() { 215 if (currentWriteBufferSize.get() == 0) { 216 return; // Nothing to flush 217 } 218 long now = System.currentTimeMillis(); 219 if (firstRecordInBufferTimestamp.get() + writeBufferPeriodicFlushTimeoutMs.get() > now) { 220 return; // No need to flush yet 221 } 222 // The first record in the writebuffer has been in there too long --> flush 223 try { 224 executedWriteBufferPeriodicFlushes.incrementAndGet(); 225 flush(); 226 } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { 227 LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); 228 } 229 } 230 231 @Override 232 public synchronized void close() throws IOException { 233 if (closed) { 234 return; 235 } 236 // Stop any running Periodic Flush timer. 237 disableWriteBufferPeriodicFlush(); 238 try { 239 // As we can have an operation in progress even if the buffer is empty, we call 240 // doFlush at least one time. 241 doFlush(true); 242 } finally { 243 if (cleanupPoolOnClose) { 244 this.pool.shutdown(); 245 try { 246 if (!pool.awaitTermination(600, TimeUnit.SECONDS)) { 247 LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); 248 } 249 } catch (InterruptedException e) { 250 LOG.warn("waitForTermination interrupted"); 251 Thread.currentThread().interrupt(); 252 } 253 } 254 closed = true; 255 } 256 } 257 258 private AsyncProcessTask createTask(QueueRowAccess access) { 259 return new AsyncProcessTask(AsyncProcessTask.newBuilder() 260 .setPool(pool) 261 .setTableName(tableName) 262 .setRowAccess(access) 263 .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) 264 .build()) { 265 @Override 266 public int getRpcTimeout() { 267 return rpcTimeout.get(); 268 } 269 270 @Override 271 public int getOperationTimeout() { 272 return operationTimeout.get(); 273 } 274 }; 275 } 276 277 @Override 278 public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { 279 checkClose(); 280 doFlush(true); 281 } 282 283 /** 284 * Send the operations in the buffer to the servers. 285 * 286 * @param flushAll - if true, sends all the writes and wait for all of them to finish before 287 * returning. Otherwise, flush until buffer size is smaller than threshold 288 */ 289 private void doFlush(boolean flushAll) throws InterruptedIOException, 290 RetriesExhaustedWithDetailsException { 291 List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>(); 292 while (true) { 293 if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) { 294 // There is the room to accept more mutations. 295 break; 296 } 297 AsyncRequestFuture asf; 298 try (QueueRowAccess access = createQueueRowAccess()) { 299 if (access.isEmpty()) { 300 // It means someone has gotten the ticker to run the flush. 301 break; 302 } 303 asf = ap.submit(createTask(access)); 304 } 305 // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't 306 // be released. 307 asf.waitUntilDone(); 308 if (asf.hasError()) { 309 errors.add(asf.getErrors()); 310 } 311 } 312 313 RetriesExhaustedWithDetailsException exception = makeException(errors); 314 if (exception == null) { 315 return; 316 } else if(listener == null) { 317 throw exception; 318 } else { 319 listener.onException(exception, this); 320 } 321 } 322 323 private static RetriesExhaustedWithDetailsException makeException( 324 List<RetriesExhaustedWithDetailsException> errors) { 325 switch (errors.size()) { 326 case 0: 327 return null; 328 case 1: 329 return errors.get(0); 330 default: 331 List<Throwable> exceptions = new ArrayList<>(); 332 List<Row> actions = new ArrayList<>(); 333 List<String> hostnameAndPort = new ArrayList<>(); 334 errors.forEach(e -> { 335 exceptions.addAll(e.exceptions); 336 actions.addAll(e.actions); 337 hostnameAndPort.addAll(e.hostnameAndPort); 338 }); 339 return new RetriesExhaustedWithDetailsException(exceptions, actions, hostnameAndPort); 340 } 341 } 342 343 /** 344 * {@inheritDoc} 345 */ 346 @Override 347 public long getWriteBufferSize() { 348 return this.writeBufferSize; 349 } 350 351 @Override 352 public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) { 353 long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get(); 354 long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get(); 355 356 // Both parameters have minimal values. 357 writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs)); 358 writeBufferPeriodicFlushTimerTickMs.set( 359 Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs)); 360 361 // If something changed we stop the old Timer. 362 if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs || 363 writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) { 364 if (writeBufferPeriodicFlushTimer != null) { 365 writeBufferPeriodicFlushTimer.cancel(); 366 writeBufferPeriodicFlushTimer = null; 367 } 368 } 369 370 // If we have the need for a timer and there is none we start it 371 if (writeBufferPeriodicFlushTimer == null && 372 writeBufferPeriodicFlushTimeoutMs.get() > 0) { 373 writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon. 374 writeBufferPeriodicFlushTimer.schedule(new TimerTask() { 375 @Override 376 public void run() { 377 BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush(); 378 } 379 }, writeBufferPeriodicFlushTimerTickMs.get(), 380 writeBufferPeriodicFlushTimerTickMs.get()); 381 } 382 } 383 384 @Override 385 public long getWriteBufferPeriodicFlushTimeoutMs() { 386 return writeBufferPeriodicFlushTimeoutMs.get(); 387 } 388 389 @Override 390 public long getWriteBufferPeriodicFlushTimerTickMs() { 391 return writeBufferPeriodicFlushTimerTickMs.get(); 392 } 393 394 @Override 395 public void setRpcTimeout(int rpcTimeout) { 396 this.rpcTimeout.set(rpcTimeout); 397 } 398 399 @Override 400 public void setOperationTimeout(int operationTimeout) { 401 this.operationTimeout.set(operationTimeout); 402 } 403 404 @VisibleForTesting 405 long getCurrentWriteBufferSize() { 406 return currentWriteBufferSize.get(); 407 } 408 409 /** 410 * Count the mutations which haven't been processed. 411 * @return count of undealt mutation 412 */ 413 @VisibleForTesting 414 int size() { 415 return undealtMutationCount.get(); 416 } 417 418 /** 419 * Count the mutations which haven't been flushed 420 * @return count of unflushed mutation 421 */ 422 @VisibleForTesting 423 int getUnflushedSize() { 424 return writeAsyncBuffer.size(); 425 } 426 427 @VisibleForTesting 428 QueueRowAccess createQueueRowAccess() { 429 return new QueueRowAccess(); 430 } 431 432 @VisibleForTesting 433 class QueueRowAccess implements RowAccess<Row>, Closeable { 434 private int remainder = undealtMutationCount.getAndSet(0); 435 private Mutation last = null; 436 437 private void restoreLastMutation() { 438 // restore the last mutation since it isn't submitted 439 if (last != null) { 440 writeAsyncBuffer.add(last); 441 currentWriteBufferSize.addAndGet(last.heapSize()); 442 last = null; 443 } 444 } 445 446 @Override 447 public void close() { 448 restoreLastMutation(); 449 if (remainder > 0) { 450 undealtMutationCount.addAndGet(remainder); 451 remainder = 0; 452 } 453 } 454 455 @Override 456 public Iterator<Row> iterator() { 457 return new Iterator<Row>() { 458 private int countDown = remainder; 459 @Override 460 public boolean hasNext() { 461 return countDown > 0; 462 } 463 @Override 464 public Row next() { 465 restoreLastMutation(); 466 if (!hasNext()) { 467 throw new NoSuchElementException(); 468 } 469 last = writeAsyncBuffer.poll(); 470 if (last == null) { 471 throw new NoSuchElementException(); 472 } 473 currentWriteBufferSize.addAndGet(-last.heapSize()); 474 --countDown; 475 return last; 476 } 477 @Override 478 public void remove() { 479 if (last == null) { 480 throw new IllegalStateException(); 481 } 482 --remainder; 483 last = null; 484 } 485 }; 486 } 487 488 @Override 489 public int size() { 490 return remainder; 491 } 492 493 @Override 494 public boolean isEmpty() { 495 return remainder <= 0; 496 } 497 } 498}