Skip to content

Handle non-paper-dois // direct zenodo dois #33

Open
@jring-o

Description

@jring-o

Software citation is not efficient. OpenAlex has the papers we want to analyze, but doesn't always index Zenodo DOIs, which are often given to software and datasets. If it does have a Zenodo DOI, it is not always up-to-date. Zenodo provides different DOIs for each version of the software, along with a concept DOI for the entire software (all versions). GitHub repos have the software we need, but don't always have the most up-to-date DOI, and don't always use the concept DOI. It is rare that taking a Zenodo DOI from a github repo directly to OpenAlex will result in identifying the papers that cite the software.

Added: It doesn't seem possible to pull citing and cited works from Zenodo, DataCite, or any other indexing API that I can find. Please let me know if I'm missing something!

So far, my workaround is to look up the Zenodo DOI on Zenodo, aggregate all DOIs related to the software, query OpenAlex to identify which of those DOIs it indexes, and then analyze all the indexed DOIs and combine the results.

Here is the code for much of this logic, including an attempt at title matching, from a local experiment. It will need to be integrated into MOSS:

# --- Zenodo Functions ---

def _extract_recid_from_doi(doi_cleaned: str) -> Optional[str]:
    """Extracts the numeric record ID from a Zenodo DOI."""
    if not doi_cleaned.lower().startswith("10.5281/zenodo."):
        return None
    match = re.search(r'zenodo\.(\d+)$', doi_cleaned)
    if match:
        return match.group(1)
    logger.warning(f"Could not extract record ID from Zenodo DOI: {doi_cleaned}")
    return None

async def _fetch_zenodo_record_by_recid(client: httpx.AsyncClient, recid: str) -> Optional[Dict[str, Any]]:
    """Fetch a single Zenodo record directly by its record ID.

    Args:
        client: Shared httpx async client.
        recid: Numeric Zenodo record ID (as a string).

    Returns:
        The parsed record JSON dict, or None on a 404, HTTP error, network
        error, or any unexpected failure (all failures are logged).
    """
    record_url = f"{ZENODO_API_BASE}/records/{recid}"
    headers = {"Accept": "application/json"}
    logger.info(f"Querying Zenodo API for record ID: {recid}")
    try:
        response = await client.get(record_url, headers=headers, timeout=15.0)
        logger.info(f"HTTP Request: {response.request.method} {response.request.url} \"HTTP/{response.http_version} {response.status_code} {response.reason_phrase}\"")
        # A missing record is an expected outcome, not an error condition.
        if response.status_code == 404:
            logger.warning(f"Zenodo record ID {recid} not found (404).")
            return None
        response.raise_for_status()
        zenodo_record = response.json()
        logger.info(f"Successfully fetched Zenodo record for ID {recid}.")
        return zenodo_record
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching Zenodo record ID {recid}: {e.response.status_code} - {e.response.text[:200]}")
        return None
    except httpx.RequestError as e:
        logger.error(f"Network error fetching Zenodo record ID {recid}: {e}")
        return None
    except Exception as e:
        # logger.exception() already attaches the traceback; the redundant
        # exc_info=True argument has been dropped.
        logger.exception(f"Unexpected error processing Zenodo record ID {recid}: {e}")
        return None

