-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathda5code.py
More file actions
176 lines (134 loc) · 5.89 KB
/
da5code.py
File metadata and controls
176 lines (134 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# -*- coding: utf-8 -*-
"""DA5code.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1J6unL1tJQx9paBBefchJRBUzM7lt9A6R
"""
!pip install biopython
import numpy as np
import pandas as pd
from collections import defaultdict
from typing import List, Dict, Tuple, Set
from functools import lru_cache
import multiprocessing as mp
from itertools import islice
import mmap
from array import array
from bisect import bisect_left, bisect_right
from Bio import SeqIO
from Bio.Seq import Seq
import itertools
import gzip
import math
from collections import defaultdict
# Mount Google Drive to access data files
from google.colab import drive
drive.mount('/content/drive')
# Defining all the file paths
REFERENCE_PATH = '/content/drive/MyDrive/chrX_bwt/chrX.fa'
READS_PATH = '/content/drive/MyDrive/chrX_bwt/reads'
EXON_LOCATIONS_PATH = '/content/drive/MyDrive/chrX_bwt/chrX_map.txt'
BWT_LAST_COL_PATH = '/content/drive/MyDrive/chrX_bwt/chrX_last_col.txt'
def process_reference_file(reference_path):
# Load the reference sequence from the FASTA file
with open(reference_path, 'r') as f:
reference_sequence = ''.join(line.strip() for line in f if not line.startswith('>'))
return reference_sequence
def process_reads_file(reads_path):
# Load reads from a text file
with open(reads_path, 'r') as f:
reads = [line.strip().replace('N', 'A') for line in f]
return reads
def process_exon_locations(exon_locations_path):
# Read the exon locations from a tab-separated file
exon_locations = pd.read_csv(exon_locations_path, sep='\t')
return exon_locations
def read_bwt_last_column(bwt_last_col_path):
# Read the BWT last column data
with open(bwt_last_col_path, 'r') as f:
last_column = []
rank_dict = defaultdict(list)
counts = []
for line in f:
line = line.strip()
last_column.append(line)
rank_dict[line].append(len(rank_dict[line]) + 1)
counts.append(len(rank_dict[line]))
return last_column, rank_dict, counts
def find_matches_with_mismatches(read, reference_sequence, last_column, rank_dict, counts):
matches = []
read_length = len(read)
reference_length = len(reference_sequence)
for i in range(reference_length - read_length + 1):
substring = reference_sequence[i:i + read_length]
mismatch_count = sum(1 for a, b in zip(substring, read) if a != b)
if mismatch_count <= 2:
matches.append(i)
return matches
def count_gene_mappings(matches, red_exons, green_exons):
red_count = 0
green_count = 0
for match in matches:
in_red = any(start <= match <= end for start, end in red_exons)
in_green = any(start <= match <= end for start, end in green_exons)
if in_red and in_green:
red_count += 0.5
green_count += 0.5
elif in_red:
red_count += 1
elif in_green:
green_count += 1
return red_count, green_count
def calculate_configuration_probabilities(total_red_count, total_green_count):
# probabilities for the different configurations
total_count = total_red_count + total_green_count
probabilities = {}
if total_count > 0:
probabilities['red'] = total_red_count / total_count
probabilities['green'] = total_green_count / total_count
return probabilities
def process_exon_locations(exon_locations_path):
exon_locations = pd.read_csv(exon_locations_path, sep='\t', header=None, names=['Gene', 'Start', 'End'])
print("Exon Locations DataFrame:")
print(exon_locations.head()) # Print the first few rows of the DataFrame
print("Columns:", exon_locations.columns)
return exon_locations
def read_exon_locations(exon_locations):
print("\nReading exon locations...")
red_exons = [(row['Start'], row['End']) for idx, row in exon_locations[exon_locations['Gene'] == 'red'].iterrows()]
green_exons = [(row['Start'], row['End']) for idx, row in exon_locations[exon_locations['Gene'] == 'green'].iterrows()]
return red_exons, green_exons
def align_reads(reads, reference_sequence, last_column, rank_dict, counts, red_exons, green_exons):
total_red_count = 0
total_green_count = 0
print("Aligning reads and counting gene mappings...")
for i, read in enumerate(reads):
if i % 1000 == 0:
print(f"Processed {i} reads...")
matches = find_matches_with_mismatches(read, reference_sequence, last_column, rank_dict, counts)
red_count, green_count = count_gene_mappings(matches, red_exons, green_exons)
total_red_count += red_count
total_green_count += green_count
return total_red_count, total_green_count
def calculate_probabilities(total_red_count, total_green_count):
print("\nCalculating configuration probabilities...")
return calculate_configuration_probabilities(total_red_count, total_green_count)
def main():
reference_sequence = process_reference_file(REFERENCE_PATH)
reads = process_reads_file(READS_PATH)
exon_locations = process_exon_locations(EXON_LOCATIONS_PATH)
red_exons, green_exons = read_exon_locations(exon_locations)
last_column, rank_dict, counts = read_bwt_last_column(BWT_LAST_COL_PATH)
total_red_count, total_green_count = align_reads(reads, reference_sequence, last_column, rank_dict, counts, red_exons, green_exons)
probabilities = calculate_probabilities(total_red_count, total_green_count)
# Output the most likely configuration
most_likely = max(probabilities.items(), key=lambda x: x[1])
print("\nResults:")
print(f"Total red gene count: {total_red_count}")
print(f"Total green gene count: {total_green_count}")
print(f"\nMost likely configuration: {most_likely[0]}")
print("\nConfiguration probabilities:")
for config, prob in probabilities.items():
print(f"{config}: {prob}")
if __name__ == "__main__":
main()