001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; 021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; 022 023import java.io.IOException; 024import java.io.PrintWriter; 025import java.io.StringWriter; 026import java.util.Arrays; 027import java.util.HashSet; 028import java.util.Map; 029import java.util.Random; 030import java.util.Set; 031import java.util.concurrent.ThreadLocalRandom; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Append; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.Increment; 041import org.apache.hadoop.hbase.client.Mutation; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.Result; 044import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 045import org.apache.hadoop.hbase.client.Table; 046import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 047import org.apache.hadoop.util.StringUtils; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 054 055/** Creates multiple threads that write key/values into the */ 056public class MultiThreadedUpdater extends MultiThreadedWriterBase { 057 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class); 058 059 protected Set<HBaseUpdaterThread> updaters = new HashSet<>(); 060 061 private MultiThreadedWriterBase writer = null; 062 private boolean isBatchUpdate = false; 063 private boolean ignoreNonceConflicts = false; 064 private final double updatePercent; 065 066 public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf, 067 TableName tableName, double updatePercent) throws IOException { 068 super(dataGen, conf, tableName, "U"); 069 this.updatePercent = updatePercent; 070 } 071 072 /** Use batch vs. separate updates for every column in a row */ 073 public void setBatchUpdate(boolean isBatchUpdate) { 074 this.isBatchUpdate = isBatchUpdate; 075 } 076 077 public void linkToWriter(MultiThreadedWriterBase writer) { 078 this.writer = writer; 079 writer.setTrackWroteKeys(true); 080 } 081 082 @Override 083 public void start(long startKey, long endKey, int numThreads) throws IOException { 084 super.start(startKey, endKey, numThreads); 085 086 if (verbose) { 087 LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); 088 } 089 090 addUpdaterThreads(numThreads); 091 092 startThreads(updaters); 093 } 094 095 protected void addUpdaterThreads(int numThreads) throws IOException { 096 for (int i = 0; i < numThreads; ++i) { 097 HBaseUpdaterThread updater = new HBaseUpdaterThread(i); 098 updaters.add(updater); 099 } 100 } 101 102 private long getNextKeyToUpdate() { 103 if (writer == null) { 104 return nextKeyToWrite.getAndIncrement(); 105 } 106 synchronized (this) { 107 if (nextKeyToWrite.get() >= endKey) { 108 // Finished the whole key range 109 return endKey; 110 } 111 while (nextKeyToWrite.get() > writer.wroteUpToKey()) { 112 Threads.sleepWithoutInterrupt(100); 113 } 114 long k = nextKeyToWrite.getAndIncrement(); 115 if (writer.failedToWriteKey(k)) { 116 failedKeySet.add(k); 117 return getNextKeyToUpdate(); 118 } 119 return k; 120 } 121 } 122 123 protected class HBaseUpdaterThread extends Thread { 124 protected final Table table; 125 126 public HBaseUpdaterThread(int updaterId) throws IOException { 127 setName(getClass().getSimpleName() + "_" + updaterId); 128 table = createTable(); 129 } 130 131 protected Table createTable() throws IOException { 132 return connection.getTable(tableName); 133 } 134 135 @Override 136 public void run() { 137 try { 138 Random rand = ThreadLocalRandom.current(); 139 long rowKeyBase; 140 StringBuilder buf = new StringBuilder(); 141 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 142 while ((rowKeyBase = getNextKeyToUpdate()) < endKey) { 143 if (rand.nextInt(100) < updatePercent) { 144 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); 145 Increment inc = new Increment(rowKey); 146 Append app = new Append(rowKey); 147 numKeys.addAndGet(1); 148 int columnCount = 0; 149 for (byte[] cf : columnFamilies) { 150 long cfHash = Arrays.hashCode(cf); 151 inc.addColumn(cf, INCREMENT, cfHash); 152 buf.setLength(0); // Clear the buffer 153 buf.append("#").append(Bytes.toString(INCREMENT)); 154 buf.append(":").append(MutationType.INCREMENT.getNumber()); 155 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); 156 ++columnCount; 157 if (!isBatchUpdate) { 158 mutate(table, inc, rowKeyBase); 159 numCols.addAndGet(1); 160 inc = new Increment(rowKey); 161 mutate(table, app, rowKeyBase); 162 numCols.addAndGet(1); 163 app = new Append(rowKey); 164 } 165 Get get = new Get(rowKey); 166 get.addFamily(cf); 167 try { 168 get = dataGenerator.beforeGet(rowKeyBase, get); 169 } catch (Exception e) { 170 // Ideally wont happen 171 LOG.warn("Failed to modify the get from the load generator = [" 172 + Bytes.toString(get.getRow()) + "], column family = [" + Bytes.toString(cf) 173 + "]", e); 174 } 175 Result result = getRow(get, rowKeyBase, cf); 176 Map<byte[], byte[]> columnValues = result != null ? result.getFamilyMap(cf) : null; 177 if (columnValues == null) { 178 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]); 179 if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) { 180 LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey)); 181 } else { 182 failedKeySet.add(rowKeyBase); 183 LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey) 184 + "], since we could not get the original row"); 185 } 186 } 187 if (columnValues != null) { 188 for (byte[] column : columnValues.keySet()) { 189 if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) { 190 continue; 191 } 192 MutationType mt = 193 MutationType.values()[rand.nextInt(MutationType.values().length)]; 194 long columnHash = Arrays.hashCode(column); 195 long hashCode = cfHash + columnHash; 196 byte[] hashCodeBytes = Bytes.toBytes(hashCode); 197 byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; 198 if (hashCode % 2 == 0) { 199 Cell kv = result.getColumnLatestCell(cf, column); 200 checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; 201 Preconditions.checkNotNull(checkedValue, 202 "Column value to be checked should not be null"); 203 } 204 buf.setLength(0); // Clear the buffer 205 buf.append("#").append(Bytes.toString(column)).append(":"); 206 ++columnCount; 207 switch (mt) { 208 case PUT: 209 Put put = new Put(rowKey); 210 put.addColumn(cf, column, hashCodeBytes); 211 mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); 212 buf.append(MutationType.PUT.getNumber()); 213 break; 214 case DELETE: 215 Delete delete = new Delete(rowKey); 216 // Delete all versions since a put 217 // could be called multiple times if CM is used 218 delete.addColumns(cf, column); 219 mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); 220 buf.append(MutationType.DELETE.getNumber()); 221 break; 222 default: 223 buf.append(MutationType.APPEND.getNumber()); 224 app.addColumn(cf, column, hashCodeBytes); 225 } 226 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); 227 if (!isBatchUpdate) { 228 mutate(table, app, rowKeyBase); 229 numCols.addAndGet(1); 230 app = new Append(rowKey); 231 } 232 } 233 } 234 } 235 if (isBatchUpdate) { 236 if (verbose) { 237 LOG.debug("Preparing increment and append for key = [" + Bytes.toString(rowKey) 238 + "], " + columnCount + " columns"); 239 } 240 mutate(table, inc, rowKeyBase); 241 mutate(table, app, rowKeyBase); 242 numCols.addAndGet(columnCount); 243 } 244 } 245 if (trackWroteKeys) { 246 wroteKeys.add(rowKeyBase); 247 } 248 } 249 } finally { 250 closeHTable(); 251 numThreadsWorking.decrementAndGet(); 252 } 253 } 254 255 protected void closeHTable() { 256 try { 257 if (table != null) { 258 table.close(); 259 } 260 } catch (IOException e) { 261 LOG.error("Error closing table", e); 262 } 263 } 264 265 protected Result getRow(Get get, long rowKeyBase, byte[] cf) { 266 Result result = null; 267 try { 268 result = table.get(get); 269 } catch (IOException ie) { 270 LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow()) 271 + "], column family = [" + Bytes.toString(cf) + "]", ie); 272 } 273 return result; 274 } 275 276 public void mutate(Table table, Mutation m, long keyBase) { 277 mutate(table, m, keyBase, null, null, null, null); 278 } 279 280 public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q, 281 byte[] v) { 282 long start = EnvironmentEdgeManager.currentTime(); 283 try { 284 m = dataGenerator.beforeMutate(keyBase, m); 285 if (m instanceof Increment) { 286 table.increment((Increment) m); 287 } else if (m instanceof Append) { 288 table.append((Append) m); 289 } else if (m instanceof Put) { 290 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m); 291 } else if (m instanceof Delete) { 292 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m); 293 } else { 294 throw new IllegalArgumentException( 295 "unsupported mutation " + m.getClass().getSimpleName()); 296 } 297 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 298 } catch (IOException e) { 299 if (ignoreNonceConflicts) { 300 LOG.info("Detected nonce conflict, ignoring: " + e.getMessage()); 301 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 302 return; 303 } 304 failedKeySet.add(keyBase); 305 String exceptionInfo; 306 if (e instanceof RetriesExhaustedWithDetailsException) { 307 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 308 exceptionInfo = aggEx.getExhaustiveDescription(); 309 } else { 310 exceptionInfo = StringUtils.stringifyException(e); 311 } 312 LOG.error("Failed to mutate: " + keyBase + " after " 313 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 314 + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); 315 } 316 } 317 } 318 319 @Override 320 public void waitForFinish() { 321 super.waitForFinish(); 322 System.out.println("Failed to update keys: " + failedKeySet.size()); 323 for (Long key : failedKeySet) { 324 System.out.println("Failed to update key: " + key); 325 } 326 } 327 328 public void mutate(Table table, Mutation m, long keyBase) { 329 mutate(table, m, keyBase, null, null, null, null); 330 } 331 332 public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q, 333 byte[] v) { 334 long start = EnvironmentEdgeManager.currentTime(); 335 try { 336 m = dataGenerator.beforeMutate(keyBase, m); 337 if (m instanceof Increment) { 338 table.increment((Increment) m); 339 } else if (m instanceof Append) { 340 table.append((Append) m); 341 } else if (m instanceof Put) { 342 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m); 343 } else if (m instanceof Delete) { 344 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m); 345 } else { 346 throw new IllegalArgumentException("unsupported mutation " + m.getClass().getSimpleName()); 347 } 348 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start); 349 } catch (IOException e) { 350 failedKeySet.add(keyBase); 351 String exceptionInfo; 352 if (e instanceof RetriesExhaustedWithDetailsException) { 353 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 354 exceptionInfo = aggEx.getExhaustiveDescription(); 355 } else { 356 StringWriter stackWriter = new StringWriter(); 357 PrintWriter pw = new PrintWriter(stackWriter); 358 e.printStackTrace(pw); 359 pw.flush(); 360 exceptionInfo = StringUtils.stringifyException(e); 361 } 362 LOG.error("Failed to mutate: " + keyBase + " after " 363 + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " 364 + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo); 365 } 366 } 367 368 public void setIgnoreNonceConflicts(boolean value) { 369 this.ignoreNonceConflicts = value; 370 } 371}