001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; 021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Arrays; 027import java.util.HashSet; 028import java.util.Map; 029import java.util.Random; 030import java.util.Set; 031import java.util.concurrent.ThreadLocalRandom; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Append; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Increment; 041import org.apache.hadoop.hbase.client.Mutation; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 047import org.apache.hadoop.util.StringUtils; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 053 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 055 056/** Creates multiple threads that write key/values into the */ 057@InterfaceAudience.Private 058public class MultiThreadedUpdater extends MultiThreadedWriterBase { 059 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class); 060 061 protected Set<HBaseUpdaterThread> updaters = new HashSet<>(); 062 063 private MultiThreadedWriterBase writer = null; 064 private boolean isBatchUpdate = false; 065 private boolean ignoreNonceConflicts = false; 066 private final double updatePercent; 067 068 public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf, 069 TableName tableName, double updatePercent) throws IOException { 070 super(dataGen, conf, tableName, "U"); 071 this.updatePercent = updatePercent; 072 } 073 074 /** Use batch vs. separate updates for every column in a row */ 075 public void setBatchUpdate(boolean isBatchUpdate) { 076 this.isBatchUpdate = isBatchUpdate; 077 } 078 079 public synchronized void linkToWriter(MultiThreadedWriterBase writer) { 080 this.writer = writer; 081 writer.setTrackWroteKeys(true); 082 } 083 084 @Override 085 public void start(long startKey, long endKey, int numThreads) throws IOException { 086 super.start(startKey, endKey, numThreads); 087 088 if (verbose) { 089 LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); 090 } 091 092 addUpdaterThreads(numThreads); 093 094 startThreads(updaters); 095 } 096 097 protected void addUpdaterThreads(int numThreads) throws IOException { 098 for (int i = 0; i < numThreads; ++i) { 099 HBaseUpdaterThread updater = new HBaseUpdaterThread(i); 100 updaters.add(updater); 101 } 102 } 103 104 private long getNextKeyToUpdate() { 105 if (writer == null) { 106 return nextKeyToWrite.getAndIncrement(); 107 } 108 synchronized (this) { 109 if (nextKeyToWrite.get() >= endKey) { 110 // Finished the whole key range 111 return endKey; 112 } 113 while (nextKeyToWrite.get() > writer.wroteUpToKey()) { 114 Threads.sleepWithoutInterrupt(100); 115 } 116 long k = nextKeyToWrite.getAndIncrement(); 117 if (writer.failedToWriteKey(k)) { 118 failedKeySet.add(k); 119 return getNextKeyToUpdate(); 120 } 121 return k; 122 } 123 } 124 125 protected class HBaseUpdaterThread extends Thread { 126 protected final Table table; 127 128 public HBaseUpdaterThread(int updaterId) throws IOException { 129 setName(getClass().getSimpleName() + "_" + updaterId); 130 table = createTable(); 131 } 132 133 protected Table createTable() throws IOException { 134 return connection.getTable(tableName); 135 } 136 137 @Override 138 public void run() { 139 try { 140 Random rand = ThreadLocalRandom.current(); 141 long rowKeyBase; 142 StringBuilder buf = new StringBuilder(); 143 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 144 while ((rowKeyBase = getNextKeyToUpdate()) < endKey) { 145 if (rand.nextInt(100) < updatePercent) { 146 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); 147 Increment inc = new Increment(rowKey); 148 Append app = new Append(rowKey); 149 numKeys.addAndGet(1); 150 int columnCount = 0; 151 for (byte[] cf : columnFamilies) { 152 long cfHash = Arrays.hashCode(cf); 153 inc.addColumn(cf, INCREMENT, cfHash); 154 buf.setLength(0); // Clear the buffer 155 buf.append("#").append(Bytes.toString(INCREMENT)); 156 buf.append(":").append(MutationType.INCREMENT.getNumber()); 157 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); 158 ++columnCount; 159 if (!isBatchUpdate) { 160 mutate(table, inc, rowKeyBase); 161 numCols.addAndGet(1); 162 inc = new Increment(rowKey); 163 mutate(table, app, rowKeyBase); 164 numCols.addAndGet(1); 165 app = new Append(rowKey); 166 } 167 Get get = new Get(rowKey); 168 get.addFamily(cf); 169 try { 170 get = dataGenerator.beforeGet(rowKeyBase, get); 171 } catch (Exception e) { 172 // Ideally wont happen 173 LOG.warn("Failed to modify the get from the load generator = [" 174 + Bytes.toString(get.getRow()) + "], column family = [" + Bytes.toString(cf) 175 + "]", e); 176 } 177 Result result = getRow(get, rowKeyBase, cf); 178 Map<byte[], byte[]> columnValues = result != null ? result.getFamilyMap(cf) : null; 179 if (columnValues == null) { 180 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]); 181 if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) { 182 LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey)); 183 } else { 184 failedKeySet.add(rowKeyBase); 185 LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey) 186 + "], since we could not get the original row"); 187 } 188 } 189 if (columnValues != null) { 190 for (byte[] column : columnValues.keySet()) { 191 if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) { 192 continue; 193 } 194 MutationType mt = 195 MutationType.values()[rand.nextInt(MutationType.values().length)]; 196 long columnHash = Arrays.hashCode(column); 197 long hashCode = cfHash + columnHash; 198 byte[] hashCodeBytes = Bytes.toBytes(hashCode); 199 byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; 200 if (hashCode % 2 == 0) { 201 Cell kv = result.getColumnLatestCell(cf, column); 202 checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; 203 Preconditions.checkNotNull(checkedValue, 204 "Column value to be checked should not be null"); 205 } 206 buf.setLength(0); // Clear the buffer 207 buf.append("#").append(Bytes.toString(column)).append(":"); 208 ++columnCount; 209 switch (mt) { 210 case PUT: 211 Put put = new Put(rowKey); 212 put.addColumn(cf, column, hashCodeBytes); 213 mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); 214 buf.append(MutationType.PUT.getNumber()); 215 break; 216 case DELETE: 217 Delete delete = new Delete(rowKey); 218 // Delete all versions since a put 219 // could be called multiple times if CM is used 220 delete.addColumns(cf, column); 221 mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); 222 buf.append(MutationType.DELETE.getNumber()); 223 break; 224 default: 225 buf.append(MutationType.APPEND.getNumber()); 226 app.addColumn(cf, column, hashCodeBytes); 227 } 228 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); 229 if (!isBatchUpdate) { 230 mutate(table, app, rowKeyBase); 231 numCols.addAndGet(1); 232 app = new Append(rowKey); 233 } 234 } 235 } 236 } 237 if (isBatchUpdate) { 238 if (verbose) { 239 LOG.debug("Preparing increment and append for key = [" + Bytes.toString(rowKey) 240 + "], " + columnCount + " columns"); 241 } 242 mutate(table, inc, rowKeyBase); 243 mutate(table, app, rowKeyBase); 244 numCols.addAndGet(columnCount); 245 } 246 } 247 if (trackWroteKeys) { 248 wroteKeys.add(rowKeyBase); 249 } 250 } 251 } finally { 252 closeHTable(); 253 numThreadsWorking.decrementAndGet(); 254 } 255 } 256 257 protected void closeHTable() { 258 try { 259 if (table != null) { 260 table.close(); 261 } 262 } catch (IOException e) { 263 LOG.error("Error closing table", e); 264 } 265 } 266 267 protected Result getRow(Get get, long rowKeyBase, byte[] cf) { 268 Result result = null; 269 try { 270 result = table.get(get); 271 } catch (IOException ie) { 272 LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) 273 + "], column family = [" + Bytes.toString(cf) + "]", ie); 274 } 275 return result; 276 } 277 278 public void mutate(Table table, Mutation m, long keyBase) { 279 mutate(table, m, keyBase, null, null, null, null); 280 } 281 282 public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q, 283 byte[] v) { 284 long start = EnvironmentEdgeManager.currentTime(); 285 try { 286 m = dataGenerator.beforeMutate(keyBase, m); 287 if (m instanceof Increment) { 288 table.increment((Increment) m); 289 } else if (m instanceof Append) { 290 table.append((Append) m); 291 } else if (m instanceof Put) { 292 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m); 293 } else if (m instanceof Delete) { 294 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m); 295 } else { 296 throw new IllegalArgumentException( 297 "unsupported mutation " + m.getClass().getSimpleName()); 298 } 299 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 300 } catch (IOException e) { 301 if (ignoreNonceConflicts) { 302 LOG.info("Detected nonce conflict, ignoring: " + e.getMessage()); 303 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 304 return; 305 } 306 failedKeySet.add(keyBase); 307 String exceptionInfo; 308 if (e instanceof RetriesExhaustedWithDetailsException) { 309 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 310 exceptionInfo = aggEx.getExhaustiveDescription(); 311 } else { 312 exceptionInfo = StringUtils.stringifyException(e); 313 } 314 LOG.error("Failed to mutate: " + keyBase + " after " 315 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 316 + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); 317 } 318 } 319 } 320 321 @Override 322 public void waitForFinish() { 323 super.waitForFinish(); 324 System.out.println("Failed to update keys: " + failedKeySet.size()); 325 for (Long key : failedKeySet) { 326 System.out.println("Failed to update key: " + key); 327 } 328 } 329 330 public void mutate(Table table, Mutation m, long keyBase) { 331 mutate(table, m, keyBase, null, null, null, null); 332 } 333 334 public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q, 335 byte[] v) { 336 long start = EnvironmentEdgeManager.currentTime(); 337 try { 338 m = dataGenerator.beforeMutate(keyBase, m); 339 if (m instanceof Increment) { 340 table.increment((Increment) m); 341 } else if (m instanceof Append) { 342 table.append((Append) m); 343 } else if (m instanceof Put) { 344 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m); 345 } else if (m instanceof Delete) { 346 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m); 347 } else { 348 throw new IllegalArgumentException("unsupported mutation " + m.getClass().getSimpleName()); 349 } 350 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 351 } catch (IOException e) { 352 failedKeySet.add(keyBase); 353 String exceptionInfo; 354 if (e instanceof RetriesExhaustedWithDetailsException) { 355 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 356 exceptionInfo = aggEx.getExhaustiveDescription(); 357 } else { 358 StringWriter stackWriter = new StringWriter(); 359 PrintWriter pw = new PrintWriter(stackWriter); 360 e.printStackTrace(pw); 361 pw.flush(); 362 exceptionInfo = StringUtils.stringifyException(e); 363 } 364 LOG.error("Failed to mutate: " + keyBase + " after " 365 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 366 + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); 367 } 368 } 369 370 public void setIgnoreNonceConflicts(boolean value) { 371 this.ignoreNonceConflicts = value; 372 } 373}