Source code for genetic_collections.ncbi_functions

import requests
from lxml import objectify
from collections import namedtuple
from operator import itemgetter
import re


def ncbi_inst_search(search_term, api_key=None):
    """
    Searches the NCBI biocollections database for institutions matching
    search_term, counts the GenBank records associated with each matching
    institution code, and returns the entries as a list of dictionaries
    sorted by GenBank record count (descending).
    """

    search_url = 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi'
    search_params = {'term': search_term,
                     'db': 'biocollections',
                     'usehistory': 'y',
                     'retmax': 10000000}
    if api_key:
        search_params['api_key'] = api_key
    r = requests.get(search_url, params=search_params)
    search_results = objectify.fromstring(r.content)
    web_env = search_results.WebEnv.text
    query_key = search_results.QueryKey.text
    result_count = int(search_results.Count.text)
    print('{} matching results found.'.format(result_count))

    parsed_results = []
    if result_count > 0:
        print('Fetching biocollection entries.')
        ret_start = 0
        batch_size = 250
        while ret_start < result_count:
            url_base = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
            params = {'db': 'biocollections',
                      'rettype': 'docsum',
                      'retmode': 'xml',
                      'query_key': query_key,
                      'WebEnv': web_env,
                      'retstart': ret_start,
                      'retmax': batch_size}
            if api_key:
                params['api_key'] = api_key
            r = requests.get(url_base, params=params)
            result_list = parse_biocollection_xml(r.content)
            parsed_results += result_list
            ret_start += batch_size
        for inst in parsed_results:
            icode = inst['Institution Code']
            gb_results = gb_search(inst_code=icode)
            inst['gb_count'] = gb_results.result_count
        parsed_results = sorted(parsed_results, key=itemgetter('gb_count'),
                                reverse=True)

    return parsed_results
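
# Example usage (an illustrative sketch, not from the package docs; the
# search term is hypothetical and the call requires network access to the
# NCBI E-utilities):
#
#     institutions = ncbi_inst_search('herbarium')
#     for inst in institutions[:5]:
#         print(inst['Institution Code'], inst['gb_count'])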

def parse_biocollection_xml(xml):
    """Parses biocollections docsum XML into a list of dictionaries."""
    biocoll_url_base = 'https://www.ncbi.nlm.nih.gov/biocollections'
    xml_results = objectify.fromstring(xml)
    result_list = []
    for inst in xml_results.DocumentSummarySet.DocumentSummary:
        parsed_result = {}
        parsed_result['NCBI Link'] = '{}/{}'.format(biocoll_url_base,
                                                    inst.idbid.text)
        parsed_result['Institution Name'] = inst.iname.text
        parsed_result['Institution Code'] = inst.icode.text
        parsed_result['Country'] = inst.country.text
        parsed_result['Collection Type'] = inst.coltype.text
        result_list.append(parsed_result)
    return result_list
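
# For reference, each dictionary produced above has the following shape
# (the field values here are placeholders, not a real record):
#
#     {'NCBI Link': 'https://www.ncbi.nlm.nih.gov/biocollections/<idbid>',
#      'Institution Name': '...',
#      'Institution Code': '...',
#      'Country': '...',
#      'Collection Type': '...'}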

def gb_search(format='variable', api_key=None, **kwargs):
    """
    Searches the GenBank nucleotide database (nuccore) via NCBI esearch.

    Accepted keyword arguments: raw_query (a raw Entrez query string),
    inst_code and/or taxon (used to build a query), idtype ('acc' returns
    accession.version identifiers instead of GI numbers), and id_list_file
    (the output file used when format='id_list').

    :param str format: 'variable' returns a namedtuple of search results,
        'result_count' prints a summary and an Entrez URL, and 'id_list'
        writes the matching ids to a file
    :param str api_key: A personal NCBI API key (optional)
    """

    search_url = 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi'

    if 'raw_query' in kwargs:
        search_term = kwargs['raw_query']
    elif 'inst_code' in kwargs or 'taxon' in kwargs:
        search_term_list = []
        if 'inst_code' in kwargs:
            inst_search = '"collection {}"[prop]'.format(kwargs['inst_code'])
            search_term_list.append(inst_search)
        if 'taxon' in kwargs:
            taxon_search = '"{}"[organism]'.format(kwargs['taxon'])
            search_term_list.append(taxon_search)
        search_term = ' AND '.join(search_term_list)
    else:
        print('You must provide a raw_query, inst_code, or taxon to search on.')
        return

    search_params = {'term': search_term,
                     'db': 'nuccore',
                     'usehistory': 'y',
                     'retmax': 10000000}
    if api_key:
        search_params['api_key'] = api_key
    
    # Return GI number (default) or accession.version (idtype='acc')
    if 'idtype' in kwargs:
        if kwargs['idtype'] == 'acc':
            search_params['idtype'] = 'acc'
        else:
            print('Invalid idtype specified, returning default GI number.')

    r = requests.get(search_url, params=search_params)
    search_results = objectify.fromstring(r.content)
    id_list = [id_entry.text for id_entry in search_results.IdList.iterchildren()]
    web_env = search_results.WebEnv.text
    query_key = search_results.QueryKey.text
    result_count = int(search_results.Count.text)
    translated_query = search_results.QueryTranslation.text

    if format == 'variable':
        Result = namedtuple('SearchResults',['web_env','query_key',
                                             'result_count','id_list'])
        result = Result(web_env, query_key, result_count, id_list)
        return result
    elif format == 'result_count':
        base_url = 'https://www.ncbi.nlm.nih.gov/nuccore/?term='
        entrez_url = base_url + requests.compat.quote_plus(translated_query)
        report = 'Your search found {} hits in GenBank'.format(result_count)
        print(report)
        url_report = 'You can see your search results online at {}'.format(entrez_url)
        print(url_report)
    elif format == 'id_list':
        if 'id_list_file' in kwargs:
            id_list_file = kwargs['id_list_file']
        else:
            id_list_file = 'gb_search_result_ids.txt'
        with open(id_list_file, 'w') as outfile:
            for gb_id in id_list:
                outfile.write(gb_id + '\n')
        report = '{} matching GenBank ids saved in {}'.format(result_count,
                                                              id_list_file)
        print(report)
    else:
        print('{} is not a valid format.'.format(format))

    return
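
# Example usage (an illustrative sketch; the institution code and taxon
# are hypothetical, and the calls require network access to NCBI):
#
#     hits = gb_search(inst_code='KU', taxon='Aves')
#     print(hits.result_count, len(hits.id_list))
#     gb_search(format='result_count', raw_query='"collection KU"[prop]')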

def gb_fetch_from_id_list(id_list, batch_size=250, api_key=None):
    """
    Orchestrates making calls to the NCBI efetch service, and passes off
    the XML to the gb_parse_xml_fetch_results function.

    :param list id_list: List of GenBank IDs -- this can be either GIs
        or Accessions
    :param int batch_size: The number of results to request at a time --
        the higher the better, but too large of a result set causes errors
    :param str api_key: A personal NCBI API key, obtained by creating an
        NCBI account. An API key is not required, but supplying one will
        increase the API call rate to NCBI from three per second to ten
        per second.
    :rtype: list of dictionaries
    """
    result_count = len(id_list)
    parsed_results = []
    i = 0
    while i < result_count:
        # efetch accepts a comma-separated list of ids per request
        list_for_url = ','.join(id_list[i:i + batch_size])
        url_base = 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi'
        params = {'db': 'nuccore',
                  'rettype': 'gb',
                  'retmode': 'xml',
                  'id': list_for_url}
        if api_key:
            params['api_key'] = api_key
        r = requests.get(url_base, params=params)
        result_list = gb_parse_xml_fetch_results(r.content)
        parsed_results += result_list
        i += batch_size
    return parsed_results
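
# Example of chaining gb_search into gb_fetch_from_id_list (a sketch; the
# taxon is hypothetical and both calls require NCBI network access):
#
#     search = gb_search(taxon='Aves', idtype='acc')
#     records = gb_fetch_from_id_list(search.id_list, batch_size=100)
#     print(records[0].get('accession'), records[0].get('specimen_voucher'))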

def gb_parse_xml_fetch_results(gb_xml):
    """Parses GenBank efetch XML into a list of dictionaries, one per
    GBSeq record."""
    target_pieces = ['specimen_voucher', 'country', 'lat_lon',
                     'collection_date', 'collected_by', 'identified_by',
                     'bio_material', 'culture_collection', 'PCR_primers']
    result_list = []
    # huge_tree allows parsing of very large efetch responses
    huge_parser = objectify.makeparser(huge_tree=True)
    xml_results = objectify.fromstring(gb_xml, huge_parser)
    for gb in xml_results.GBSeq:
        result = {}
        try:
            result['accession'] = gb['GBSeq_primary-accession'].text
            result['scientific_name'] = gb['GBSeq_organism'].text
            result['publish_date'] = gb['GBSeq_create-date'].text
            result['update_date'] = gb['GBSeq_update-date'].text
            result['keyword'] = ''
            if hasattr(gb, 'GBSeq_xrefs'):
                for xref in gb['GBSeq_xrefs']['GBXref']:
                    if xref['GBXref_dbname'].text == 'BioProject':
                        result['bioproject'] = xref['GBXref_id'].text
                    elif xref['GBXref_dbname'].text == 'Sequence Read Archive':
                        result['sra'] = xref['GBXref_id'].text
                    elif xref['GBXref_dbname'].text == 'BioSample':
                        result['biosample'] = xref['GBXref_id'].text
            if hasattr(gb, 'GBSeq_keywords'):
                for keyword in gb['GBSeq_keywords'].iterchildren():
                    if keyword.text == 'BARCODE':
                        result['keyword'] = 'BARCODE'
            result['seq_len'] = gb.GBSeq_length.text
            for ref in gb['GBSeq_references']['GBReference']:
                if hasattr(ref, 'GBReference_title'):
                    if ref['GBReference_title'].text == 'Direct Submission':
                        if hasattr(ref, 'GBReference_authors'):
                            result['submit_authors'] = '; '.join(
                                [author.text for author in
                                 ref['GBReference_authors']['GBAuthor']])
                        # Journal text looks like 'Submitted (date) institution'
                        submit_inst_text = ref['GBReference_journal'].text
                        inst_split = re.split(r'[()]', submit_inst_text)
                        result['submit_date'] = inst_split[1]
                        result['submit_inst'] = inst_split[2]
            for feature in gb['GBSeq_feature-table'].iterchildren():
                if feature.GBFeature_key.text == 'source':
                    for feature_qual in feature.GBFeature_quals.iterchildren():
                        qual_name = feature_qual.GBQualifier_name.text
                        if qual_name == 'db_xref':
                            qual_value = feature_qual.GBQualifier_value.text
                            if 'taxon' in qual_value:
                                result['taxid'] = str(qual_value.split(':')[1])
                            elif 'BOLD' in qual_value:
                                result['bold_id'] = str(qual_value.split(':')[1])
                        elif qual_name in target_pieces:
                            qual_value = str(feature_qual.GBQualifier_value.text)
                            # Repeated qualifiers are concatenated with ';'
                            if qual_name in result:
                                result[qual_name] += ';' + qual_value
                            else:
                                result[qual_name] = qual_value
        except Exception:
            problem_child = gb['GBSeq_primary-accession'].text
            print('{} could not be parsed'.format(problem_child))
        result_list.append(result)
    return result_list

def ncbi_taxonomy(gb_fetch_results, batch_size=250, api_key=None):
    """
    Orchestrates making calls to the NCBI efetch service, querying the
    NCBI Taxonomy database for a list of NCBI taxids. Passes off the XML
    to the ncbi_parse_taxonomy_xml function.

    :param gb_fetch_results: The unprocessed results of a previous
        gb_fetch_from_id_list call
    :param int batch_size: The number of results to request at a time --
        the higher the better, but too large of a result set causes errors
    :param str api_key: A personal NCBI API key, obtained by creating an
        NCBI account. An API key is not required, but supplying one will
        increase the API call rate to NCBI from three per second to ten
        per second.
    :rtype: list of dictionaries
    """
    # Collect the unique taxids present in the fetch results
    taxid_list = set()
    for fetch_result in gb_fetch_results:
        try:
            taxid_list.add(fetch_result['taxid'])
        except (KeyError, TypeError):
            continue
    taxid_list = list(taxid_list)
    result_count = len(taxid_list)
    if result_count == 0:
        print('Must supply gb_fetch_from_id_list() results.')
        return
    parsed_results = []
    i = 0
    while i < result_count:
        taxids = ','.join(taxid_list[i:i + batch_size])
        fetch_url = 'https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi'
        params = {'db': 'taxonomy',
                  'retmode': 'xml',
                  'id': taxids}
        if api_key:
            params['api_key'] = api_key
        r = requests.get(fetch_url, params=params)
        result_list = ncbi_parse_taxonomy_xml(r.content)
        parsed_results += result_list
        i += batch_size
    return parsed_results

def ncbi_parse_taxonomy_xml(tax_xml):
    """Parses Taxonomy efetch XML into a list of dictionaries, one per
    Taxon record."""
    result_keys = ['taxid', 'scientific_name', 'rank', 'kingdom', 'phylum',
                   'class', 'order', 'family', 'genus', 'full_lineage']
    result_list = []
    # huge_tree allows parsing of very large efetch responses
    huge_parser = objectify.makeparser(huge_tree=True)
    xml_results = objectify.fromstring(tax_xml, huge_parser)
    for tx in xml_results.Taxon:
        result = {}
        try:
            result['taxid'] = tx['TaxId'].text
            result['scientific_name'] = tx['ScientificName'].text
            result['rank'] = tx['Rank'].text
            if hasattr(tx, 'Lineage'):
                result['full_lineage'] = tx['Lineage'].text
            if hasattr(tx, 'LineageEx'):
                # Keep only the ranks named in result_keys
                for child in tx['LineageEx'].iterchildren():
                    if child.Rank.text in result_keys:
                        result[child.Rank.text] = child.ScientificName.text
        except Exception:
            problem_child = tx['TaxId'].text
            print('{} could not be parsed'.format(problem_child))
        result_list.append(result)
    return result_list
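
# End-to-end sketch tying the module together (illustrative only; the
# institution code is hypothetical and every step needs NCBI access):
#
#     search = gb_search(inst_code='KU')               # esearch id list
#     records = gb_fetch_from_id_list(search.id_list)  # GenBank records
#     taxa = ncbi_taxonomy(records)                    # Taxonomy lookups
#     by_taxid = {t.get('taxid'): t for t in taxa}
#     for rec in records:
#         lineage = by_taxid.get(rec.get('taxid'), {}).get('full_lineage')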