001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.util; 020 021import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; 022import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; 023 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.io.StringWriter; 027import java.util.Arrays; 028import java.util.HashSet; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.commons.lang3.RandomUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Cell; 035import org.apache.hadoop.hbase.CellUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Get; 041import org.apache.hadoop.hbase.client.Increment; 042import org.apache.hadoop.hbase.client.Mutation; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 046import org.apache.hadoop.hbase.client.Table; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 048import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 049import org.apache.hadoop.util.StringUtils; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 053 054/** Creates multiple threads that write key/values into the */ 055public class MultiThreadedUpdater extends MultiThreadedWriterBase { 056 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class); 057 058 protected Set<HBaseUpdaterThread> updaters = new HashSet<>(); 059 060 private MultiThreadedWriterBase writer = null; 061 private boolean isBatchUpdate = false; 062 private boolean ignoreNonceConflicts = false; 063 private final double updatePercent; 064 065 public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf, 066 TableName tableName, double updatePercent) throws IOException { 067 super(dataGen, conf, tableName, "U"); 068 this.updatePercent = updatePercent; 069 } 070 071 /** Use batch vs. separate updates for every column in a row */ 072 public void setBatchUpdate(boolean isBatchUpdate) { 073 this.isBatchUpdate = isBatchUpdate; 074 } 075 076 public void linkToWriter(MultiThreadedWriterBase writer) { 077 this.writer = writer; 078 writer.setTrackWroteKeys(true); 079 } 080 081 @Override 082 public void start(long startKey, long endKey, int numThreads) throws IOException { 083 super.start(startKey, endKey, numThreads); 084 085 if (verbose) { 086 LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); 087 } 088 089 addUpdaterThreads(numThreads); 090 091 startThreads(updaters); 092 } 093 094 protected void addUpdaterThreads(int numThreads) throws IOException { 095 for (int i = 0; i < numThreads; ++i) { 096 HBaseUpdaterThread updater = new HBaseUpdaterThread(i); 097 updaters.add(updater); 098 } 099 } 100 101 private long getNextKeyToUpdate() { 102 if (writer == null) { 103 return nextKeyToWrite.getAndIncrement(); 104 } 105 synchronized (this) { 106 if (nextKeyToWrite.get() >= endKey) { 107 // Finished the whole key range 108 return endKey; 109 } 110 while (nextKeyToWrite.get() > writer.wroteUpToKey()) { 111 Threads.sleepWithoutInterrupt(100); 112 } 113 long k = nextKeyToWrite.getAndIncrement(); 114 if (writer.failedToWriteKey(k)) { 115 failedKeySet.add(k); 116 return getNextKeyToUpdate(); 117 } 118 return k; 119 } 120 } 121 122 protected class HBaseUpdaterThread extends Thread { 123 protected final Table table; 124 125 public HBaseUpdaterThread(int updaterId) throws IOException { 126 setName(getClass().getSimpleName() + "_" + updaterId); 127 table = createTable(); 128 } 129 130 protected Table createTable() throws IOException { 131 return connection.getTable(tableName); 132 } 133 134 @Override 135 public void run() { 136 try { 137 long rowKeyBase; 138 StringBuilder buf = new StringBuilder(); 139 byte[][] columnFamilies = dataGenerator.getColumnFamilies(); 140 while ((rowKeyBase = getNextKeyToUpdate()) < endKey) { 141 if (RandomUtils.nextInt(0, 100) < updatePercent) { 142 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); 143 Increment inc = new Increment(rowKey); 144 Append app = new Append(rowKey); 145 numKeys.addAndGet(1); 146 int columnCount = 0; 147 for (byte[] cf : columnFamilies) { 148 long cfHash = Arrays.hashCode(cf); 149 inc.addColumn(cf, INCREMENT, cfHash); 150 buf.setLength(0); // Clear the buffer 151 buf.append("#").append(Bytes.toString(INCREMENT)); 152 buf.append(":").append(MutationType.INCREMENT.getNumber()); 153 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); 154 ++columnCount; 155 if (!isBatchUpdate) { 156 mutate(table, inc, rowKeyBase); 157 numCols.addAndGet(1); 158 inc = new Increment(rowKey); 159 mutate(table, app, rowKeyBase); 160 numCols.addAndGet(1); 161 app = new Append(rowKey); 162 } 163 Get get = new Get(rowKey); 164 get.addFamily(cf); 165 try { 166 get = dataGenerator.beforeGet(rowKeyBase, get); 167 } catch (Exception e) { 168 // Ideally wont happen 169 LOG.warn("Failed to modify the get from the load generator = [" + Bytes.toString(get.getRow()) 170 + "], column family = [" + Bytes.toString(cf) + "]", e); 171 } 172 Result result = getRow(get, rowKeyBase, cf); 173 Map<byte[], byte[]> columnValues = 174 result != null ? result.getFamilyMap(cf) : null; 175 if (columnValues == null) { 176 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]); 177 if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) { 178 LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey)); 179 } else { 180 failedKeySet.add(rowKeyBase); 181 LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey) 182 + "], since we could not get the original row"); 183 } 184 } 185 if(columnValues != null) { 186 for (byte[] column : columnValues.keySet()) { 187 if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) { 188 continue; 189 } 190 MutationType mt = MutationType 191 .valueOf(RandomUtils.nextInt(0, MutationType.values().length)); 192 long columnHash = Arrays.hashCode(column); 193 long hashCode = cfHash + columnHash; 194 byte[] hashCodeBytes = Bytes.toBytes(hashCode); 195 byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; 196 if (hashCode % 2 == 0) { 197 Cell kv = result.getColumnLatestCell(cf, column); 198 checkedValue = kv != null ? CellUtil.cloneValue(kv) : null; 199 Preconditions.checkNotNull(checkedValue, 200 "Column value to be checked should not be null"); 201 } 202 buf.setLength(0); // Clear the buffer 203 buf.append("#").append(Bytes.toString(column)).append(":"); 204 ++columnCount; 205 switch (mt) { 206 case PUT: 207 Put put = new Put(rowKey); 208 put.addColumn(cf, column, hashCodeBytes); 209 mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); 210 buf.append(MutationType.PUT.getNumber()); 211 break; 212 case DELETE: 213 Delete delete = new Delete(rowKey); 214 // Delete all versions since a put 215 // could be called multiple times if CM is used 216 delete.addColumns(cf, column); 217 mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); 218 buf.append(MutationType.DELETE.getNumber()); 219 break; 220 default: 221 buf.append(MutationType.APPEND.getNumber()); 222 app.addColumn(cf, column, hashCodeBytes); 223 } 224 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); 225 if (!isBatchUpdate) { 226 mutate(table, app, rowKeyBase); 227 numCols.addAndGet(1); 228 app = new Append(rowKey); 229 } 230 } 231 } 232 } 233 if (isBatchUpdate) { 234 if (verbose) { 235 LOG.debug("Preparing increment and append for key = [" 236 + Bytes.toString(rowKey) + "], " + columnCount + " columns"); 237 } 238 mutate(table, inc, rowKeyBase); 239 mutate(table, app, rowKeyBase); 240 numCols.addAndGet(columnCount); 241 } 242 } 243 if (trackWroteKeys) { 244 wroteKeys.add(rowKeyBase); 245 } 246 } 247 } finally { 248 closeHTable(); 249 numThreadsWorking.decrementAndGet(); 250 } 251 } 252 253 protected void closeHTable() { 254 try { 255 if (table != null) { 256 table.close(); 257 } 258 } catch (IOException e) { 259 LOG.error("Error closing table", e); 260 } 261 } 262 263 protected Result getRow(Get get, long rowKeyBase, byte[] cf) { 264 Result result = null; 265 try { 266 result = table.get(get); 267 } catch (IOException ie) { 268 LOG.warn( 269 "Failed to get the row for key = [" + Bytes.toString(get.getRow()) + "], column family = [" 270 + Bytes.toString(cf) + "]", ie); 271 } 272 return result; 273 } 274 275 public void mutate(Table table, Mutation m, long keyBase) { 276 mutate(table, m, keyBase, null, null, null, null); 277 } 278 279 public void mutate(Table table, Mutation m, 280 long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { 281 long start = System.currentTimeMillis(); 282 try { 283 m = dataGenerator.beforeMutate(keyBase, m); 284 if (m instanceof Increment) { 285 table.increment((Increment)m); 286 } else if (m instanceof Append) { 287 table.append((Append)m); 288 } else if (m instanceof Put) { 289 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m); 290 } else if (m instanceof Delete) { 291 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m); 292 } else { 293 throw new IllegalArgumentException( 294 "unsupported mutation " + m.getClass().getSimpleName()); 295 } 296 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); 297 } catch (IOException e) { 298 if (ignoreNonceConflicts) { 299 LOG.info("Detected nonce conflict, ignoring: " + e.getMessage()); 300 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); 301 return; 302 } 303 failedKeySet.add(keyBase); 304 String exceptionInfo; 305 if (e instanceof RetriesExhaustedWithDetailsException) { 306 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e; 307 exceptionInfo = aggEx.getExhaustiveDescription(); 308 } else { 309 exceptionInfo = StringUtils.stringifyException(e); 310 } 311 LOG.error("Failed to mutate: " + keyBase + " after " + 312 (System.currentTimeMillis() - start) + 313 "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " 314 + exceptionInfo); 315 } 316 } 317 } 318 319 @Override 320 public void waitForFinish() { 321 super.waitForFinish(); 322 System.out.println("Failed to update keys: " + failedKeySet.size()); 323 for (Long key : failedKeySet) { 324 System.out.println("Failed to update key: " + key); 325 } 326 } 327 328 public void mutate(Table table, Mutation m, long keyBase) { 329 mutate(table, m, keyBase, null, null, null, null); 330 } 331 332 public void mutate(Table table, Mutation m, 333 long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { 334 long start = System.currentTimeMillis(); 335 try { 336 m = dataGenerator.beforeMutate(keyBase, m); 337 if (m instanceof Increment) { 338 table.increment((Increment)m); 339 } else if (m instanceof Append) { 340 table.append((Append)m); 341 } else if (m instanceof Put) { 342 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m); 343 } else if (m instanceof Delete) { 344 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m); 345 } else { 346 throw new IllegalArgumentException( 347 "unsupported mutation " + m.getClass().getSimpleName()); 348 } 349 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); 350 } catch (IOException e) { 351 failedKeySet.add(keyBase); 352 String exceptionInfo; 353 if (e instanceof RetriesExhaustedWithDetailsException) { 354 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; 355 exceptionInfo = aggEx.getExhaustiveDescription(); 356 } else { 357 StringWriter stackWriter = new StringWriter(); 358 PrintWriter pw = new PrintWriter(stackWriter); 359 e.printStackTrace(pw); 360 pw.flush(); 361 exceptionInfo = StringUtils.stringifyException(e); 362 } 363 LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + 364 "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " 365 + exceptionInfo); 366 } 367 } 368 369 public void setIgnoreNonceConflicts(boolean value) { 370 this.ignoreNonceConflicts = value; 371 } 372}