001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.trace; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.HBaseConfiguration; 023import org.apache.hadoop.hbase.IntegrationTestingUtility; 024import org.apache.hadoop.hbase.testclassification.IntegrationTests; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Admin; 027import org.apache.hadoop.hbase.client.BufferedMutator; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Put; 030import org.apache.hadoop.hbase.client.Result; 031import org.apache.hadoop.hbase.client.ResultScanner; 032import org.apache.hadoop.hbase.client.Scan; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.util.AbstractHBaseTool; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.util.ToolRunner; 037import org.apache.htrace.core.Sampler; 038import org.apache.htrace.core.TraceScope; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041 042import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 043 044import java.io.IOException; 045import java.util.Random; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Executors; 048import java.util.concurrent.LinkedBlockingQueue; 049import java.util.concurrent.TimeUnit; 050 051@Category(IntegrationTests.class) 052public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { 053 054 public static final String TABLE_ARG = "t"; 055 public static final String CF_ARG = "f"; 056 057 public static final String TABLE_NAME_DEFAULT = "SendTracesTable"; 058 public static final String COLUMN_FAMILY_DEFAULT = "D"; 059 private TableName tableName = TableName.valueOf(TABLE_NAME_DEFAULT); 060 private byte[] familyName = Bytes.toBytes(COLUMN_FAMILY_DEFAULT); 061 private IntegrationTestingUtility util; 062 private Random random = new Random(); 063 private Admin admin; 064 private SpanReceiverHost receiverHost; 065 066 public static void main(String[] args) throws Exception { 067 Configuration configuration = HBaseConfiguration.create(); 068 IntegrationTestingUtility.setUseDistributedCluster(configuration); 069 IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests(); 070 ToolRunner.run(configuration, tool, args); 071 } 072 073 @Override 074 protected void addOptions() { 075 addOptWithArg(TABLE_ARG, "The table name to target. Will be created if not there already."); 076 addOptWithArg(CF_ARG, "The family to target"); 077 } 078 079 @Override 080 public void processOptions(CommandLine cmd) { 081 String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT); 082 String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT); 083 084 this.tableName = TableName.valueOf(tableNameString); 085 this.familyName = Bytes.toBytes(familyString); 086 } 087 088 @Override 089 public int doWork() throws Exception { 090 internalDoWork(); 091 return 0; 092 } 093 094 @Test 095 public void internalDoWork() throws Exception { 096 util = createUtil(); 097 admin = util.getAdmin(); 098 setupReceiver(); 099 100 deleteTable(); 101 createTable(); 102 LinkedBlockingQueue<Long> rks = insertData(); 103 104 ExecutorService service = Executors.newFixedThreadPool(20); 105 doScans(service, rks); 106 doGets(service, rks); 107 108 service.shutdown(); 109 service.awaitTermination(100, TimeUnit.SECONDS); 110 Thread.sleep(90000); 111 receiverHost.closeReceivers(); 112 util.restoreCluster(); 113 util = null; 114 } 115 116 private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) { 117 118 for (int i = 0; i < 100; i++) { 119 Runnable runnable = new Runnable() { 120 private final LinkedBlockingQueue<Long> rowKeyQueue = rks; 121 @Override 122 public void run() { 123 ResultScanner rs = null; 124 TraceUtil.addSampler(Sampler.ALWAYS); 125 try (TraceScope scope = TraceUtil.createTrace("Scan")){ 126 Table ht = util.getConnection().getTable(tableName); 127 Scan s = new Scan(); 128 s.setStartRow(Bytes.toBytes(rowKeyQueue.take())); 129 s.setBatch(7); 130 rs = ht.getScanner(s); 131 // Something to keep the jvm from removing the loop. 132 long accum = 0; 133 134 for(int x = 0; x < 1000; x++) { 135 Result r = rs.next(); 136 accum |= Bytes.toLong(r.getRow()); 137 } 138 139 TraceUtil.addTimelineAnnotation("Accum result = " + accum); 140 141 ht.close(); 142 ht = null; 143 } catch (IOException e) { 144 e.printStackTrace(); 145 TraceUtil.addKVAnnotation("exception", e.getClass().getSimpleName()); 146 } catch (Exception e) { 147 } finally { 148 if (rs != null) rs.close(); 149 } 150 151 } 152 }; 153 service.submit(runnable); 154 } 155 156 } 157 158 private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys) 159 throws IOException { 160 for (int i = 0; i < 100; i++) { 161 Runnable runnable = new Runnable() { 162 private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys; 163 164 @Override 165 public void run() { 166 167 168 Table ht = null; 169 try { 170 ht = util.getConnection().getTable(tableName); 171 } catch (IOException e) { 172 e.printStackTrace(); 173 } 174 175 long accum = 0; 176 TraceUtil.addSampler(Sampler.ALWAYS); 177 for (int x = 0; x < 5; x++) { 178 try (TraceScope scope = TraceUtil.createTrace("gets")) { 179 long rk = rowKeyQueue.take(); 180 Result r1 = ht.get(new Get(Bytes.toBytes(rk))); 181 if (r1 != null) { 182 accum |= Bytes.toLong(r1.getRow()); 183 } 184 Result r2 = ht.get(new Get(Bytes.toBytes(rk))); 185 if (r2 != null) { 186 accum |= Bytes.toLong(r2.getRow()); 187 } 188 TraceUtil.addTimelineAnnotation("Accum = " + accum); 189 190 } catch (IOException|InterruptedException ie) { 191 // IGNORED 192 } 193 } 194 195 } 196 }; 197 service.submit(runnable); 198 } 199 } 200 201 private void createTable() throws IOException { 202 TraceUtil.addSampler(Sampler.ALWAYS); 203 try (TraceScope scope = TraceUtil.createTrace("createTable")) { 204 util.createTable(tableName, familyName); 205 } 206 } 207 208 private void deleteTable() throws IOException { 209 TraceUtil.addSampler(Sampler.ALWAYS); 210 try (TraceScope scope = TraceUtil.createTrace("deleteTable")) { 211 if (admin.tableExists(tableName)) { 212 util.deleteTable(tableName); 213 } 214 } 215 } 216 217 private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException { 218 LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000); 219 BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); 220 byte[] value = new byte[300]; 221 TraceUtil.addSampler(Sampler.ALWAYS); 222 for (int x = 0; x < 5000; x++) { 223 try (TraceScope traceScope = TraceUtil.createTrace("insertData")) { 224 for (int i = 0; i < 5; i++) { 225 long rk = random.nextLong(); 226 rowKeys.add(rk); 227 Put p = new Put(Bytes.toBytes(rk)); 228 for (int y = 0; y < 10; y++) { 229 random.nextBytes(value); 230 p.addColumn(familyName, Bytes.toBytes(random.nextLong()), value); 231 } 232 ht.mutate(p); 233 } 234 if ((x % 1000) == 0) { 235 admin.flush(tableName); 236 } 237 } 238 } 239 admin.flush(tableName); 240 return rowKeys; 241 } 242 243 private IntegrationTestingUtility createUtil() throws Exception { 244 Configuration conf = getConf(); 245 if (this.util == null) { 246 IntegrationTestingUtility u; 247 if (conf == null) { 248 u = new IntegrationTestingUtility(); 249 } else { 250 u = new IntegrationTestingUtility(conf); 251 } 252 util = u; 253 util.initializeCluster(1); 254 255 } 256 return this.util; 257 } 258 259 private void setupReceiver() { 260 Configuration conf = new Configuration(util.getConfiguration()); 261 conf.setBoolean("hbase.zipkin.is-in-client-mode", true); 262 263 this.receiverHost = SpanReceiverHost.getInstance(conf); 264 } 265}