async def _fetch_zenodo_versions_by_conceptrecid(client: httpx.AsyncClient, conceptrecid: str) -> List[str]:
    """Fetch all version DOIs associated with a Zenodo concept record ID.

    Args:
        client: Shared httpx async client.
        conceptrecid: Zenodo concept record ID grouping all versions.

    Returns:
        Unique, cleaned version DOIs (no https://doi.org/ prefix) in the
        order Zenodo returned them; an empty list on any failure.
    """
    search_url = f"{ZENODO_API_BASE}/records"
    params = {
        "q": f'conceptrecid:"{conceptrecid}"',
        "all_versions": "true",  # ensure all versions are included
        # TODO: add pagination if a record ever has more than 100 versions.
        "size": 100
    }
    headers = {"Accept": "application/json"}

    logger.info(f"Querying Zenodo API for all versions of conceptrecid: {conceptrecid}")
    version_dois: List[str] = []
    try:
        # Longer timeout: responses can be large for many-version records.
        response = await client.get(search_url, params=params, headers=headers, timeout=20.0)
        logger.info(f"HTTP Request: {response.request.method} {response.request.url} \"HTTP/{response.http_version} {response.status_code} {response.reason_phrase}\"")

        response.raise_for_status()
        data = response.json()
        hits = data.get("hits", {}).get("hits", [])

        if not hits:
            logger.warning(f"Zenodo conceptrecid search for '{conceptrecid}' returned no hits.")
            return []

        for record in hits:
            if not isinstance(record, dict):
                continue
            record_doi = record.get('doi')  # the DOI for this specific version
            if record_doi and isinstance(record_doi, str):
                cleaned_doi = re.sub(r'^(https?://)?(dx\.)?doi\.org/', '', record_doi, flags=re.IGNORECASE).strip()
                if cleaned_doi:
                    version_dois.append(cleaned_doi)

        logger.info(f"Found {len(version_dois)} version DOIs for conceptrecid {conceptrecid}.")
        # dict.fromkeys dedupes while preserving a deterministic insertion
        # order, unlike the original list(set(...)).
        return list(dict.fromkeys(version_dois))

    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching Zenodo versions for conceptrecid {conceptrecid}: {e.response.status_code} - {e.response.text[:200]}")
        return []
    except httpx.RequestError as e:
        logger.error(f"Network error fetching Zenodo versions for conceptrecid {conceptrecid}: {e}")
        return []
    except Exception as e:
        # logger.exception() already attaches the traceback.
        logger.exception(f"Unexpected error processing Zenodo versions for conceptrecid {conceptrecid}: {e}")
        return []


# --- OpenAlex Functions ---

def _normalize_openalex_doi(oa_doi_url: Optional[str]) -> Optional[str]:
    """Helper to extract clean DOI from OpenAlex URL format."""
    if not oa_doi_url:
        return None
    # Handles https://doi.org/DOI_HERE format
    match = re.search(r'doi\.org/(.+)', oa_doi_url, re.IGNORECASE)
    if match:
        return match.group(1).strip()
    logger.warning(f"Could not extract DOI from OpenAlex URL: {oa_doi_url}")
    return None

async def search_openalex_by_title(client: httpx.AsyncClient, title: str) -> List[Dict[str, Any]]:
    """
    Search OpenAlex works by title.

    Args:
        client: Shared httpx async client.
        title: Raw (un-encoded) title text to search for.

    Returns:
        Up to 5 matching OpenAlex work dicts; empty list on error or no match.
    """
    if not title:
        logger.warning("Cannot search OpenAlex with an empty title.")
        return []
    search_url = f"{OPENALEX_API_BASE}/works"
    # BUG FIX: pass the raw title. httpx URL-encodes query parameter values
    # itself, so the original quote_plus() call double-encoded the title
    # (e.g. spaces became literal '+' characters in the searched text).
    params = {
        "filter": f"title.search:{title}",
        "mailto": OPENALEX_EMAIL,
        "per-page": 5  # limit results for title search to avoid ambiguity overload
    }
    logger.info(f"Searching OpenAlex for works with title containing: '{title}'")
    try:
        # Slightly longer timeout for search queries.
        response = await client.get(search_url, params=params, timeout=20.0)
        logger.info(f"HTTP Request: {response.request.method} {response.request.url} \"HTTP/{response.http_version} {response.status_code} {response.reason_phrase}\"")
        response.raise_for_status()
        data = response.json()
        results = data.get('results', [])
        logger.info(f"OpenAlex title search for '{title}' returned {len(results)} results.")
        return results
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error searching OpenAlex by title '{title}': {e.response.status_code} - {e.response.text[:200]}")
        return []
    except httpx.RequestError as e:
        logger.error(f"Network error searching OpenAlex by title '{title}': {e}")
        return []
    except Exception as e:
        # logger.exception() already attaches the traceback.
        logger.exception(f"Unexpected error searching OpenAlex by title '{title}': {e}")
        return []

