User:Lindenb/Notebook/UMR915/20110615
From OpenWetWare
hadoop
(started on User:Lindenb/Notebook/UMR915/20110610 )
trying to change /etc/hosts: https://twitter.com/#!/fredebibs/status/79170165312454656
then
sudo /etc/rc.d/init.d/network restart
success !
[lindenb@srv-clc-04 hadoop-0.20.203.0]$ bin/hadoop namenode -format 11/06/15 08:32:13 INFO namenode.NameNode: STARTUP_MSG: /************************************************************ STARTUP_MSG: Starting NameNode STARTUP_MSG: host = srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1 STARTUP_MSG: args = [-format] STARTUP_MSG: version = 0.20.203.0 STARTUP_MSG: build = http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-203 -r 1099333; compiled by 'oom' on Wed May 4 07:57:50 PDT 2011 ************************************************************/ 11/06/15 08:32:13 INFO util.GSet: VM type = 64-bit 11/06/15 08:32:13 INFO util.GSet: 2% max memory = 19.1675 MB 11/06/15 08:32:13 INFO util.GSet: capacity = 2^21 = 2097152 entries 11/06/15 08:32:13 INFO util.GSet: recommended=2097152, actual=2097152 11/06/15 08:32:13 INFO namenode.FSNamesystem: fsOwner=lindenb 11/06/15 08:32:13 INFO namenode.FSNamesystem: supergroup=supergroup 11/06/15 08:32:13 INFO namenode.FSNamesystem: isPermissionEnabled=true 11/06/15 08:32:13 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100 11/06/15 08:32:13 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s) 11/06/15 08:32:13 INFO namenode.NameNode: Caching file names occuring more than 10 times 11/06/15 08:32:13 INFO common.Storage: Image file of size 113 saved in 0 seconds. 11/06/15 08:32:14 INFO common.Storage: Storage directory /home/lindenb/tmp/HADOOP/dfs/name has been successfully formatted. 11/06/15 08:32:14 INFO namenode.NameNode: SHUTDOWN_MSG: /************************************************************ SHUTDOWN_MSG: Shutting down NameNode at srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1
start the server
[lindenb@srv-clc-04 hadoop-0.20.203.0]$ ./bin/start-all.sh namenode running as process 1151. Stop it first. localhost: starting datanode, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-datanode-srv-clc-04.u915.irt.univ-nantes.prive3.out localhost: secondarynamenode running as process 1490. Stop it first. jobtracker running as process 1588. Stop it first. localhost: starting tasktracker, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-tasktracker-srv-clc-04.u915.irt.univ-nantes.prive3.out
copy cDINA data
bin/hadoop fs -mkdir myfolder fs -copyFromLocal ~/Axiom_GW_Hu_SNP.r2.na31.annot.csv myfolder/ bin/hadoop fs -tail /user/lindenb/myfolder/Axiom_GW_Hu_SNP.r2.na31.annot.csv s.135270 // CRMP1 // 1400 // collapsin response mediator protein 1 /// NM_001099433 // downstream // 100570 // Hs.479066 // JAKMIP1 // 152789 // janus kinase and microtubule interacting protein 1","10.7510808767857 // D4S2925 // D4S2366 // --- // --- /// 11.4380209151363 // D4S2285 // D4S431 // UT5936 // AFM262VG9 /// 10.964419629995 // --- // D4S2366 // 58988 // ---","D4S1059 // upstream // 121041 /// D4S2509E // downstream // 104845","0.108333333 // 0.891666667 // Caucasian /// 0.055555556 // 0.944444444 // Han Chinese /// 0.011111111 // 0.988888889 // Japanese /// 0.041666667 // 0.958333333 // Yoruban","0.183333333 // Caucasian /// 0.066666667 // Han Chinese /// 0.022222222 // Japanese /// 0.083333333 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","T // Caucasian /// T // Han Chinese /// T // Japanese /// T // Yoruban","0.108333333 // Caucasian /// 0.055555556 // Han Chinese /// 0.011111111 // Japanese /// 0.041666667 // Yoruban","---"
#serveur1: scp /CGH/data/users/cdina/DATAS/CEDRIC/Axiom2/resultat/AxiomGT1.calls.txt lindenb@serveur4:
bin/hadoop fs -copyFromLocal ~/AxiomGT1.calls.txt myfolder/ $ ls -lah ~/Axiom* -rw-r--r-- 1 lindenb lindenb 111M jun 15 09:03 /home/lindenb/AxiomGT1.calls.txt -rw-r--r-- 1 lindenb lindenb 766M jun 10 12:35 /home/lindenb/Axiom_GW_Hu_SNP.r2.na31.annot.csv du -h ~/tmp/HADOOP | tail -1 884M /home/lindenb/tmp/HADOOP
Files
Axiom_GW_Hu_SNP.r2.na31.annot.csv
"Probe Set ID","dbSNP RS ID","Chromosome","Physical Position","Strand","ChrX pseudo-autosomal region 1","Cytoband","Flank","Allele A","Allele B","Associated Gene","Genetic Map","Microsatellite","Allele Frequencies","Heterozygous Allele Frequencies","Number of individuals/Number of chromosomes","In Hapmap","Strand Versus dbSNP","Probe Count","ChrX pseudo-autosomal region 2","Minor Allele","Minor Allele Frequency","OMIM" "AX-11086612","rs10001348","4","29912308","+","0","p15.1","gtattcagttgaacacaaatcagtgcatgt[A/G]","A","G","ENST00000467087 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// ENST00000361762 // upstream // 809728 // Hs.479439 // PCDH7 // 5099 // protocadherin 7 /// NM_001169117 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// NM_032456 // upstream // 809728 // Hs.724529 // PCDH7 // 5099 // protocadherin 7","50.0229786923511 // D4S418 // D4S2408 // --- // --- /// 44.2128449948474 // D4S2397 // D4S2430 // ATA27C07 // GCT6F03 /// 42.4637111703432 // --- // --- // 226002 // 46437","D4S333 // upstream // 47955 /// D4S605 // downstream // 97312","0.872881356 // 0.127118644 // Caucasian /// 0.833333333 // 0.166666667 // Han Chinese /// 0.777777778 // 0.222222222 // Japanese /// 0.775 // 0.225 // Yoruban","0.254237288 // Caucasian /// 0.288888889 // Han Chinese /// 0.355555556 // Japanese /// 0.35 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","G // Caucasian /// G // Han Chinese /// G // Japanese /// G // Yoruban","0.127118644 // Caucasian /// 0.166666667 // Han Chinese /// 0.222222222 // Japanese /// 0.225 // Yoruban","---" "AX-11086611","rs10001340","4","130341127","+","0","q28.2","[A/C]agggcattcatctcagcttactatttgggaaaaat","A","C","ENST00000281146 // downstream // 306645 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// ENST00000394248 // upstream // 3729342 // Hs.192859 // 
PCDH10 // 57575 // protocadherin 10 /// NM_173487 // downstream // 307285 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// NM_020815 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10","127.864057946266 // D4S1615 // D4S2365 // --- // --- /// 129.756132396152 // D4S2938 // D4S2365 // AFMA284WG5 // GATA10A12 /// 124.03426335901 // D4S2394 // --- // --- // 55218","D4S3198 // upstream // 331274 /// D4S2394 // downstream // 43310","0.815789474 // 0.184210526 // Caucasian /// 1.0 // 0.0 // Han Chinese /// 1.0 // 0.0 // Japanese /// 0.816666667 // 0.183333333 // Yoruban","0.368421053 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.266666667 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","C // Caucasian /// C // Han Chinese /// C // Japanese /// C // Yoruban","0.184210526 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.183333333 // Yoruban","---" "AX-11086610","rs10001337","4","54351529","+","0","q12","atgaggagtagccacatgatctaagcacct[C/T]","T","C","ENST00000306888 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// ENST00000263925 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_001126328 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_032622 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1","67.4182086016315 // D4S2971 // D4S1594 // --- // --- /// 62.2091955728879 // D4S2971 // UNKNOWN // AFMB312YG1 // GATA61B02 /// 61.6059658777947 // --- // GATA61B02 // 149925 // ---","D4S461 // upstream // 151923 /// D4S2583E // downstream // 24481","0.118644068 // 0.881355932 // Caucasian /// 0.111111111 // 0.888888889 // Han Chinese /// 0.122222222 // 0.877777778 // Japanese /// 0.025 // 0.975 // Yoruban","0.203389831 // Caucasian /// 0.133333333 // Han Chinese /// 0.244444444 // Japanese /// 0.05 // Yoruban","60.0 // 
Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","T // Caucasian /// T // Han Chinese /// T // Japanese /// T // Yoruban","0.118644068 // Caucasian /// 0.111111111 // Han Chinese /// 0.122222222 // Japanese /// 0.025 // Yoruban","---"
Calls
probeset_id A05.CEL A06.CEL A03.CEL A04.CEL A01.CEL A10.CEL A02.CEL A11.CEL A12.CEL A09.CEL A07.CEL A08.CEL B01.CEL B02.CEL B06.CEL B03.CEL B04.CEL B05.CEL B07.CEL B08.CEL B09.CEL B10.CEL C02.CEL B11.CEL B12.CEL C01.CEL C03.CEL C04.CEL C05.CEL C06.CEL C07.CEL C08.CEL C09.CEL C10.CEL C11.CEL C12.CEL D01.CEL D02.CEL D03.CEL D04.CEL D05.CEL D06.CEL D07.CEL D08.CEL D09.CEL D10.CEL D11.CEL D12.CEL E01.CEL E02.CEL E03.CEL E04.CEL E05.CEL E06.CEL E07.CEL E08.CEL E09.CEL E10.CEL E11.CEL E12.CEL F01.CEL F02.CEL F03.CEL F04.CEL F05.CEL F06.CEL F07.CEL F08.CEL H08.CEL H07.CEL G07.CEL G08.CEL H03.CEL H04.CEL H06.CEL H05.CEL G09.CEL G10.CEL H01.CEL H02.CEL G11.CEL G12.CEL G03.CEL G04.CEL H10.CEL H09.CEL F10.CEL F09.CEL G01.CEL G02.CEL F11.CEL F12.CEL G05.CEL G06.CEL H11.CEL H12.CEL AX-11086525 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 0 1 0 1 0 0 1 1 2 1 1 0 0 0 0 1 0 0 1 0 0 1 1 0 1 0 1 1 2 0 1 0 0 1 2 0 2 AX-11086526 2 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 2 2 2 2 2 2 2 2 2 2 2 1 2 2 0 2 2 2 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 1 2 1 2 2 0 2 1 2 2 1 2 2 2 2 1 2 2 2 2 2 1 2 1 1 2 2 1 2 2 2 2 2 2 2 2 2 1 2 2 2 AX-11086527 2 0 2 2 2 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 2 2 2 2 2 2 2 2 1 2 2 1 1 0 2 1 2 1 2 2 1 1 1 1 2 2 1 0 2 1 2 1 2 2 2 1 2 1 1 1 1 2 2 2 2 2 1 1 0 2 0 1 0 1 0 1 2 1 1 0 1 1 0 2 1 1 1 0 1 1 1 2 2 1 1
Join
AX-11086525 4 165621955 rs10000041 T T T T T T T T T T T T T T T G T T T T T T T T T T T T T T T T T T T T T T T G T G T T T T T G T T T T T T T T T T T T T T T T T T T T T T T T T T T G T T T T T G T T T G T T T T T T T T T T T T T T T T T T T T T G T T T T T T T T T G T T T G T T T G T T T T T G T G G G T G T G T T T T T T T T T G T T T T T G T T T T T G T G T T T G T T T G T G G G T T T G T T T T T G G G T T G G AX-11086526 4 5237152 rs10000042 C C T C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C T C C C C C C C C C C C C C C C C C C C C C C C T C C C C C T T C C C C C C T C T C T C C C C C C C C C C C C C C C C C C C T C T C T C C C T C C C T C C C C C T T C C T C C C C C T C C C C C C C C C T C C C C C C C C C C C T C C C T C T C C C C C T C C C C C C C C C C C C C C C C C C C T C C C C C C C
GeneTitan pictures
package test.hadoop;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.mapred.join.TupleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Joins an Affymetrix Axiom SNP annotation file (CSV) with an AxiomGT1 genotype
 * call matrix, by probe-set id, on Hadoop.
 *
 * Pipeline (see {@link #run}):
 *  1. sort the markers by probe id into a SequenceFile;
 *  2. sort the genotype calls by probe id into a SequenceFile;
 *  3. inner-join both sorted files with the old-API {@code CompositeInputFormat}
 *     and emit one PED-like line per marker (chrom, pos, rsid, genotypes).
 *
 * Mixes the mapreduce (new) API for the two sort jobs and the mapred (old) API
 * for the map-side join, because CompositeInputFormat only exists in the old API
 * in Hadoop 0.20.
 */
