#!/usr/bin/env python # encoding: utf-8 import csv, sys import io, textwrap, itertools from Bio import SeqIO from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord csv.field_size_limit(sys.maxsize) common_nucleotide_set = {'A', 'T', 'C', 'G', 'U', 'N'} # not {'O', 'U', 'Z', 'J', 'B'} # Common amino acids common_amino_acid_set = {'R', 'X', 'S', 'G', 'W', 'I', 'Q', 'A', 'T', 'V', 'K', 'Y', 'C', 'N', 'L', 'F', 'D', 'M', 'P', 'H', 'E'} def clean_seq(protein_id, seq): seq = seq.upper() new_seq = "" has_invalid_char = False invalid_char_set = set() for ch in seq: if 'A' <= ch <= 'Z' and ch not in ['J']: new_seq += ch else: invalid_char_set.add(ch) has_invalid_char = True if has_invalid_char: print("id: %s. Seq: %s" % (protein_id, seq)) print("invalid char set:", invalid_char_set) return new_seq def file_reader(filename, header=True, header_filter=True): if filename.endswith(".fa") or filename.endswith(".fas") or filename.endswith(".fasta"): return fasta_reader(filename) elif filename.endswith(".csv"): return csv_reader(filename, header=True, header_filter=True) elif filename.endswith(".tsv"): return tsv_reader(filename, header=True, header_filter=True) else: return txt_reader(filename, header=header, header_filter=header_filter) def txt_reader(handle, header=True, header_filter=True): ''' csv 读取器,适合大文件 :param handle: :param header: :param header_filter: 返回结果是否去掉头 :return: ''' handle = handle if isinstance(handle, io.TextIOWrapper) else open(handle, 'r') try: cnt = 0 for line in handle: cnt += 1 if header and header_filter and cnt == 1: continue yield line.strip() except Exception as e: raise StopIteration finally: if not handle.closed: handle.close() def tsv_reader(handle, header=True, header_filter=True): ''' csv 读取器,适合大文件 :param handle: :param header: :param header_filter: 返回结果是否去掉头 :return: ''' handle = handle if isinstance(handle, io.TextIOWrapper) else open(handle, 'r') try: reader = csv.reader(handle, delimiter="\t") cnt = 0 for row in reader: cnt += 1 if header and header_filter and cnt == 1: continue yield row except Exception as e: raise StopIteration finally: if not handle.closed: handle.close() def csv_reader(handle, header=True, header_filter=True): ''' csv 读取器,适合大文件 :param handle: :param header: :param header_filter: 返回结果是否去掉头 :return: ''' handle = handle if isinstance(handle, io.TextIOWrapper) else open(handle, 'r') try: # data = csv.reader((line.replace('\0','') for line in data_initial), delimiter=",") # reader = csv.reader(handle) reader = csv.reader((line.replace('\0', '') for line in handle)) cnt = 0 for row in reader: cnt += 1 if header and header_filter and cnt == 1: continue yield row except Exception as e: raise StopIteration finally: if not handle.closed: handle.close() def txt_writer(dataset, handle, header=None): ''' txt 写 :param dataset: 数据 :param handle: 文件 :param header: 头 :return: ''' ''' handle = handle if isinstance(handle, io.TextIOWrapper) else open(handle, 'w') try: if header: if isinstance(header, list): handle.write(",".join(header) + "\n") else: handle.write(header + "\n") print("header: %s" %header) for row in dataset: handle.write(str(row) + "\n") except Exception as e: raise e finally: if not handle.closed: handle.close() ''' with open(handle, "w") as wfp: if header: if isinstance(header, list): wfp.write(",".join(header) + "\n") else: wfp.write(header + "\n") for row in dataset: wfp.write(str(row) + "\n") def csv_writer(dataset, handle, header): ''' csv 写,适合大文件 :param dataset: 数据 :param handle: 文件 :param header: 头 :return: ''' handle = handle if isinstance(handle, io.TextIOWrapper) else open(handle, 'w') try: writer = csv.writer(handle) if header: writer.writerow(header) for row in dataset: writer.writerow(row) except Exception as e: raise e finally: if not handle.closed: handle.close() def fasta_reader(handle, width=None): """ Reads a FASTA file, yielding header, sequence pairs for each sequence recovered 适合大文件 args: :handle (str, pathliob.Path, or file pointer) - fasta to read from :width (int or None) - formats the sequence to have max `width` character per line. If <= 0, processed as None. If None, there is no max width. yields: :(header, sequence) tuples returns: :None """ FASTA_STOP_CODON = "*" handle = handle if isinstance(handle, io.TextIOWrapper) else open(handle, 'r') width = width if isinstance(width, int) and width > 0 else None try: header = None for is_header, group in itertools.groupby(handle, lambda line: line.startswith(">")): if is_header: header = group.__next__().strip() else: seq = ''.join(line.strip() for line in group).strip().rstrip(FASTA_STOP_CODON) if width is not None: seq = textwrap.fill(seq, width) yield header, seq except Exception as e: raise StopIteration finally: if not handle.closed: handle.close() def write_fasta(filepath, sequences): ''' write fasta file :param filepath: savepath :param sequences: fasta sequence(each item: [id, seq]) :return: ''' if sequences: with open(filepath, "w") as output_handle: if len(sequences[0]) > 1 and isinstance(sequences[0][0], str): for row in sequences: protein_id = row[0] seq = row[1] sequence = SeqRecord(Seq(seq, None), id=protein_id[1:] if protein_id and protein_id[0] == ">" else protein_id, description="") SeqIO.write(sequence, output_handle, "fasta") else: for sequence in sequences: SeqIO.write(sequence, output_handle, "fasta")