async def _try_direct_openalex_doi_lookup(client: httpx.AsyncClient, doi_cleaned: str) -> Optional[Dict[str, Any]]:
    """Internal helper to attempt a direct OpenAlex DOI lookup.

    Args:
        client: Shared httpx async client.
        doi_cleaned: Bare DOI (no https://doi.org/ prefix).

    Returns:
        The OpenAlex work dict, or None when not found or on any error.
    """
    from urllib.parse import quote  # local import: path-segment percent-encoding

    logger.info(f"Attempting direct OpenAlex lookup for DOI: {doi_cleaned}")
    # DOIs contain slashes that must be percent-encoded inside a URL path
    # segment. Use quote(..., safe="") rather than quote_plus: quote_plus
    # encodes spaces as '+', which is only valid in query strings, not paths.
    encoded_doi = quote(doi_cleaned, safe="")
    work_url = f"{OPENALEX_API_BASE}/works/doi:{encoded_doi}"
    params = {"mailto": OPENALEX_EMAIL}
    try:
        response = await client.get(work_url, params=params, timeout=15.0)
        logger.info(f"HTTP Request: {response.request.method} {response.request.url} \"HTTP/{response.http_version} {response.status_code} {response.reason_phrase}\"")
        # Not-found is an expected outcome for unindexed DOIs.
        if response.status_code == 404:
            logger.info(f"Direct OpenAlex lookup failed: DOI {doi_cleaned} not found (404).")
            return None
        response.raise_for_status()  # raise for other error statuses
        work_data = response.json()
        logger.info(f"Successfully fetched work data via direct OpenAlex DOI lookup for {doi_cleaned}.")
        return work_data
    except httpx.HTTPStatusError as e:
        # Log specific errors but return None to indicate lookup failure.
        logger.error(f"HTTP error during direct OpenAlex DOI lookup for {doi_cleaned}: {e.response.status_code} - {e.response.text[:200]}")
        return None
    except httpx.RequestError as e:
        logger.error(f"Network error during direct OpenAlex DOI lookup for {doi_cleaned}: {e}")
        return None
    except Exception as e:
        # logger.exception() already attaches the traceback.
        logger.exception(f"Unexpected error during direct OpenAlex DOI lookup for {doi_cleaned}: {e}")
        return None

async def _try_openalex_lookup_by_filter(client: httpx.AsyncClient, oa_filter: str) -> Optional[Dict[str, Any]]:
    """Attempt an OpenAlex works lookup using an arbitrary filter string.

    Returns the first matching work dict, or None when nothing matches or
    any error occurs (errors are logged).
    """
    # NOTE: This function is no longer used in the preferred Zenodo flow, but kept for potential future use.
    logger.info(f"Attempting OpenAlex lookup using filter: {oa_filter}")
    work_url = f"{OPENALEX_API_BASE}/works"
    # Only the single best match is requested.
    params = {"filter": oa_filter, "mailto": OPENALEX_EMAIL, "per-page": 1}
    try:
        response = await client.get(work_url, params=params, timeout=15.0)
        logger.info(f"HTTP Request: {response.request.method} {response.request.url} \"HTTP/{response.http_version} {response.status_code} {response.reason_phrase}\"")
        if response.status_code == 404:
            logger.info(f"OpenAlex lookup using filter '{oa_filter}' returned 404.")
            return None
        response.raise_for_status()
        data = response.json()
        results = data.get('results', [])
        if not (results and isinstance(results, list) and len(results) > 0):
            logger.info(f"OpenAlex lookup using filter '{oa_filter}' returned 0 results.")
            return None
        work_data = results[0]
        logger.info(f"Successfully fetched work data via OpenAlex filter '{oa_filter}'. Work ID: {work_data.get('id')}")
        return work_data
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error during OpenAlex lookup using filter '{oa_filter}': {e.response.status_code} - {e.response.text[:200]}")
        return None
    except httpx.RequestError as e:
        logger.error(f"Network error during OpenAlex lookup using filter '{oa_filter}': {e}")
        return None
    except Exception as e:
        logger.exception(f"Unexpected error during OpenAlex lookup using filter '{oa_filter}': {e}", exc_info=True)
        return None


