001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.thrift; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.LongAdder; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.thrift.generated.TIncrement; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.hadoop.metrics2.util.MBeans; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * This class will coalesce increments from a thift server if 043 * hbase.regionserver.thrift.coalesceIncrement is set to true. Turning this 044 * config to true will cause the thrift server to queue increments into an 045 * instance of this class. The thread pool associated with this class will drain 046 * the coalesced increments as the thread is able. This can cause data loss if the 047 * thrift server dies or is shut down before everything in the queue is drained. 048 * 049 */ 050@InterfaceAudience.Private 051public class IncrementCoalescer implements IncrementCoalescerMBean { 052 /** 053 * Used to identify a cell that will be incremented. 054 * 055 */ 056 static class FullyQualifiedRow { 057 private byte[] table; 058 private byte[] rowKey; 059 private byte[] family; 060 private byte[] qualifier; 061 062 public FullyQualifiedRow(byte[] table, byte[] rowKey, byte[] fam, byte[] qual) { 063 super(); 064 this.table = table; 065 this.rowKey = rowKey; 066 this.family = fam; 067 this.qualifier = qual; 068 } 069 070 public byte[] getTable() { 071 return table; 072 } 073 074 public void setTable(byte[] table) { 075 this.table = table; 076 } 077 078 public byte[] getRowKey() { 079 return rowKey; 080 } 081 082 public byte[] getFamily() { 083 return family; 084 } 085 086 public void setFamily(byte[] fam) { 087 this.family = fam; 088 } 089 090 public byte[] getQualifier() { 091 return qualifier; 092 } 093 094 public void setQualifier(byte[] qual) { 095 this.qualifier = qual; 096 } 097 098 @Override 099 public int hashCode() { 100 final int prime = 31; 101 int result = 1; 102 result = prime * result + Arrays.hashCode(family); 103 result = prime * result + Arrays.hashCode(qualifier); 104 result = prime * result + Arrays.hashCode(rowKey); 105 result = prime * result + Arrays.hashCode(table); 106 return result; 107 } 108 109 @Override 110 public boolean equals(Object obj) { 111 if (this == obj) return true; 112 if (obj == null) return false; 113 if (getClass() != obj.getClass()) return false; 114 FullyQualifiedRow other = (FullyQualifiedRow) obj; 115 116 if (!Arrays.equals(family, other.family)) { 117 return false; 118 } 119 if (!Arrays.equals(qualifier, other.qualifier)) { 120 return false; 121 } 122 if (!Arrays.equals(rowKey, other.rowKey)) { 123 return false; 124 } 125 126 return Arrays.equals(table, other.table); 127 } 128 } 129 130 private final LongAdder failedIncrements = new LongAdder(); 131 private final LongAdder successfulCoalescings = new LongAdder(); 132 private final LongAdder totalIncrements = new LongAdder(); 133 private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = 134 new ConcurrentHashMap<>(100000, 0.75f, 1500); 135 private final ThreadPoolExecutor pool; 136 private final ThriftHBaseServiceHandler handler; 137 138 private int maxQueueSize = 500000; 139 private static final int CORE_POOL_SIZE = 1; 140 141 private static final Logger LOG = LoggerFactory.getLogger(IncrementCoalescer.class); 142 143 public IncrementCoalescer(ThriftHBaseServiceHandler hand) { 144 this.handler = hand; 145 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); 146 pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, 147 TimeUnit.MILLISECONDS, queue, 148 Threads.newDaemonThreadFactory("IncrementCoalescer")); 149 MBeans.register("thrift", "Thrift", this); 150 } 151 152 public boolean queueIncrement(TIncrement inc) { 153 if (!canQueue()) { 154 failedIncrements.increment(); 155 return false; 156 } 157 return internalQueueTincrement(inc); 158 } 159 160 public boolean queueIncrements(List<TIncrement> incs) { 161 if (!canQueue()) { 162 failedIncrements.increment(); 163 return false; 164 } 165 166 for (TIncrement tinc : incs) { 167 internalQueueTincrement(tinc); 168 } 169 return true; 170 171 } 172 173 private boolean internalQueueTincrement(TIncrement inc) { 174 byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn()); 175 if (famAndQf.length != 2) return false; 176 177 return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1], 178 inc.getAmmount()); 179 } 180 181 @SuppressWarnings("FutureReturnValueIgnored") 182 private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, 183 byte[] qual, long ammount) { 184 int countersMapSize = countersMap.size(); 185 186 187 //Make sure that the number of threads is scaled. 188 dynamicallySetCoreSize(countersMapSize); 189 190 totalIncrements.increment(); 191 192 FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); 193 194 long currentAmount = ammount; 195 // Spin until able to insert the value back without collisions 196 while (true) { 197 Long value = countersMap.remove(key); 198 if (value == null) { 199 // There was nothing there, create a new value 200 value = currentAmount; 201 } else { 202 value += currentAmount; 203 successfulCoalescings.increment(); 204 } 205 // Try to put the value, only if there was none 206 Long oldValue = countersMap.putIfAbsent(key, value); 207 if (oldValue == null) { 208 // We were able to put it in, we're done 209 break; 210 } 211 // Someone else was able to put a value in, so let's remember our 212 // current value (plus what we picked up) and retry to add it in 213 currentAmount = value; 214 } 215 216 // We limit the size of the queue simply because all we need is a 217 // notification that something needs to be incremented. No need 218 // for millions of callables that mean the same thing. 219 if (pool.getQueue().size() <= 1000) { 220 // queue it up 221 Callable<Integer> callable = createIncCallable(); 222 pool.submit(callable); 223 } 224 225 return true; 226 } 227 228 public boolean canQueue() { 229 return countersMap.size() < maxQueueSize; 230 } 231 232 private Callable<Integer> createIncCallable() { 233 return () -> { 234 int failures = 0; 235 Set<FullyQualifiedRow> keys = countersMap.keySet(); 236 for (FullyQualifiedRow row : keys) { 237 Long counter = countersMap.remove(row); 238 if (counter == null) { 239 continue; 240 } 241 Table table = null; 242 try { 243 table = handler.getTable(row.getTable()); 244 if (failures > 2) { 245 throw new IOException("Auto-Fail rest of ICVs"); 246 } 247 table.incrementColumnValue(row.getRowKey(), row.getFamily(), row.getQualifier(), 248 counter); 249 } catch (IOException e) { 250 // log failure of increment 251 failures++; 252 LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", " 253 + Bytes.toStringBinary(row.getRowKey()) + ", " 254 + Bytes.toStringBinary(row.getFamily()) + ", " 255 + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); 256 } finally{ 257 if(table != null){ 258 table.close(); 259 } 260 } 261 } 262 return failures; 263 }; 264 } 265 266 /** 267 * This method samples the incoming requests and, if selected, will check if 268 * the corePoolSize should be changed. 269 * @param countersMapSize 270 */ 271 private void dynamicallySetCoreSize(int countersMapSize) { 272 // Here we are using countersMapSize as a random number, meaning this 273 // could be a Random object 274 if (countersMapSize % 10 != 0) { 275 return; 276 } 277 double currentRatio = (double) countersMapSize / (double) maxQueueSize; 278 int newValue = 1; 279 if (currentRatio < 0.1) { 280 // it's 1 281 } else if (currentRatio < 0.3) { 282 newValue = 2; 283 } else if (currentRatio < 0.5) { 284 newValue = 4; 285 } else if (currentRatio < 0.7) { 286 newValue = 8; 287 } else if (currentRatio < 0.9) { 288 newValue = 14; 289 } else { 290 newValue = 22; 291 } 292 if (pool.getCorePoolSize() != newValue) { 293 pool.setCorePoolSize(newValue); 294 } 295 } 296 297 // MBean get/set methods 298 @Override 299 public int getQueueSize() { 300 return pool.getQueue().size(); 301 } 302 303 @Override 304 public int getMaxQueueSize() { 305 return this.maxQueueSize; 306 } 307 308 @Override 309 public void setMaxQueueSize(int newSize) { 310 this.maxQueueSize = newSize; 311 } 312 313 @Override 314 public long getPoolCompletedTaskCount() { 315 return pool.getCompletedTaskCount(); 316 } 317 318 @Override 319 public long getPoolTaskCount() { 320 return pool.getTaskCount(); 321 } 322 323 @Override 324 public int getPoolLargestPoolSize() { 325 return pool.getLargestPoolSize(); 326 } 327 328 @Override 329 public int getCorePoolSize() { 330 return pool.getCorePoolSize(); 331 } 332 333 @Override 334 public void setCorePoolSize(int newCoreSize) { 335 pool.setCorePoolSize(newCoreSize); 336 } 337 338 @Override 339 public int getMaxPoolSize() { 340 return pool.getMaximumPoolSize(); 341 } 342 343 @Override 344 public void setMaxPoolSize(int newMaxSize) { 345 pool.setMaximumPoolSize(newMaxSize); 346 } 347 348 @Override 349 public long getFailedIncrements() { 350 return failedIncrements.sum(); 351 } 352 353 @Override 354 public long getSuccessfulCoalescings() { 355 return successfulCoalescings.sum(); 356 } 357 358 @Override 359 public long getTotalIncrements() { 360 return totalIncrements.sum(); 361 } 362 363 @Override 364 public long getCountersMapSize() { 365 return countersMap.size(); 366 } 367}