001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.Map; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.hbase.HBaseClassTestRule; 025import org.apache.hadoop.hbase.testclassification.RegionServerTests; 026import org.apache.hadoop.hbase.testclassification.SmallTests; 027import org.junit.Assert; 028import org.junit.ClassRule; 029import org.junit.Test; 030import org.junit.experimental.categories.Category; 031 032@Category({ RegionServerTests.class, SmallTests.class }) 033public class TestOutputSinkWriter { 034 035 @ClassRule 036 public static final HBaseClassTestRule CLASS_RULE = 037 HBaseClassTestRule.forClass( 038 TestOutputSinkWriter.class); 039 040 @Test 041 public void testExeptionHandling() throws IOException, InterruptedException { 042 WALSplitter.PipelineController controller = new WALSplitter.PipelineController(); 043 BrokenEntryBuffers entryBuffers = new BrokenEntryBuffers(controller, 2000); 044 OutputSink sink = new OutputSink(controller, entryBuffers, 1) { 045 046 @Override protected int getNumOpenWriters() { 047 return 0; 048 } 049 050 @Override protected void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 051 052 } 053 054 @Override protected List<Path> close() throws IOException { 055 return null; 056 } 057 058 @Override public Map<String,Long> getOutputCounts() { 059 return null; 060 } 061 062 @Override public int getNumberOfRecoveredRegions() { 063 return 0; 064 } 065 066 @Override public boolean keepRegionEvent(WAL.Entry entry) { 067 return false; 068 } 069 }; 070 071 //start the Writer thread and give it time trow the exception 072 sink.startWriterThreads(); 073 Thread.sleep(1000L); 074 075 //make sure the exception is stored 076 try { 077 controller.checkForErrors(); 078 Assert.fail(); 079 } 080 catch (RuntimeException re){ 081 Assert.assertTrue(true); 082 } 083 084 sink.restartWriterThreadsIfNeeded(); 085 086 //after the check the stored exception should be gone 087 try { 088 controller.checkForErrors(); 089 } 090 catch (RuntimeException re){ 091 Assert.fail(); 092 } 093 094 //prep another exception and wait for it to be thrown 095 entryBuffers.setThrowError(true); 096 Thread.sleep(1000L); 097 098 //make sure the exception is stored 099 try { 100 controller.checkForErrors(); 101 Assert.fail(); 102 } 103 catch (RuntimeException re){ 104 Assert.assertTrue(true); 105 } 106 } 107 108 static class BrokenEntryBuffers extends EntryBuffers{ 109 boolean throwError = true; 110 111 public BrokenEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) { 112 super(controller, maxHeapUsage); 113 } 114 115 @Override 116 synchronized EntryBuffers.RegionEntryBuffer getChunkToWrite() { 117 //This just emulates something going wrong with in the Writer 118 if(throwError){ 119 throwError = false; 120 throw new RuntimeException("testing"); 121 } 122 return null; 123 } 124 125 public void setThrowError(boolean newValue){ 126 throwError = newValue; 127 } 128 }; 129}