001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.thrift; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.LongAdder; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler; 035import org.apache.hadoop.hbase.thrift.generated.TIncrement; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.hadoop.metrics2.util.MBeans; 039import org.apache.thrift.TException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * This class will coalesce increments from a thift server if 046 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this 047 * config to true will cause the thrift server to queue increments into an 048 * instance of this class. The thread pool associated with this class will drain 049 * the coalesced increments as the thread is able. This can cause data loss if the 050 * thrift server dies or is shut down before everything in the queue is drained. 051 * 052 */ 053@InterfaceAudience.Private 054public class IncrementCoalescer implements IncrementCoalescerMBean { 055 056 /** 057 * Used to identify a cell that will be incremented. 058 * 059 */ 060 static class FullyQualifiedRow { 061 private byte[] table; 062 private byte[] rowKey; 063 private byte[] family; 064 private byte[] qualifier; 065 066 public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) { 067 super(); 068 this.table = table; 069 this.rowKey = rowKey; 070 this.family = fam; 071 this.qualifier = qual; 072 } 073 074 public byte[] getTable() { 075 return table; 076 } 077 078 public void setTable(byte[] table) { 079 this.table = table; 080 } 081 082 public byte[] getRowKey() { 083 return rowKey; 084 } 085 086 public void setRowKey(byte[] rowKey) { 087 this.rowKey = rowKey; 088 } 089 090 public byte[] getFamily() { 091 return family; 092 } 093 094 public void setFamily(byte[] fam) { 095 this.family = fam; 096 } 097 098 public byte[] getQualifier() { 099 return qualifier; 100 } 101 102 public void setQualifier(byte[] qual) { 103 this.qualifier = qual; 104 } 105 106 @Override 107 public int hashCode() { 108 final int prime = 31; 109 int result = 1; 110 result = prime * result + Arrays.hashCode(family); 111 result = prime * result + Arrays.hashCode(qualifier); 112 result = prime * result + Arrays.hashCode(rowKey); 113 result = prime * result + Arrays.hashCode(table); 114 return result; 115 } 116 117 @Override 118 public boolean equals(Object obj) { 119 if (this == obj) return true; 120 if (obj == null) return false; 121 if (getClass() != obj.getClass()) return false; 122 FullyQualifiedRow other = (FullyQualifiedRow) obj; 123 if (!Arrays.equals(family, other.family)) return false; 124 if (!Arrays.equals(qualifier, other.qualifier)) return false; 125 if (!Arrays.equals(rowKey, other.rowKey)) return false; 126 if (!Arrays.equals(table, other.table)) return false; 127 return true; 128 } 129 130 } 131 132 private final LongAdder failedIncrements = new LongAdder(); 133 private final LongAdder successfulCoalescings = new LongAdder(); 134 private final LongAdder totalIncrements = new LongAdder(); 135 private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = 136 new ConcurrentHashMap<>(100000, 0.75f, 1500); 137 private final ThreadPoolExecutor pool; 138 private final HBaseHandler handler; 139 140 private int maxQueueSize = 500000; 141 private static final int CORE_POOL_SIZE = 1; 142 143 private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class); 144 145 @SuppressWarnings("deprecation") 146 public IncrementCoalescer(HBaseHandler hand) { 147 this.handler = hand; 148 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 149 pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, 150 TimeUnit.MILLISECONDS, queue, 151 Threads.newDaemonThreadFactory("IncrementCoalescer")); 152 MBeans.register("thrift", "Thrift", this); 153 } 154 155 public boolean queueIncrement(TIncrement inc) throws TException { 156 if (!canQueue()) { 157 failedIncrements.increment(); 158 return false; 159 } 160 return internalQueueTincrement(inc); 161 } 162 163 public boolean queueIncrements(List<TIncrement> incs) throws TException { 164 if (!canQueue()) { 165 failedIncrements.increment(); 166 return false; 167 } 168 169 for (TIncrement tinc : incs) { 170 internalQueueTincrement(tinc); 171 } 172 return true; 173 174 } 175 176 private boolean internalQueueTincrement(TIncrement inc) throws TException { 177 byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn()); 178 if (famAndQf.length != 2) return false; 179 180 return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1], 181 inc.getAmmount()); 182 } 183 184 private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, 185 byte[] qual, long ammount) throws TException { 186 int countersMapSize = countersMap.size(); 187 188 189 //Make sure that the number of threads is scaled. 190 dynamicallySetCoreSize(countersMapSize); 191 192 totalIncrements.increment(); 193 194 FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); 195 196 long currentAmount = ammount; 197 // Spin until able to insert the value back without collisions 198 while (true) { 199 Long value = countersMap.remove(key); 200 if (value == null) { 201 // There was nothing there, create a new value 202 value = Long.valueOf(currentAmount); 203 } else { 204 value += currentAmount; 205 successfulCoalescings.increment(); 206 } 207 // Try to put the value, only if there was none 208 Long oldValue = countersMap.putIfAbsent(key, value); 209 if (oldValue == null) { 210 // We were able to put it in, we're done 211 break; 212 } 213 // Someone else was able to put a value in, so let's remember our 214 // current value (plus what we picked up) and retry to add it in 215 currentAmount = value; 216 } 217 218 // We limit the size of the queue simply because all we need is a 219 // notification that something needs to be incremented. No need 220 // for millions of callables that mean the same thing. 221 if (pool.getQueue().size() <= 1000) { 222 // queue it up 223 Callable<Integer> callable = createIncCallable(); 224 pool.submit(callable); 225 } 226 227 return true; 228 } 229 230 public boolean canQueue() { 231 return countersMap.size() < maxQueueSize; 232 } 233 234 private Callable<Integer> createIncCallable() { 235 return new Callable<Integer>() { 236 @Override 237 public Integer call() throws Exception { 238 int failures = 0; 239 Set<FullyQualifiedRow> keys = countersMap.keySet(); 240 for (FullyQualifiedRow row : keys) { 241 Long counter = countersMap.remove(row); 242 if (counter == null) { 243 continue; 244 } 245 Table table = null; 246 try { 247 table = handler.getTable(row.getTable()); 248 if (failures > 2) { 249 throw new IOException("Auto-Fail rest of ICVs"); 250 } 251 table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(), 252 counter); 253 } catch (IOException e) { 254 // log failure of increment 255 failures++; 256 LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", " 257 + Bytes.toStringBinary(row.getRowKey()) + ", " 258 + Bytes.toStringBinary(row.getFamily()) + ", " 259 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); 260 } finally{ 261 if(table != null){ 262 table.close(); 263 } 264 } 265 } 266 return failures; 267 } 268 }; 269 } 270 271 /** 272 * This method samples the incoming requests and, if selected, will check if 273 * the corePoolSize should be changed. 274 * @param countersMapSize 275 */ 276 private void dynamicallySetCoreSize(int countersMapSize) { 277 // Here we are using countersMapSize as a random number, meaning this 278 // could be a Random object 279 if (countersMapSize % 10 != 0) { 280 return; 281 } 282 double currentRatio = (double) countersMapSize / (double) maxQueueSize; 283 int newValue = 1; 284 if (currentRatio < 0.1) { 285 // it's 1 286 } else if (currentRatio < 0.3) { 287 newValue = 2; 288 } else if (currentRatio < 0.5) { 289 newValue = 4; 290 } else if (currentRatio < 0.7) { 291 newValue = 8; 292 } else if (currentRatio < 0.9) { 293 newValue = 14; 294 } else { 295 newValue = 22; 296 } 297 if (pool.getCorePoolSize() != newValue) { 298 pool.setCorePoolSize(newValue); 299 } 300 } 301 302 // MBean get/set methods 303 @Override 304 public int getQueueSize() { 305 return pool.getQueue().size(); 306 } 307 308 @Override 309 public int getMaxQueueSize() { 310 return this.maxQueueSize; 311 } 312 313 @Override 314 public void setMaxQueueSize(int newSize) { 315 this.maxQueueSize = newSize; 316 } 317 318 @Override 319 public long getPoolCompletedTaskCount() { 320 return pool.getCompletedTaskCount(); 321 } 322 323 @Override 324 public long getPoolTaskCount() { 325 return pool.getTaskCount(); 326 } 327 328 @Override 329 public int getPoolLargestPoolSize() { 330 return pool.getLargestPoolSize(); 331 } 332 333 @Override 334 public int getCorePoolSize() { 335 return pool.getCorePoolSize(); 336 } 337 338 @Override 339 public void setCorePoolSize(int newCoreSize) { 340 pool.setCorePoolSize(newCoreSize); 341 } 342 343 @Override 344 public int getMaxPoolSize() { 345 return pool.getMaximumPoolSize(); 346 } 347 348 @Override 349 public void setMaxPoolSize(int newMaxSize) { 350 pool.setMaximumPoolSize(newMaxSize); 351 } 352 353 @Override 354 public long getFailedIncrements() { 355 return failedIncrements.sum(); 356 } 357 358 @Override 359 public long getSuccessfulCoalescings() { 360 return successfulCoalescings.sum(); 361 } 362 363 @Override 364 public long getTotalIncrements() { 365 return totalIncrements.sum(); 366 } 367 368 @Override 369 public long getCountersMapSize() { 370 return countersMap.size(); 371 } 372 373}