@SuppressWarnings("deprecation")
public class Hadoop01 extends Configured implements Tool {
    /** HDFS path of the markers sorted by probe id (SequenceFile). */
    static final String SORTED_MARKERS_PATH = "markers.data";
    /** HDFS path of the genotype calls sorted by probe id (SequenceFile). */
    static final String SORTED_CALLS_PATH = "calls.data";
    /** HDFS path of the join output (text). */
    static final String JOIN_PATH = "joined.data";
    /* Configuration keys holding the 0-based column indexes found in the CSV header. */
    static final String CHROM_COL = "chrom.col";
    static final String POS_COL = "pos.col";
    static final String ALLELE_A_COL = "alleleA.col";
    static final String ALLELE_B_COL = "alleleB.col";
    static final String PROBE_SET_ID_COL = "probeSetId.col";
    static final String RS_ID_COL = "rsId.col";

    /**
     * One row of the call matrix: the genotype code of every sample for one probe.
     * Codes: -1 = no call, 0 = AA, 1 = AB, 2 = BB (see {@link JoinReducer}).
     */
    static public class Genotypes implements Writable {
        private byte array[];

        public Genotypes() {
            this(0);
        }

        public Genotypes(int n) {
            this.array = new byte[n];
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            int n = in.readInt();
            this.array = new byte[n];
            in.readFully(this.array);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.array.length);
            out.write(this.array);
        }

