/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Creates multiple threads that update key/values in an HBase table. Each updater thread claims
 * keys, reads the corresponding rows back, and applies a mix of increments, appends, check-and-put
 * and check-and-delete mutations to them, recording any keys that fail.
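 * <p>
 * A minimal usage sketch (illustrative only: the data generator, table name, key range, thread
 * count and update percentage below are example values, not defaults of this class):
 *
 * <pre>
 * {@code
 * MultiThreadedUpdater updater =
 *   new MultiThreadedUpdater(dataGen, conf, TableName.valueOf("usertable"), 50);
 * updater.setBatchUpdate(true);
 * updater.start(0, 100000, 8); // update keys in [0, 100000) using 8 threads
 * updater.waitForFinish();
 * }
 * </pre>
 */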
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class);

  protected Set<HBaseUpdaterThread> updaters = new HashSet<>();

  private MultiThreadedWriterBase writer = null;
  private boolean isBatchUpdate = false;
  private boolean ignoreNonceConflicts = false;
  private final double updatePercent;

  public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, double updatePercent) throws IOException {
    super(dataGen, conf, tableName, "U");
    this.updatePercent = updatePercent;
  }

  /** Whether to apply updates as a single batch per row, or as a separate update per column. */
  public void setBatchUpdate(boolean isBatchUpdate) {
    this.isBatchUpdate = isBatchUpdate;
  }

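  /**
   * Links this updater to a concurrent writer so that updates are only attempted on keys the
   * writer has already finished writing. Enables wrote-key tracking on the writer.
   */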
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);

    if (verbose) {
      LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
    }

    addUpdaterThreads(numThreads);

    startThreads(updaters);
  }

  protected void addUpdaterThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
      updaters.add(updater);
    }
  }

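  /**
   * Returns the next key to update, or {@code endKey} once the whole range has been handed out.
   * When linked to a writer, waits until the writer has written past the key, and skips (and
   * records as failed) any key the writer failed to write.
   */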
  private long getNextKeyToUpdate() {
    if (writer == null) {
      return nextKeyToWrite.getAndIncrement();
    }
    synchronized (this) {
      if (nextKeyToWrite.get() >= endKey) {
        // Finished the whole key range
        return endKey;
      }
      while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
        Threads.sleepWithoutInterrupt(100);
      }
      long k = nextKeyToWrite.getAndIncrement();
      if (writer.failedToWriteKey(k)) {
        failedKeySet.add(k);
        return getNextKeyToUpdate();
      }
      return k;
    }
  }

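  /** Updater thread: claims keys, reads the corresponding rows and applies mutations to them. */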
  protected class HBaseUpdaterThread extends Thread {
    protected final Table table;

    public HBaseUpdaterThread(int updaterId) throws IOException {
      setName(getClass().getSimpleName() + "_" + updaterId);
      table = createTable();
    }

    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }

    @Override
    public void run() {
      try {
        Random rand = ThreadLocalRandom.current();
        long rowKeyBase;
        StringBuilder buf = new StringBuilder();
        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
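        // Claim keys until the range is exhausted; each claimed key is updated with probability
        // updatePercent (in percent).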
        while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
          if (rand.nextInt(100) < updatePercent) {
            byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
            Increment inc = new Increment(rowKey);
            Append app = new Append(rowKey);
            numKeys.addAndGet(1);
            int columnCount = 0;
            for (byte[] cf : columnFamilies) {
              long cfHash = Arrays.hashCode(cf);
              inc.addColumn(cf, INCREMENT, cfHash);
              buf.setLength(0); // Clear the buffer
              buf.append("#").append(Bytes.toString(INCREMENT));
              buf.append(":").append(MutationType.INCREMENT.getNumber());
              app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
              ++columnCount;
              if (!isBatchUpdate) {
                mutate(table, inc, rowKeyBase);
                numCols.addAndGet(1);
                inc = new Increment(rowKey);
                mutate(table, app, rowKeyBase);
                numCols.addAndGet(1);
                app = new Append(rowKey);
              }
              Get get = new Get(rowKey);
              get.addFamily(cf);
              try {
                get = dataGenerator.beforeGet(rowKeyBase, get);
              } catch (Exception e) {
                // Ideally won't happen
                LOG.warn("Failed to modify the get from the load generator for row = ["
                  + Bytes.toString(get.getRow()) + "], column family = [" + Bytes.toString(cf)
                  + "]", e);
              }
              Result result = getRow(get, rowKeyBase, cf);
              Map<byte[], byte[]> columnValues = result != null ? result.getFamilyMap(cf) : null;
              if (columnValues == null) {
                int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
                if ((int) rowKeyBase % specialPermCellInsertionFactor == 0) {
                  LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
                } else {
                  failedKeySet.add(rowKeyBase);
                  LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
                    + "], since we could not get the original row");
                }
              }
              if (columnValues != null) {
                for (byte[] column : columnValues.keySet()) {
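                  // Skip the bookkeeping columns; those are maintained by the increment/append
                  // mutations built above.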
                  if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
                    continue;
                  }
                  MutationType mt =
                    MutationType.values()[rand.nextInt(MutationType.values().length)];
                  long columnHash = Arrays.hashCode(column);
                  long hashCode = cfHash + columnHash;
                  byte[] hashCodeBytes = Bytes.toBytes(hashCode);
                  byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
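                  // For even hash codes, read the column's latest value so the put/delete below
                  // can be applied as a checkAndMutate against it.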
                  if (hashCode % 2 == 0) {
                    Cell kv = result.getColumnLatestCell(cf, column);
                    checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
                    Preconditions.checkNotNull(checkedValue,
                      "Column value to be checked should not be null");
                  }
                  buf.setLength(0); // Clear the buffer
                  buf.append("#").append(Bytes.toString(column)).append(":");
                  ++columnCount;
                  switch (mt) {
                    case PUT:
                      Put put = new Put(rowKey);
                      put.addColumn(cf, column, hashCodeBytes);
                      mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
                      buf.append(MutationType.PUT.getNumber());
                      break;
                    case DELETE:
                      Delete delete = new Delete(rowKey);
                      // Delete all versions, since a put could be issued multiple times
                      // when ChaosMonkey (CM) is running
                      delete.addColumns(cf, column);
                      mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
                      buf.append(MutationType.DELETE.getNumber());
                      break;
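                    // Any other randomly chosen type (APPEND or INCREMENT) is recorded and
                    // applied as an append of the hash bytes.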
                    default:
                      buf.append(MutationType.APPEND.getNumber());
                      app.addColumn(cf, column, hashCodeBytes);
                  }
                  app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
                  if (!isBatchUpdate) {
                    mutate(table, app, rowKeyBase);
                    numCols.addAndGet(1);
                    app = new Append(rowKey);
                  }
                }
              }
            }
            if (isBatchUpdate) {
              if (verbose) {
                LOG.debug("Preparing increment and append for key = [" + Bytes.toString(rowKey)
                  + "], " + columnCount + " columns");
              }
              mutate(table, inc, rowKeyBase);
              mutate(table, app, rowKeyBase);
              numCols.addAndGet(columnCount);
            }
          }
          if (trackWroteKeys) {
            wroteKeys.add(rowKeyBase);
          }
        }
      } finally {
        closeHTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    protected void closeHTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

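    /** Fetches the row for verification; returns null (and logs a warning) on failure. */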
    protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
      Result result = null;
      try {
        result = table.get(get);
      } catch (IOException ie) {
        LOG.warn("Failed to get the row for key = [" + Bytes.toString(get.getRow())
          + "], column family = [" + Bytes.toString(cf) + "]", ie);
      }
      return result;
    }

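    /** Applies the mutation without any precondition check. */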
    public void mutate(Table table, Mutation m, long keyBase) {
      mutate(table, m, keyBase, null, null, null, null);
    }

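    /**
     * Applies the mutation. Puts and deletes are issued as checkAndMutate operations, guarded by
     * the expected value {@code v} at {@code row}/{@code cf}/{@code q}; increments and appends
     * are applied directly.
     */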
    public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q,
      byte[] v) {
      long start = EnvironmentEdgeManager.currentTime();
      try {
        m = dataGenerator.beforeMutate(keyBase, m);
        if (m instanceof Increment) {
          table.increment((Increment) m);
        } else if (m instanceof Append) {
          table.append((Append) m);
        } else if (m instanceof Put) {
          table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
        } else if (m instanceof Delete) {
          table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
        } else {
          throw new IllegalArgumentException(
            "unsupported mutation " + m.getClass().getSimpleName());
        }
        totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
      } catch (IOException e) {
        if (ignoreNonceConflicts) {
          LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
          totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
          return;
        }
        failedKeySet.add(keyBase);
        String exceptionInfo;
        if (e instanceof RetriesExhaustedWithDetailsException) {
          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
          exceptionInfo = aggEx.getExhaustiveDescription();
        } else {
          exceptionInfo = StringUtils.stringifyException(e);
        }
        LOG.error("Failed to mutate: " + keyBase + " after "
          + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
          + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
      }
    }
  }

  @Override
  public void waitForFinish() {
    super.waitForFinish();
    System.out.println("Failed to update keys: " + failedKeySet.size());
    for (Long key : failedKeySet) {
      System.out.println("Failed to update key: " + key);
    }
  }

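  /** Applies the mutation without any precondition check. */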
  public void mutate(Table table, Mutation m, long keyBase) {
    mutate(table, m, keyBase, null, null, null, null);
  }

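  /**
   * Applies the mutation. Puts and deletes are issued as checkAndMutate operations, guarded by
   * the expected value {@code v} at {@code row}/{@code cf}/{@code q}; increments and appends are
   * applied directly.
   */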
  public void mutate(Table table, Mutation m, long keyBase, byte[] row, byte[] cf, byte[] q,
    byte[] v) {
    long start = EnvironmentEdgeManager.currentTime();
    try {
      m = dataGenerator.beforeMutate(keyBase, m);
      if (m instanceof Increment) {
        table.increment((Increment) m);
      } else if (m instanceof Append) {
        table.append((Append) m);
      } else if (m instanceof Put) {
        table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put) m);
      } else if (m instanceof Delete) {
        table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete) m);
      } else {
        throw new IllegalArgumentException("unsupported mutation " + m.getClass().getSimpleName());
      }
      totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
    } catch (IOException e) {
      failedKeySet.add(keyBase);
      String exceptionInfo;
      if (e instanceof RetriesExhaustedWithDetailsException) {
        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
        exceptionInfo = aggEx.getExhaustiveDescription();
      } else {
        exceptionInfo = StringUtils.stringifyException(e);
      }
      LOG.error("Failed to mutate: " + keyBase + " after "
        + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
        + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
    }
  }

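  /**
   * When set, updater threads treat IOExceptions during mutation as nonce conflicts: they are
   * logged and ignored rather than recorded as failed keys.
   */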
  public void setIgnoreNonceConflicts(boolean value) {
    this.ignoreNonceConflicts = value;
  }
}