001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.LongAdder; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.thrift.generated.TIncrement; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.metrics2.util.MBeans; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 042 043/** 044 * This class will coalesce increments from a thift server if 045 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this config to true will 046 * cause the thrift server to queue increments into an instance of this class. The thread pool 047 * associated with this class will drain the coalesced increments as the thread is able. This can 048 * cause data loss if the thrift server dies or is shut down before everything in the queue is 049 * drained. 050 */ 051@InterfaceAudience.Private 052public class IncrementCoalescer implements IncrementCoalescerMBean { 053 /** 054 * Used to identify a cell that will be incremented. 055 */ 056 static class FullyQualifiedRow { 057 private byte[] table; 058 private byte[] rowKey; 059 private byte[] family; 060 private byte[] qualifier; 061 062 public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) { 063 super(); 064 this.table = table; 065 this.rowKey = rowKey; 066 this.family = fam; 067 this.qualifier = qual; 068 } 069 070 public byte[] getTable() { 071 return table; 072 } 073 074 public void setTable(byte[] table) { 075 this.table = table; 076 } 077 078 public byte[] getRowKey() { 079 return rowKey; 080 } 081 082 public byte[] getFamily() { 083 return family; 084 } 085 086 public void setFamily(byte[] fam) { 087 this.family = fam; 088 } 089 090 public byte[] getQualifier() { 091 return qualifier; 092 } 093 094 public void setQualifier(byte[] qual) { 095 this.qualifier = qual; 096 } 097 098 @Override 099 public int hashCode() { 100 final int prime = 31; 101 int result = 1; 102 result = prime * result + Arrays.hashCode(family); 103 result = prime * result + Arrays.hashCode(qualifier); 104 result = prime * result + Arrays.hashCode(rowKey); 105 result = prime * result + Arrays.hashCode(table); 106 return result; 107 } 108 109 @Override 110 public boolean equals(Object obj) { 111 if (this == obj) { 112 return true; 113 } 114 if (obj == null) { 115 return false; 116 } 117 if (getClass() != obj.getClass()) { 118 return false; 119 } 120 121 FullyQualifiedRow other = (FullyQualifiedRow) obj; 122 123 if (!Arrays.equals(family, other.family)) { 124 return false; 125 } 126 if (!Arrays.equals(qualifier, other.qualifier)) { 127 return false; 128 } 129 if (!Arrays.equals(rowKey, other.rowKey)) { 130 return false; 131 } 132 133 return Arrays.equals(table, other.table); 134 } 135 } 136 137 private final LongAdder failedIncrements = new LongAdder(); 138 private final LongAdder successfulCoalescings = new LongAdder(); 139 private final LongAdder totalIncrements = new LongAdder(); 140 private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = 141 new ConcurrentHashMap<>(100000, 0.75f, 1500); 142 private final ThreadPoolExecutor pool; 143 private final ThriftHBaseServiceHandler handler; 144 145 private int maxQueueSize = 500000; 146 private static final int CORE_POOL_SIZE = 1; 147 148 private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class); 149 150 public IncrementCoalescer(ThriftHBaseServiceHandler hand) { 151 this.handler = hand; 152 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 153 pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, 154 new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").setDaemon(true) 155 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 156 MBeans.register("thrift", "Thrift", this); 157 } 158 159 public boolean queueIncrement(TIncrement inc) { 160 if (!canQueue()) { 161 failedIncrements.increment(); 162 return false; 163 } 164 return internalQueueTincrement(inc); 165 } 166 167 public boolean queueIncrements(List<TIncrement> incs) { 168 if (!canQueue()) { 169 failedIncrements.increment(); 170 return false; 171 } 172 173 for (TIncrement tinc : incs) { 174 internalQueueTincrement(tinc); 175 } 176 177 return true; 178 } 179 180 private boolean internalQueueTincrement(TIncrement inc) { 181 byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn()); 182 183 if (famAndQf.length != 2) { 184 return false; 185 } 186 187 return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1], 188 inc.getAmmount()); 189 } 190 191 @SuppressWarnings("FutureReturnValueIgnored") 192 private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, byte[] qual, 193 long ammount) { 194 int countersMapSize = countersMap.size(); 195 196 // Make sure that the number of threads is scaled. 197 dynamicallySetCoreSize(countersMapSize); 198 199 totalIncrements.increment(); 200 201 FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); 202 203 long currentAmount = ammount; 204 // Spin until able to insert the value back without collisions 205 while (true) { 206 Long value = countersMap.remove(key); 207 if (value == null) { 208 // There was nothing there, create a new value 209 value = currentAmount; 210 } else { 211 value += currentAmount; 212 successfulCoalescings.increment(); 213 } 214 // Try to put the value, only if there was none 215 Long oldValue = countersMap.putIfAbsent(key, value); 216 if (oldValue == null) { 217 // We were able to put it in, we're done 218 break; 219 } 220 // Someone else was able to put a value in, so let's remember our 221 // current value (plus what we picked up) and retry to add it in 222 currentAmount = value; 223 } 224 225 // We limit the size of the queue simply because all we need is a 226 // notification that something needs to be incremented. No need 227 // for millions of callables that mean the same thing. 228 if (pool.getQueue().size() <= 1000) { 229 // queue it up 230 Callable<Integer> callable = createIncCallable(); 231 pool.submit(callable); 232 } 233 234 return true; 235 } 236 237 public boolean canQueue() { 238 return countersMap.size() < maxQueueSize; 239 } 240 241 private Callable<Integer> createIncCallable() { 242 return () -> { 243 int failures = 0; 244 Set<FullyQualifiedRow> keys = countersMap.keySet(); 245 for (FullyQualifiedRow row : keys) { 246 Long counter = countersMap.remove(row); 247 if (counter == null) { 248 continue; 249 } 250 Table table = null; 251 try { 252 table = handler.getTable(row.getTable()); 253 if (failures > 2) { 254 throw new IOException("Auto-Fail rest of ICVs"); 255 } 256 table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(), counter); 257 } catch (IOException e) { 258 // log failure of increment 259 failures++; 260 LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", " 261 + Bytes.toStringBinary(row.getRowKey()) + ", " + Bytes.toStringBinary(row.getFamily()) 262 + ", " + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); 263 } finally { 264 if (table != null) { 265 table.close(); 266 } 267 } 268 } 269 return failures; 270 }; 271 } 272 273 /** 274 * This method samples the incoming requests and, if selected, will check if the corePoolSize 275 * should be changed. 276 * @param countersMapSize the size of the counters map 277 */ 278 private void dynamicallySetCoreSize(int countersMapSize) { 279 // Here we are using countersMapSize as a random number, meaning this 280 // could be a Random object 281 if (countersMapSize % 10 != 0) { 282 return; 283 } 284 double currentRatio = (double) countersMapSize / (double) maxQueueSize; 285 int newValue; 286 287 if (currentRatio < 0.1) { 288 newValue = 1; 289 } else if (currentRatio < 0.3) { 290 newValue = 2; 291 } else if (currentRatio < 0.5) { 292 newValue = 4; 293 } else if (currentRatio < 0.7) { 294 newValue = 8; 295 } else if (currentRatio < 0.9) { 296 newValue = 14; 297 } else { 298 newValue = 22; 299 } 300 301 if (pool.getCorePoolSize() != newValue) { 302 pool.setCorePoolSize(newValue); 303 } 304 } 305 306 // MBean get/set methods 307 @Override 308 public int getQueueSize() { 309 return pool.getQueue().size(); 310 } 311 312 @Override 313 public int getMaxQueueSize() { 314 return this.maxQueueSize; 315 } 316 317 @Override 318 public void setMaxQueueSize(int newSize) { 319 this.maxQueueSize = newSize; 320 } 321 322 @Override 323 public long getPoolCompletedTaskCount() { 324 return pool.getCompletedTaskCount(); 325 } 326 327 @Override 328 public long getPoolTaskCount() { 329 return pool.getTaskCount(); 330 } 331 332 @Override 333 public int getPoolLargestPoolSize() { 334 return pool.getLargestPoolSize(); 335 } 336 337 @Override 338 public int getCorePoolSize() { 339 return pool.getCorePoolSize(); 340 } 341 342 @Override 343 public void setCorePoolSize(int newCoreSize) { 344 pool.setCorePoolSize(newCoreSize); 345 } 346 347 @Override 348 public int getMaxPoolSize() { 349 return pool.getMaximumPoolSize(); 350 } 351 352 @Override 353 public void setMaxPoolSize(int newMaxSize) { 354 pool.setMaximumPoolSize(newMaxSize); 355 } 356 357 @Override 358 public long getFailedIncrements() { 359 return failedIncrements.sum(); 360 } 361 362 @Override 363 public long getSuccessfulCoalescings() { 364 return successfulCoalescings.sum(); 365 } 366 367 @Override 368 public long getTotalIncrements() { 369 return totalIncrements.sum(); 370 } 371 372 @Override 373 public long getCountersMapSize() { 374 return countersMap.size(); 375 } 376}