        @Override
        public String toString() {
            // BUGFIX: Arrays.asList(byte[]) boxes the whole array into a single-element
            // List<byte[]> and would print "[[B@...]"; Arrays.toString prints the values.
            return Arrays.toString(this.array);
        }
    }

    /** A genomic position: chromosome name + 1-based physical position. */
    static public class Position implements WritableComparable<Position> {
        private String chrom = "N/A";
        private int position = -1;

        public Position() {
        }

        public Position(String chrom, int position) {
            this.chrom = chrom;
            this.position = position;
        }

        public String getChrom() {
            return chrom;
        }

        public int getPosition() {
            return position;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.chrom = in.readUTF();
            this.position = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(chrom);
            out.writeInt(position);
        }

        @Override
        public int compareTo(Position o) {
            int i = chrom.compareTo(o.chrom);
            if (i != 0) return i;
            // BUGFIX: subtraction can overflow for large positions; compare explicitly.
            return Integer.compare(position, o.position);
        }

        @Override
        public String toString() {
            return chrom + ":" + position;
        }
    }

    /** One SNP marker from the Axiom annotation file, ordered by genomic position. */
    static public class Marker implements WritableComparable<Marker> {
        private Position position = new Position();
        private String alleleA;
        private String alleleB;
        private String probeSetId;
        private String rsid;

        public Marker() {
        }

        public Position getPosition() {
            return position;
        }

        public String getAlleleA() {
            return alleleA;
        }

        public String getAlleleB() {
            return alleleB;
        }

        public String getProbeSetId() {
            return probeSetId;
        }

        public String getRsid() {
            return rsid;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.position.readFields(in);
            this.alleleA = in.readUTF();
            this.alleleB = in.readUTF();
            this.probeSetId = in.readUTF();
            this.rsid = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.position.write(out);
            out.writeUTF(this.alleleA);
            out.writeUTF(this.alleleB);
            out.writeUTF(this.probeSetId);
            out.writeUTF(this.rsid);
        }

        @Override
        public int compareTo(Marker o) {
            return position.compareTo(o.position);
        }

        @Override
        public String toString() {
            return this.probeSetId + " " + position;
        }
    }

    /**
     * Maps one CSV line of the annotation file to (probe-set-id, Marker).
     * Column indexes are read from the job configuration (set by
     * {@link Hadoop01#sortMarkersByProbeid} after scanning the CSV header).
     */
    public static class SortMarkerByProbIdMapper
            extends Mapper<LongWritable, Text, Text, Marker> {
        int chromCol = -1;
        int posCol = -1;
        int alleleACol = -1;
        int alleleBCol = -1;
        int probeSetIdCol = -1;
        int rsIdCol = -1;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // NOTE(review): the fallback of 1 is almost certainly wrong for most
            // columns; the driver is expected to have set every key explicitly.
            this.chromCol = context.getConfiguration().getInt(CHROM_COL, 1);
            this.posCol = context.getConfiguration().getInt(POS_COL, 1);
            this.alleleACol = context.getConfiguration().getInt(ALLELE_A_COL, 1);
            this.alleleBCol = context.getConfiguration().getInt(ALLELE_B_COL, 1);
            this.probeSetIdCol = context.getConfiguration().getInt(PROBE_SET_ID_COL, 1);
            this.rsIdCol = context.getConfiguration().getInt(RS_ID_COL, 1);
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // ignore the quoted header line and comment lines
            if (value.find("\"Probe Set ID\"") == 0 || value.find("#") == 0) {
                return;
            }
            // BUGFIX: Text is UTF-8; toString() decodes it correctly, whereas
            // new String(bytes,...) used the platform default charset.
            List<String> array = splitCSV(value.toString());
            if (array.get(this.chromCol).equals("---")) return; // undefined chromosome
            Marker m = new Marker();
            m.position = new Position(
                    array.get(this.chromCol),
                    Integer.parseInt(array.get(this.posCol)));
            m.alleleA = array.get(this.alleleACol);
            m.alleleB = array.get(this.alleleBCol);
            m.probeSetId = array.get(this.probeSetIdCol);
            m.rsid = array.get(this.rsIdCol);
            context.write(new Text(m.probeSetId), m);
        }
    }

    /** Identity reducer that also asserts probe-set ids are unique. */
    public static class SortMarkerByProbIdReducer
            extends Reducer<Text, Marker, Text, Marker> {
        @Override
        protected void reduce(Text key, Iterable<Marker> values, Context context)
                throws java.io.IOException, InterruptedException {
            Marker marker = null;
            for (Marker i : values) {
                if (marker != null) throw new IOException("Duplicate marker id " + key);
                marker = i;
            }
            if (marker == null) return;
            context.write(key, marker);
        }
    }

    /**
     * Old-API reducer for the map-side join: each tuple pairs a Marker (slot 0)
     * with its Genotypes (slot 1). Emits one tab-delimited line per marker:
     * chrom, position, rsid, then one "A B" genotype per sample
     * (code -1 = missing = "0 0", 0 = AA, 1 = AB, 2 = BB).
     */
    public static class JoinReducer extends MapReduceBase
            implements org.apache.hadoop.mapred.Reducer<Writable, TupleWritable, Text, Text> {
        @Override
        public void reduce(Writable key,
                           Iterator<TupleWritable> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            System.err.println("" + key + " " + key.getClass());
            while (values.hasNext()) {
                TupleWritable o = values.next();
                Marker m = (Marker) o.get(0);
                Genotypes g = (Genotypes) o.get(1);
                StringBuilder b = new StringBuilder();
                b.append(m.getPosition().getChrom());
                b.append("\t");
                b.append(m.getPosition().getPosition());
                b.append("\t");
                b.append(m.getRsid());
                for (int i = 0; i < g.array.length; ++i) {
                    b.append("\t");
                    switch (g.array[i]) {
                        case -1: b.append("0 0"); break; // no call
                        case 0: b.append(m.alleleA + " " + m.alleleA); break;
                        case 1: b.append(m.alleleA + " " + m.alleleB); break;
                        case 2: b.append(m.alleleB + " " + m.alleleB); break;
                        default: throw new IllegalArgumentException(String.valueOf(g.array[i]));
                    }
                }
                output.collect(new Text(m.probeSetId), new Text(b.toString()));
            }
        }
    }

    /**
     * Maps one tab-delimited line of the call matrix to (probe-set-id, Genotypes).
     * Column 0 is the probe id; the remaining columns are per-sample codes.
     */
    public static class SortCallByProbIdMapper
            extends Mapper<LongWritable, Text, Text, Genotypes> {
        final private Pattern tab = Pattern.compile("[\t]");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // ignore the header line and '%'-prefixed comment lines
            if (value.find("probeset_id") == 0 || value.find("%") == 0) {
                return;
            }
            String tokens[] = tab.split(value.toString());
            Genotypes genotypes = new Genotypes(tokens.length - 1);
            for (int i = 1; i < tokens.length; ++i) {
                genotypes.array[i - 1] = Byte.parseByte(tokens[i]);
            }
            context.write(new Text(tokens[0]), genotypes);
        }
    }

    /** Identity reducer that also asserts probe-set ids are unique. */
    public static class SortCallByProbIdReducer
            extends Reducer<Text, Genotypes, Text, Genotypes> {
        @Override
        protected void reduce(Text key, Iterable<Genotypes> values, Context context)
                throws java.io.IOException, InterruptedException {
            Genotypes array = null;
            for (Genotypes i : values) {
                if (array != null) throw new IOException("Duplicate marker id " + key);
                array = i;
            }
            if (array == null) return;
            context.write(key, array);
        }
    }

    /**
     * Splits one line of the Axiom CSV. Every field must be double-quoted;
     * backslash escapes (\n \t \' \" \\) are decoded inside fields.
     *
     * @throws IllegalArgumentException on malformed input (missing quote or comma,
     *         unknown escape, or an unterminated quoted field).
     */
    private static List<String> splitCSV(String line) {
        List<String> L = new ArrayList<String>();
        int i = 0;
        while (i < line.length()) {
            if (line.charAt(i) != '\"') throw new IllegalArgumentException("expected quote");
            StringBuilder token = new StringBuilder();
            ++i;
            while (i < line.length() && line.charAt(i) != '\"') {
                if (line.charAt(i) == '\\' && i + 1 < line.length()) {
                    ++i;
                    switch (line.charAt(i)) {
                        case 'n': token.append("\n"); break;
                        case 't': token.append("\t"); break;
                        case '\'': token.append("\'"); break;
                        case '\"': token.append("\""); break;
                        case '\\': token.append("\\"); break;
                        default: throw new IllegalArgumentException("bad escape character in " + line);
                    }
                } else {
                    token.append(line.charAt(i));
                }
                ++i;
            }
            // BUGFIX: bounds check first; an unterminated field used to raise a raw
            // StringIndexOutOfBoundsException instead of a meaningful error.
            if (i == line.length() || line.charAt(i) != '\"')
                throw new IllegalArgumentException("expected quote after " + token);
            L.add(token.toString());
            ++i;
            if (i == line.length()) break;
            if (line.charAt(i) != ',') throw new IllegalArgumentException("comma missing in " + line);
            ++i;
        }
        return L;
    }

    /** Runs the three jobs in sequence: sort markers, sort calls, join. */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.addResource(new Path("/home/lindenb/package/hadoop-0.20.203.0/conf/core-site.xml"));
        sortMarkersByProbeid(conf);
        sortCallsByProbeid(conf);
        join(conf);
        return 0;
    }

    /**
     * Job 1: scan the CSV header to locate the needed columns, store their
     * indexes in the configuration, then sort the markers by probe id into
     * a SequenceFile at {@link #SORTED_MARKERS_PATH}.
     */
    private void sortMarkersByProbeid(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        final Path outPath = new Path(SORTED_MARKERS_PATH);
        final Path inPath = new Path("myfolder/Axiom_GW_Hu_SNP.r2.na31.annot.csv");
        List<String> header = null;
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(inPath)));
        String line;
        while ((line = br.readLine()) != null) {
            if (line.startsWith("\"Probe Set ID\"")) {
                header = splitCSV(line);
                break;
            }
        }
        br.close();
        if (header == null || header.isEmpty())
            throw new IOException("no header in " + inPath);
        for (int i = 0; i < header.size(); ++i) {
            if (header.get(i).equals("Chromosome")) {
                conf.setInt(CHROM_COL, i);
            } else if (header.get(i).equals("Physical Position")) {
                conf.setInt(POS_COL, i);
            } else if (header.get(i).equals("Allele A")) {
                conf.setInt(ALLELE_A_COL, i);
            } else if (header.get(i).equals("Allele B")) {
                conf.setInt(ALLELE_B_COL, i);
            } else if (header.get(i).equals("Probe Set ID")) {
                conf.setInt(PROBE_SET_ID_COL, i);
            } else if (header.get(i).equals("dbSNP RS ID")) {
                conf.setInt(RS_ID_COL, i);
            }
        }
        Job job = new Job(conf, Hadoop01.class.getName());
        job.setJarByClass(Hadoop01.class);
        job.setMapperClass(SortMarkerByProbIdMapper.class);
        job.setReducerClass(SortMarkerByProbIdReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Marker.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Marker.class);
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        if (!job.waitForCompletion(true)) {
            throw new IOException("Cannot complete job");
        }
    }

    /**
     * Job 2: sort the genotype calls by probe id into a SequenceFile at
     * {@link #SORTED_CALLS_PATH}.
     */
    public void sortCallsByProbeid(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        final Path outPath = new Path(SORTED_CALLS_PATH);
        final Path inPath = new Path("myfolder/AxiomGT1.calls.txt");
        Job job = new Job(conf, Hadoop01.class.getName());
        job.setJarByClass(Hadoop01.class);
        job.setMapperClass(SortCallByProbIdMapper.class);
        job.setReducerClass(SortCallByProbIdReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Genotypes.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Genotypes.class);
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        if (!job.waitForCompletion(true)) {
            throw new IOException("Cannot complete job");
        }
    }

    /**
     * Job 3: inner-join the two sorted SequenceFiles on probe id with the
     * old-API CompositeInputFormat and write text output to {@link #JOIN_PATH}.
     */
    private void join(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(JOIN_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        final String compose = CompositeInputFormat.compose(
                "inner",
                SequenceFileInputFormat.class,
                new Path(SORTED_MARKERS_PATH),
                new Path(SORTED_CALLS_PATH));
        System.err.println(compose);
        JobConf jobConf = new JobConf(conf, getClass());
        jobConf.setJobName("join");
        jobConf.setInputFormat(CompositeInputFormat.class);
        jobConf.set("mapred.join.expr", compose);
        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(TupleWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        jobConf.setReducerClass(JoinReducer.class);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, outPath);
        JobClient.runJob(jobConf);
    }

    public static void main(String[] args) throws Exception {
        // http://distributed-agility.blogspot.com/2010/01/hadoop-0201-example-inverted-line-index.html
        int res = ToolRunner.run(new Configuration(), new Hadoop01(), args);
        System.exit(res);
    }
}