async def get_openalex_work_data(client: httpx.AsyncClient, doi: str) -> Optional[Dict[str, Any]]:
    """
    Retrieves the OpenAlex work data for a given DOI.

    Handles Zenodo DOIs robustly by trying, in order:
      1. direct OpenAlex DOI lookup (any DOI),
      2. the Zenodo concept DOI (covers all versions),
      3. every other Zenodo version DOI (looked up concurrently),
      4. an OpenAlex title search using the Zenodo record's title.

    Args:
        client: An httpx.AsyncClient instance.
        doi: The DOI string (identifier part only, e.g., 10.xxxx/...).

    Returns:
        The parsed JSON dictionary representing the best matching OpenAlex work object,
        or None if not found or an error occurs.
    """
    if not doi:
        logger.warning("No DOI provided to get_openalex_work_data.")
        return None

    # Normalize away any https://doi.org/ style prefix.
    doi_prefix_re = r'^(https?://)?(dx\.)?doi\.org/'
    doi_cleaned = re.sub(doi_prefix_re, '', doi, flags=re.IGNORECASE).strip()
    if not doi_cleaned:
        logger.warning(f"Invalid DOI provided after cleaning: '{doi}'")
        return None

    # --- Step 1: Try direct lookup (works for any DOI) ---
    direct_oa_work = await _try_direct_openalex_doi_lookup(client, doi_cleaned)
    if direct_oa_work:
        return direct_oa_work

    # --- Step 3 (guard): non-Zenodo DOI with a failed direct lookup has no fallback ---
    if not doi_cleaned.lower().startswith("10.5281/zenodo."):
        logger.info(f"Direct OpenAlex lookup failed for non-Zenodo DOI {doi_cleaned}.")
        return None

    # --- Step 2: Handle Zenodo DOI since direct lookup failed ---
    logger.info(f"Direct OpenAlex lookup failed for Zenodo DOI {doi_cleaned}. Attempting Zenodo API/Title lookup...")

    # --- Step 2a: Fetch the Zenodo record using its record ID ---
    recid = _extract_recid_from_doi(doi_cleaned)
    zenodo_record = None
    if recid:
        zenodo_record = await _fetch_zenodo_record_by_recid(client, recid)
    else:
        logger.warning(f"Could not extract recid from Zenodo DOI {doi_cleaned}, cannot fetch record by ID.")

    if not zenodo_record or not isinstance(zenodo_record, dict):
        # All remaining strategies need Zenodo metadata.
        logger.warning(f"Could not retrieve valid Zenodo record for DOI {doi_cleaned} (recid: {recid}). Cannot proceed further.")
        return None

    # --- Step 2b: Extract the concept DOI and try a direct lookup on it ---
    conceptdoi = zenodo_record.get('conceptdoi')
    conceptdoi_cleaned: Optional[str] = None  # kept for the version-DOI exclusion below
    if conceptdoi and isinstance(conceptdoi, str):
        conceptdoi_cleaned = re.sub(doi_prefix_re, '', conceptdoi, flags=re.IGNORECASE).strip()
        if conceptdoi_cleaned and conceptdoi_cleaned != doi_cleaned:  # avoid re-checking the original
            logger.info(f"Found Zenodo concept DOI: {conceptdoi_cleaned}. Attempting direct OpenAlex lookup...")
            concept_oa_work = await _try_direct_openalex_doi_lookup(client, conceptdoi_cleaned)
            if concept_oa_work:
                logger.info(f"Successfully found OpenAlex work via Zenodo concept DOI {conceptdoi_cleaned} (derived from {doi_cleaned}).")
                return concept_oa_work
        else:
            logger.debug(f"Concept DOI '{conceptdoi}' is same as original or invalid.")
    else:
        logger.info(f"No concept DOI found in Zenodo record for {doi_cleaned}.")

    # --- Step 2c: Fetch all version DOIs and try each one ---
    conceptrecid = zenodo_record.get('conceptrecid')
    if isinstance(conceptrecid, (str, int)):
        # Defensive: accept an int conceptrecid as well as the usual string.
        conceptrecid = str(conceptrecid)
        logger.info(f"Concept DOI lookup failed. Fetching all version DOIs for conceptrecid: {conceptrecid}")
        version_dois = await _fetch_zenodo_versions_by_conceptrecid(client, conceptrecid)
        # Exclude the original DOI and the concept DOI (already tried).
        version_dois_to_try = [vd for vd in version_dois if vd != doi_cleaned and vd != conceptdoi_cleaned]

        if version_dois_to_try:
            logger.info(f"Attempting direct OpenAlex lookup for {len(version_dois_to_try)} other version DOI(s)...")
            # Look up all candidate version DOIs concurrently.
            tasks = [_try_direct_openalex_doi_lookup(client, version_doi) for version_doi in version_dois_to_try]
            results = await asyncio.gather(*tasks)
            for version_doi, version_oa_work in zip(version_dois_to_try, results):
                if version_oa_work:
                    logger.info(f"Successfully found OpenAlex work via Zenodo version DOI {version_doi} (derived from {doi_cleaned}).")
                    return version_oa_work
            logger.info(f"Direct lookup failed for all other version DOIs derived from {doi_cleaned}.")
        else:
            logger.info(f"No other unique version DOIs found to check for conceptrecid {conceptrecid}.")
    else:
        logger.warning(f"No conceptrecid found in Zenodo record for {doi_cleaned}, cannot fetch versions.")

    # --- Step 2d: Fall back to an OpenAlex title search ---
    logger.info(f"Falling back to OpenAlex title search for Zenodo DOI {doi_cleaned}...")
    # dict.get cannot raise here, so the original try/except was dead code.
    title_to_search: Optional[str] = None
    metadata = zenodo_record.get('metadata')
    if metadata and isinstance(metadata, dict):
        title_to_search = metadata.get('title')

    if title_to_search and isinstance(title_to_search, str):
        logger.info(f"Using title '{title_to_search}' from Zenodo metadata for OpenAlex search.")
        oa_results = await search_openalex_by_title(client, title_to_search)
        if oa_results:
            selected_oa_work = oa_results[0]  # take the top-ranked result
            selected_oa_doi = _normalize_openalex_doi(selected_oa_work.get('doi'))
            # Log whether the title match agrees with the original DOI.
            if selected_oa_doi != doi_cleaned:
                logger.warning(f"OpenAlex work found by title for original DOI {doi_cleaned} has a different primary DOI: {selected_oa_doi}. Proceeding with the OpenAlex record.")
            else:
                logger.info(f"OpenAlex work found by title search matched the original DOI: {doi_cleaned}")
            logger.info(f"Successfully found matching OpenAlex work (ID: {selected_oa_work.get('id')}) for original DOI {doi_cleaned} via fallback title search.")
            return selected_oa_work
        logger.warning(f"OpenAlex title search yielded no results for title '{title_to_search}' (from original DOI {doi_cleaned}).")
    else:
        logger.warning(f"Could not extract title from Zenodo metadata for {doi_cleaned}, cannot perform title search.")

    # All Zenodo strategies exhausted.
    logger.warning(f"Failed to find matching OpenAlex work for Zenodo DOI {doi_cleaned} via all methods.")
    return None


