001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.LongAdder; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.thrift.generated.TIncrement; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.metrics2.util.MBeans; 037import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * This class will coalesce increments from a thift server if 044 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this 045 * config to true will cause the thrift server to queue increments into an 046 * instance of this class. The thread pool associated with this class will drain 047 * the coalesced increments as the thread is able. This can cause data loss if the 048 * thrift server dies or is shut down before everything in the queue is drained. 049 * 050 */ 051@InterfaceAudience.Private 052public class IncrementCoalescer implements IncrementCoalescerMBean { 053 /** 054 * Used to identify a cell that will be incremented. 055 * 056 */ 057 static class FullyQualifiedRow { 058 private byte[] table; 059 private byte[] rowKey; 060 private byte[] family; 061 private byte[] qualifier; 062 063 public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) { 064 super(); 065 this.table = table; 066 this.rowKey = rowKey; 067 this.family = fam; 068 this.qualifier = qual; 069 } 070 071 public byte[] getTable() { 072 return table; 073 } 074 075 public void setTable(byte[] table) { 076 this.table = table; 077 } 078 079 public byte[] getRowKey() { 080 return rowKey; 081 } 082 083 public byte[] getFamily() { 084 return family; 085 } 086 087 public void setFamily(byte[] fam) { 088 this.family = fam; 089 } 090 091 public byte[] getQualifier() { 092 return qualifier; 093 } 094 095 public void setQualifier(byte[] qual) { 096 this.qualifier = qual; 097 } 098 099 @Override 100 public int hashCode() { 101 final int prime = 31; 102 int result = 1; 103 result = prime * result + Arrays.hashCode(family); 104 result = prime * result + Arrays.hashCode(qualifier); 105 result = prime * result + Arrays.hashCode(rowKey); 106 result = prime * result + Arrays.hashCode(table); 107 return result; 108 } 109 110 @Override 111 public boolean equals(Object obj) { 112 if (this == obj) return true; 113 if (obj == null) return false; 114 if (getClass() != obj.getClass()) return false; 115 FullyQualifiedRow other = (FullyQualifiedRow) obj; 116 117 if (!Arrays.equals(family, other.family)) { 118 return false; 119 } 120 if (!Arrays.equals(qualifier, other.qualifier)) { 121 return false; 122 } 123 if (!Arrays.equals(rowKey, other.rowKey)) { 124 return false; 125 } 126 127 return Arrays.equals(table, other.table); 128 } 129 } 130 131 private final LongAdder failedIncrements = new LongAdder(); 132 private final LongAdder successfulCoalescings = new LongAdder(); 133 private final LongAdder totalIncrements = new LongAdder(); 134 private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = 135 new ConcurrentHashMap<>(100000, 0.75f, 1500); 136 private final ThreadPoolExecutor pool; 137 private final ThriftHBaseServiceHandler handler; 138 139 private int maxQueueSize = 500000; 140 private static final int CORE_POOL_SIZE = 1; 141 142 private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class); 143 144 public IncrementCoalescer(ThriftHBaseServiceHandler hand) { 145 this.handler = hand; 146 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 147 pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, 148 new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").setDaemon(true) 149 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 150 MBeans.register("thrift", "Thrift", this); 151 } 152 153 public boolean queueIncrement(TIncrement inc) { 154 if (!canQueue()) { 155 failedIncrements.increment(); 156 return false; 157 } 158 return internalQueueTincrement(inc); 159 } 160 161 public boolean queueIncrements(List<TIncrement> incs) { 162 if (!canQueue()) { 163 failedIncrements.increment(); 164 return false; 165 } 166 167 for (TIncrement tinc : incs) { 168 internalQueueTincrement(tinc); 169 } 170 return true; 171 172 } 173 174 private boolean internalQueueTincrement(TIncrement inc) { 175 byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn()); 176 if (famAndQf.length != 2) return false; 177 178 return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1], 179 inc.getAmmount()); 180 } 181 182 @SuppressWarnings("FutureReturnValueIgnored") 183 private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, 184 byte[] qual, long ammount) { 185 int countersMapSize = countersMap.size(); 186 187 188 //Make sure that the number of threads is scaled. 189 dynamicallySetCoreSize(countersMapSize); 190 191 totalIncrements.increment(); 192 193 FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); 194 195 long currentAmount = ammount; 196 // Spin until able to insert the value back without collisions 197 while (true) { 198 Long value = countersMap.remove(key); 199 if (value == null) { 200 // There was nothing there, create a new value 201 value = currentAmount; 202 } else { 203 value += currentAmount; 204 successfulCoalescings.increment(); 205 } 206 // Try to put the value, only if there was none 207 Long oldValue = countersMap.putIfAbsent(key, value); 208 if (oldValue == null) { 209 // We were able to put it in, we're done 210 break; 211 } 212 // Someone else was able to put a value in, so let's remember our 213 // current value (plus what we picked up) and retry to add it in 214 currentAmount = value; 215 } 216 217 // We limit the size of the queue simply because all we need is a 218 // notification that something needs to be incremented. No need 219 // for millions of callables that mean the same thing. 220 if (pool.getQueue().size() <= 1000) { 221 // queue it up 222 Callable<Integer> callable = createIncCallable(); 223 pool.submit(callable); 224 } 225 226 return true; 227 } 228 229 public boolean canQueue() { 230 return countersMap.size() < maxQueueSize; 231 } 232 233 private Callable<Integer> createIncCallable() { 234 return () -> { 235 int failures = 0; 236 Set<FullyQualifiedRow> keys = countersMap.keySet(); 237 for (FullyQualifiedRow row : keys) { 238 Long counter = countersMap.remove(row); 239 if (counter == null) { 240 continue; 241 } 242 Table table = null; 243 try { 244 table = handler.getTable(row.getTable()); 245 if (failures > 2) { 246 throw new IOException("Auto-Fail rest of ICVs"); 247 } 248 table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(), 249 counter); 250 } catch (IOException e) { 251 // log failure of increment 252 failures++; 253 LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", " 254 + Bytes.toStringBinary(row.getRowKey()) + ", " 255 + Bytes.toStringBinary(row.getFamily()) + ", " 256 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); 257 } finally{ 258 if(table != null){ 259 table.close(); 260 } 261 } 262 } 263 return failures; 264 }; 265 } 266 267 /** 268 * This method samples the incoming requests and, if selected, will check if 269 * the corePoolSize should be changed. 270 * @param countersMapSize 271 */ 272 private void dynamicallySetCoreSize(int countersMapSize) { 273 // Here we are using countersMapSize as a random number, meaning this 274 // could be a Random object 275 if (countersMapSize % 10 != 0) { 276 return; 277 } 278 double currentRatio = (double) countersMapSize / (double) maxQueueSize; 279 int newValue = 1; 280 if (currentRatio < 0.1) { 281 // it's 1 282 } else if (currentRatio < 0.3) { 283 newValue = 2; 284 } else if (currentRatio < 0.5) { 285 newValue = 4; 286 } else if (currentRatio < 0.7) { 287 newValue = 8; 288 } else if (currentRatio < 0.9) { 289 newValue = 14; 290 } else { 291 newValue = 22; 292 } 293 if (pool.getCorePoolSize() != newValue) { 294 pool.setCorePoolSize(newValue); 295 } 296 } 297 298 // MBean get/set methods 299 @Override 300 public int getQueueSize() { 301 return pool.getQueue().size(); 302 } 303 304 @Override 305 public int getMaxQueueSize() { 306 return this.maxQueueSize; 307 } 308 309 @Override 310 public void setMaxQueueSize(int newSize) { 311 this.maxQueueSize = newSize; 312 } 313 314 @Override 315 public long getPoolCompletedTaskCount() { 316 return pool.getCompletedTaskCount(); 317 } 318 319 @Override 320 public long getPoolTaskCount() { 321 return pool.getTaskCount(); 322 } 323 324 @Override 325 public int getPoolLargestPoolSize() { 326 return pool.getLargestPoolSize(); 327 } 328 329 @Override 330 public int getCorePoolSize() { 331 return pool.getCorePoolSize(); 332 } 333 334 @Override 335 public void setCorePoolSize(int newCoreSize) { 336 pool.setCorePoolSize(newCoreSize); 337 } 338 339 @Override 340 public int getMaxPoolSize() { 341 return pool.getMaximumPoolSize(); 342 } 343 344 @Override 345 public void setMaxPoolSize(int newMaxSize) { 346 pool.setMaximumPoolSize(newMaxSize); 347 } 348 349 @Override 350 public long getFailedIncrements() { 351 return failedIncrements.sum(); 352 } 353 354 @Override 355 public long getSuccessfulCoalescings() { 356 return successfulCoalescings.sum(); 357 } 358 359 @Override 360 public long getTotalIncrements() { 361 return totalIncrements.sum(); 362 } 363 364 @Override 365 public long getCountersMapSize() { 366 return countersMap.size(); 367 } 368}