001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.trace; 019 020import io.opentelemetry.api.common.AttributeKey; 021import io.opentelemetry.api.common.Attributes; 022import io.opentelemetry.api.trace.Span; 023import io.opentelemetry.context.Scope; 024import java.io.IOException; 025import java.util.Random; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadLocalRandom; 030import java.util.concurrent.TimeUnit; 031import org.apache.commons.io.IOUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.IntegrationTestingUtility; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.BufferedMutator; 038import org.apache.hadoop.hbase.client.Get; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.ResultScanner; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.testclassification.IntegrationTests; 045import org.apache.hadoop.hbase.util.AbstractHBaseTool; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.util.ToolRunner; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 054 055@Category(IntegrationTests.class) 056public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { 057 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestSendTraceRequests.class); 058 public static final String TABLE_ARG = "t"; 059 public static final String CF_ARG = "f"; 060 061 public static final String TABLE_NAME_DEFAULT = "SendTracesTable"; 062 public static final String COLUMN_FAMILY_DEFAULT = "D"; 063 private TableName tableName = TableName.valueOf(TABLE_NAME_DEFAULT); 064 private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); 065 private IntegrationTestingUtility util; 066 private Admin admin; 067 068 public static void main(String[] args) throws Exception { 069 Configuration configuration = HBaseConfiguration.create(); 070 IntegrationTestingUtility.setUseDistributedCluster(configuration); 071 IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests(); 072 ToolRunner.run(configuration, tool, args); 073 } 074 075 @Override 076 protected void addOptions() { 077 addOptWithArg(TABLE_ARG, "The table name to target. Will be created if not there already."); 078 addOptWithArg(CF_ARG, "The family to target"); 079 } 080 081 @Override 082 public void processOptions(CommandLine cmd) { 083 String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT); 084 String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT); 085 086 this.tableName = TableName.valueOf(tableNameString); 087 this.familyName = Bytes.toBytes(familyString); 088 } 089 090 @Override 091 public int doWork() throws Exception { 092 internalDoWork(); 093 return 0; 094 } 095 096 @Test 097 public void internalDoWork() throws Exception { 098 util = createUtil(); 099 admin = util.getAdmin(); 100 101 deleteTable(); 102 createTable(); 103 LinkedBlockingQueue<Long> rks = insertData(); 104 105 ExecutorService service = Executors.newFixedThreadPool(20); 106 doScans(service, rks); 107 doGets(service, rks); 108 109 service.shutdown(); 110 service.awaitTermination(100, TimeUnit.SECONDS); 111 Thread.sleep(90000); 112 util.restoreCluster(); 113 util = null; 114 } 115 116 @SuppressWarnings("FutureReturnValueIgnored") 117 private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) { 118 for (int i = 0; i < 100; i++) { 119 Runnable runnable = new Runnable() { 120 private final LinkedBlockingQueue<Long> rowKeyQueue = rks; 121 122 @Override 123 public void run() { 124 ResultScanner rs = null; 125 Span span = TraceUtil.getGlobalTracer().spanBuilder("Scan").startSpan(); 126 try (Scope scope = span.makeCurrent()) { 127 Table ht = util.getConnection().getTable(tableName); 128 Scan s = new Scan(); 129 s.withStartRow(Bytes.toBytes(rowKeyQueue.take())); 130 s.setBatch(7); 131 rs = ht.getScanner(s); 132 // Something to keep the jvm from removing the loop. 133 long accum = 0; 134 135 for (int x = 0; x < 1000; x++) { 136 Result r = rs.next(); 137 accum |= Bytes.toLong(r.getRow()); 138 } 139 140 span.addEvent("Accum result = " + accum); 141 142 ht.close(); 143 ht = null; 144 } catch (Exception e) { 145 span.addEvent("exception", 146 Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName())); 147 } finally { 148 span.end(); 149 if (rs != null) { 150 rs.close(); 151 } 152 } 153 } 154 }; 155 service.execute(runnable); 156 } 157 } 158 159 private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys) 160 throws IOException { 161 for (int i = 0; i < 100; i++) { 162 Runnable runnable = new Runnable() { 163 private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys; 164 165 @Override 166 public void run() { 167 Table ht = null; 168 try { 169 ht = util.getConnection().getTable(tableName); 170 long accum = 0; 171 for (int x = 0; x < 5; x++) { 172 Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan(); 173 try (Scope scope = span.makeCurrent()) { 174 long rk = rowKeyQueue.take(); 175 Result r1 = ht.get(new Get(Bytes.toBytes(rk))); 176 if (r1 != null) { 177 accum |= Bytes.toLong(r1.getRow()); 178 } 179 Result r2 = ht.get(new Get(Bytes.toBytes(rk))); 180 if (r2 != null) { 181 accum |= Bytes.toLong(r2.getRow()); 182 } 183 span.addEvent("Accum = " + accum); 184 } catch (IOException | InterruptedException ie) { 185 // IGNORED 186 } finally { 187 span.end(); 188 } 189 } 190 } catch (IOException e) { 191 // IGNORED 192 } finally { 193 if (ht != null) { 194 IOUtils.closeQuietly(ht); 195 } 196 } 197 } 198 }; 199 service.execute(runnable); 200 } 201 } 202 203 private void createTable() throws IOException { 204 Span span = TraceUtil.getGlobalTracer().spanBuilder("createTable").startSpan(); 205 try (Scope scope = span.makeCurrent()) { 206 util.createTable(tableName, familyName); 207 } finally { 208 span.end(); 209 } 210 } 211 212 private void deleteTable() throws IOException { 213 Span span = TraceUtil.getGlobalTracer().spanBuilder("deleteTable").startSpan(); 214 try (Scope scope = span.makeCurrent()) { 215 if (admin.tableExists(tableName)) { 216 util.deleteTable(tableName); 217 } 218 } finally { 219 span.end(); 220 } 221 } 222 223 private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException { 224 LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000); 225 BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); 226 Random rand = ThreadLocalRandom.current(); 227 byte[] value = new byte[300]; 228 for (int x = 0; x < 5000; x++) { 229 Span span = TraceUtil.getGlobalTracer().spanBuilder("insertData").startSpan(); 230 try (Scope scope = span.makeCurrent()) { 231 for (int i = 0; i < 5; i++) { 232 long rk = rand.nextLong(); 233 rowKeys.add(rk); 234 Put p = new Put(Bytes.toBytes(rk)); 235 for (int y = 0; y < 10; y++) { 236 Bytes.random(value); 237 p.addColumn(familyName, Bytes.toBytes(rand.nextLong()), value); 238 } 239 ht.mutate(p); 240 } 241 if ((x % 1000) == 0) { 242 admin.flush(tableName); 243 } 244 } finally { 245 span.end(); 246 } 247 } 248 admin.flush(tableName); 249 return rowKeys; 250 } 251 252 private IntegrationTestingUtility createUtil() throws Exception { 253 Configuration conf = getConf(); 254 if (this.util == null) { 255 IntegrationTestingUtility u; 256 if (conf == null) { 257 u = new IntegrationTestingUtility(); 258 } else { 259 u = new IntegrationTestingUtility(conf); 260 } 261 util = u; 262 util.initializeCluster(1); 263 264 } 265 return this.util; 266 } 267}