async def get_openalex_hierarchy_from_work_data(work_data: Optional[Dict[str, Any]]) -> List[Dict[str, Optional[str]]]:
    """
    Extracts the hierarchy (Domain, Field, Subfield, Topic) for *all* topics
    listed within pre-fetched OpenAlex work data.

    Args:
        work_data: A parsed OpenAlex work object, or None.

    Returns:
        One dict per topic with keys 'openalex_domain', 'openalex_field',
        'openalex_subfield', 'openalex_topic' (values may be None); an
        empty list when there is no work data or no topics.
    """
    hierarchies: List[Dict[str, Optional[str]]] = []
    if not work_data:
        logger.debug("No work data provided to extract hierarchies from.")
        return hierarchies
    work_id = work_data.get('id', 'UNKNOWN_ID')
    # Guard against an explicit null: .get's default only applies when the
    # key is absent, so {'topics': None} would otherwise crash on len().
    topics = work_data.get('topics') or []
    logger.debug(f"Extracting hierarchies from {len(topics)} topics for work {work_id}.")
    for topic_data in topics:
        if not isinstance(topic_data, dict):
            logger.warning(f"Skipping invalid topic data item for work {work_id}: {topic_data}")
            continue
        # Extract names safely using .get(); nested objects may be missing
        # or malformed, in which case the corresponding name is None.
        topic_name = topic_data.get('display_name')
        subfield_data = topic_data.get('subfield', {})
        subfield_name = subfield_data.get('display_name') if isinstance(subfield_data, dict) else None
        field_data = topic_data.get('field', {})
        field_name = field_data.get('display_name') if isinstance(field_data, dict) else None
        domain_data = topic_data.get('domain', {})
        domain_name = domain_data.get('display_name') if isinstance(domain_data, dict) else None
        hierarchies.append({
            'openalex_domain': domain_name,
            'openalex_field': field_name,
            'openalex_subfield': subfield_name,
            'openalex_topic': topic_name
        })
    if not hierarchies:
        logger.info(f"No valid topics found in OpenAlex work data for {work_id}")
    else:
        logger.debug(f"Finished extracting {len(hierarchies)} hierarchies for work {work_id}")
    return hierarchies

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions