001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.LongAdder; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler; 037import org.apache.hadoop.hbase.thrift.generated.TIncrement; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.Threads; 040import org.apache.hadoop.metrics2.util.MBeans; 041import org.apache.thrift.TException; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * This class will coalesce increments from a thift server if 048 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this 049 * config to true will cause the thrift server to queue increments into an 050 * instance of this class. The thread pool associated with this class will drain 051 * the coalesced increments as the thread is able. This can cause data loss if the 052 * thrift server dies or is shut down before everything in the queue is drained. 053 * 054 */ 055@InterfaceAudience.Private 056public class IncrementCoalescer implements IncrementCoalescerMBean { 057 058 /** 059 * Used to identify a cell that will be incremented. 060 * 061 */ 062 static class FullyQualifiedRow { 063 private byte[] table; 064 private byte[] rowKey; 065 private byte[] family; 066 private byte[] qualifier; 067 068 public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) { 069 super(); 070 this.table = table; 071 this.rowKey = rowKey; 072 this.family = fam; 073 this.qualifier = qual; 074 } 075 076 public byte[] getTable() { 077 return table; 078 } 079 080 public void setTable(byte[] table) { 081 this.table = table; 082 } 083 084 public byte[] getRowKey() { 085 return rowKey; 086 } 087 088 public void setRowKey(byte[] rowKey) { 089 this.rowKey = rowKey; 090 } 091 092 public byte[] getFamily() { 093 return family; 094 } 095 096 public void setFamily(byte[] fam) { 097 this.family = fam; 098 } 099 100 public byte[] getQualifier() { 101 return qualifier; 102 } 103 104 public void setQualifier(byte[] qual) { 105 this.qualifier = qual; 106 } 107 108 @Override 109 public int hashCode() { 110 final int prime = 31; 111 int result = 1; 112 result = prime * result + Arrays.hashCode(family); 113 result = prime * result + Arrays.hashCode(qualifier); 114 result = prime * result + Arrays.hashCode(rowKey); 115 result = prime * result + Arrays.hashCode(table); 116 return result; 117 } 118 119 @Override 120 public boolean equals(Object obj) { 121 if (this == obj) return true; 122 if (obj == null) return false; 123 if (getClass() != obj.getClass()) return false; 124 FullyQualifiedRow other = (FullyQualifiedRow) obj; 125 if (!Arrays.equals(family, other.family)) return false; 126 if (!Arrays.equals(qualifier, other.qualifier)) return false; 127 if (!Arrays.equals(rowKey, other.rowKey)) return false; 128 if (!Arrays.equals(table, other.table)) return false; 129 return true; 130 } 131 132 } 133 134 static class DaemonThreadFactory implements ThreadFactory { 135 static final AtomicInteger poolNumber = new AtomicInteger(1); 136 final ThreadGroup group; 137 final AtomicInteger threadNumber = new AtomicInteger(1); 138 final String namePrefix; 139 140 DaemonThreadFactory() { 141 SecurityManager s = System.getSecurityManager(); 142 group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); 143 namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-"; 144 } 145 146 @Override 147 public Thread newThread(Runnable r) { 148 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); 149 if (!t.isDaemon()) t.setDaemon(true); 150 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); 151 return t; 152 } 153 } 154 155 private final LongAdder failedIncrements = new LongAdder(); 156 private final LongAdder successfulCoalescings = new LongAdder(); 157 private final LongAdder totalIncrements = new LongAdder(); 158 private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = 159 new ConcurrentHashMap<>(100000, 0.75f, 1500); 160 private final ThreadPoolExecutor pool; 161 private final HBaseHandler handler; 162 163 private int maxQueueSize = 500000; 164 private static final int CORE_POOL_SIZE = 1; 165 166 private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class); 167 168 @SuppressWarnings("deprecation") 169 public IncrementCoalescer(HBaseHandler hand) { 170 this.handler = hand; 171 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 172 pool = 173 new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, 174 Threads.newDaemonThreadFactory("IncrementCoalescer")); 175 176 MBeans.register("thrift", "Thrift", this); 177 } 178 179 public boolean queueIncrement(TIncrement inc) throws TException { 180 if (!canQueue()) { 181 failedIncrements.increment(); 182 return false; 183 } 184 return internalQueueTincrement(inc); 185 } 186 187 public boolean queueIncrements(List<TIncrement> incs) throws TException { 188 if (!canQueue()) { 189 failedIncrements.increment(); 190 return false; 191 } 192 193 for (TIncrement tinc : incs) { 194 internalQueueTincrement(tinc); 195 } 196 return true; 197 198 } 199 200 private boolean internalQueueTincrement(TIncrement inc) throws TException { 201 byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn()); 202 if (famAndQf.length != 2) return false; 203 204 return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1], 205 inc.getAmmount()); 206 } 207 208 private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, 209 byte[] qual, long ammount) throws TException { 210 int countersMapSize = countersMap.size(); 211 212 213 //Make sure that the number of threads is scaled. 214 dynamicallySetCoreSize(countersMapSize); 215 216 totalIncrements.increment(); 217 218 FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); 219 220 long currentAmount = ammount; 221 // Spin until able to insert the value back without collisions 222 while (true) { 223 Long value = countersMap.remove(key); 224 if (value == null) { 225 // There was nothing there, create a new value 226 value = Long.valueOf(currentAmount); 227 } else { 228 value += currentAmount; 229 successfulCoalescings.increment(); 230 } 231 // Try to put the value, only if there was none 232 Long oldValue = countersMap.putIfAbsent(key, value); 233 if (oldValue == null) { 234 // We were able to put it in, we're done 235 break; 236 } 237 // Someone else was able to put a value in, so let's remember our 238 // current value (plus what we picked up) and retry to add it in 239 currentAmount = value; 240 } 241 242 // We limit the size of the queue simply because all we need is a 243 // notification that something needs to be incremented. No need 244 // for millions of callables that mean the same thing. 245 if (pool.getQueue().size() <= 1000) { 246 // queue it up 247 Callable<Integer> callable = createIncCallable(); 248 pool.submit(callable); 249 } 250 251 return true; 252 } 253 254 public boolean canQueue() { 255 return countersMap.size() < maxQueueSize; 256 } 257 258 private Callable<Integer> createIncCallable() { 259 return new Callable<Integer>() { 260 @Override 261 public Integer call() throws Exception { 262 int failures = 0; 263 Set<FullyQualifiedRow> keys = countersMap.keySet(); 264 for (FullyQualifiedRow row : keys) { 265 Long counter = countersMap.remove(row); 266 if (counter == null) { 267 continue; 268 } 269 Table table = null; 270 try { 271 table = handler.getTable(row.getTable()); 272 if (failures > 2) { 273 throw new IOException("Auto-Fail rest of ICVs"); 274 } 275 table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(), 276 counter); 277 } catch (IOException e) { 278 // log failure of increment 279 failures++; 280 LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", " 281 + Bytes.toStringBinary(row.getRowKey()) + ", " 282 + Bytes.toStringBinary(row.getFamily()) + ", " 283 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); 284 } finally{ 285 if(table != null){ 286 table.close(); 287 } 288 } 289 } 290 return failures; 291 } 292 }; 293 } 294 295 /** 296 * This method samples the incoming requests and, if selected, will check if 297 * the corePoolSize should be changed. 298 * @param countersMapSize 299 */ 300 private void dynamicallySetCoreSize(int countersMapSize) { 301 // Here we are using countersMapSize as a random number, meaning this 302 // could be a Random object 303 if (countersMapSize % 10 != 0) { 304 return; 305 } 306 double currentRatio = (double) countersMapSize / (double) maxQueueSize; 307 int newValue = 1; 308 if (currentRatio < 0.1) { 309 // it's 1 310 } else if (currentRatio < 0.3) { 311 newValue = 2; 312 } else if (currentRatio < 0.5) { 313 newValue = 4; 314 } else if (currentRatio < 0.7) { 315 newValue = 8; 316 } else if (currentRatio < 0.9) { 317 newValue = 14; 318 } else { 319 newValue = 22; 320 } 321 if (pool.getCorePoolSize() != newValue) { 322 pool.setCorePoolSize(newValue); 323 } 324 } 325 326 // MBean get/set methods 327 @Override 328 public int getQueueSize() { 329 return pool.getQueue().size(); 330 } 331 332 @Override 333 public int getMaxQueueSize() { 334 return this.maxQueueSize; 335 } 336 337 @Override 338 public void setMaxQueueSize(int newSize) { 339 this.maxQueueSize = newSize; 340 } 341 342 @Override 343 public long getPoolCompletedTaskCount() { 344 return pool.getCompletedTaskCount(); 345 } 346 347 @Override 348 public long getPoolTaskCount() { 349 return pool.getTaskCount(); 350 } 351 352 @Override 353 public int getPoolLargestPoolSize() { 354 return pool.getLargestPoolSize(); 355 } 356 357 @Override 358 public int getCorePoolSize() { 359 return pool.getCorePoolSize(); 360 } 361 362 @Override 363 public void setCorePoolSize(int newCoreSize) { 364 pool.setCorePoolSize(newCoreSize); 365 } 366 367 @Override 368 public int getMaxPoolSize() { 369 return pool.getMaximumPoolSize(); 370 } 371 372 @Override 373 public void setMaxPoolSize(int newMaxSize) { 374 pool.setMaximumPoolSize(newMaxSize); 375 } 376 377 @Override 378 public long getFailedIncrements() { 379 return failedIncrements.sum(); 380 } 381 382 @Override 383 public long getSuccessfulCoalescings() { 384 return successfulCoalescings.sum(); 385 } 386 387 @Override 388 public long getTotalIncrements() { 389 return totalIncrements.sum(); 390 } 391 392 @Override 393 public long getCountersMapSize() { 394 return countersMap.size(); 395 } 396 397}