-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathfa_merge.py
More file actions
executable file
·118 lines (94 loc) · 3.29 KB
/
fa_merge.py
File metadata and controls
executable file
·118 lines (94 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 10/24/2020 5:26 PM
# @Author : Runsheng
# @File : fa_merge.py.py
"""
Merge contig1 with contig2, use sequences only from contig1 if this sequence exsit in both contig1 and 2
"""
from __future__ import print_function
from Bio import SeqIO
import gzip
def fasta2dic(fastafile):
"""
Give a fasta file name, return a dict contains the name and seq
Require Biopython SeqIO medule to parse the sequence into dict, a large genome may take a lot of RAM
"""
if ".gz" in fastafile:
handle=gzip.open(fastafile, "r")
else:
handle=open(fastafile, "r")
record_dict=SeqIO.to_dict(SeqIO.parse(handle,"fasta"))
handle.close()
return record_dict
def dic2dic(record_dict):
"""
:param record_dict: a SeqIO dict generated by Biopython
:return the dict contain {name:seq}
"""
seq_dict={}
for k,v in record_dict.items():
seq=str(v.seq)
seq_dict[k]=seq
return seq_dict
def dic2fasta(record_dict,out="record_dict.fasta"):
"""
Write a record_dict of fatsa file back to fasta file
:param record_dict:
:param out:
:return:
"""
with open(out,"w") as f:
for record in sorted(record_dict.keys()):
name=record
seq=record_dict[name]
f.write(">")
f.write(name)
f.write("\n")
f.write(seq)
f.write("\n")
if __name__=="__main__":
import argparse
import logging
example_text = '''example:
### example to run the contig merging
fa_merge.py -1 contig1.fa -2 contig2.fa > merge12.fa
'''
parser = argparse.ArgumentParser(prog='fa_merge',
description='get the union of contig1+2, retain the intersection contigs from 1',
epilog=example_text,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-1", "--contig1",help="contig1 in fasta format")
parser.add_argument("-2", "--contig2", help="contig2 in fasta format")
args = parser.parse_args()
fadic1=fasta2dic(args.contig1)
fadic2=fasta2dic(args.contig2)
key1=set(fadic1.keys())
key2=set(fadic2.keys())
intersection12=key1.intersection(key2)
union12=key1.union(key2)
# create logger
logger = logging.getLogger('Contig Info')
logger.setLevel(logging.INFO)
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
# 'application' code
logger.info("contig1 have {len1} sequences, contig2 have {len2} sequences".format(len1=len(key1), len2=len(key2)))
logger.info("Total number after merge is {len_sum}, merged from contig1 is {len_intersection}".format(
len_sum=len(union12), len_intersection=len(intersection12)))
for k in sorted(list(union12)):
try:
seq=str(fadic1[k].seq)
print(">"+k)
print(seq)
except KeyError:
seq=str(fadic2[k].seq)
print(">"+k)
print(seq)