Source code for academic_observatory_api.server.elastic

# Copyright 2022 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: Aniek Roelofs

from typing import Optional, Tuple, Union

import pendulum
from connexion import request

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from flask import jsonify, current_app
from academic_observatory_api.server.utils import (
    APIError,
    create_es_connection,
    create_search_body,
    list_available_index_dates,
    process_response,
    parse_args,
)
from typing import Dict


class ElasticsearchIndex:
    """Resolve the Elasticsearch index that serves an aggregate / sub-aggregate query.

    The index alias is ``ao-<agg>-<subagg>``, or ``ao-<agg>-unique-list`` when no
    sub-aggregate is given. The full index name is ``<alias>-<index_date>``.
    """

    # Aggregate name -> document field holding the aggregate's identifier.
    _AGG_MAPPINGS: Dict[str, str] = {
        "author": "author_id",
        "country": "country_id",
        "funder": "funder_id",
        "group": "group_id",
        "institution": "institution_id",
        "journal": "journal_id",
        "publisher": "publisher_id",
        "region": "region_id",
        "subregion": "subregion_id",
    }

    # Sub-aggregate name -> document field holding the sub-aggregate's identifier.
    # "metrics" deliberately maps to None (it has no identifier field here).
    _SUBAGG_MAPPINGS: Dict[str, Optional[str]] = {
        "access_types": "access_types_access_type",
        "countries": "countries_id",
        "disciplines": "disciplines_field",
        "events": "events_source",
        "funders": "funders_id",
        "groupings": "groupings_id",
        "institutions": "institutions_id",
        "journals": "journals_id",
        "members": "members_id",
        "metrics": None,
        "output_types": "output_types_output_type",
        "publishers": "publishers_id",
    }

    # Aggregate name -> sub-aggregates that may not be combined with it.
    _INVALID_COMBINATIONS: Dict[str, list] = {
        "author": ["members"],
        "country": ["countries", "groupings", "institutions"],
        "funder": ["journals"],
        "group": ["countries", "groupings"],
        "institution": ["groupings", "members"],
        "journal": ["members", "publishers"],
        "publisher": ["journals", "members", "publishers"],
        "region": ["countries", "groupings", "institutions", "journals", "members"],
        "subregion": ["countries", "groupings", "institutions", "journals", "members"],
    }

    def __init__(self, es: Elasticsearch, agg: str, subagg: Optional[str], index_date: Optional[str]):
        """Validate the agg/subagg combination and pick the index date.

        :param es: Elasticsearch client used to look up the available index dates.
        :param agg: aggregate name, a key of ``agg_mappings``.
        :param subagg: optional sub-aggregate name, a key of ``subagg_mappings``.
        :param index_date: optional index date suffix. When falsy, the first date returned by
            ``list_available_index_dates`` is used.
        :raises APIError: 403 if the agg/subagg combination is invalid; 500 if the requested
            index date is not available or no index dates exist for the alias.
        """
        self.es = es
        self.agg = agg
        self.subagg = subagg

        # Create alias from aggregate and subaggregate (also validates the combination)
        self.alias = self.set_alias()

        index_dates = list_available_index_dates(es, self.alias)
        if index_date:
            self.index_date = index_date
            # Check if given index date is in available dates
            if self.index_date not in index_dates:
                index_dates_str = "\n".join(index_dates)
                raise APIError(
                    {
                        "code": "index_error",
                        "description": f"Index not available for given index date: {index_date}.\nAvailable "
                        f"index dates: {index_dates_str}",
                    },
                    500,
                )
        elif index_dates:
            # NOTE(review): presumably list_available_index_dates returns the newest date first — confirm
            self.index_date = index_dates[0]
        else:
            raise APIError({"code": "index_error", "description": "No dates available for given index"}, 500)

    @property
    def name(self) -> str:
        """
        :return: full index name, ``<alias>-<index_date>``.
        """
        return f"{self.alias}-{self.index_date}"

    @property
    def agg_field(self) -> Optional[str]:
        """
        :return: identifier field for ``self.agg``, or None if the aggregate has no mapping.
        """
        return self.agg_mappings.get(self.agg)

    @property
    def subagg_field(self) -> Optional[str]:
        """
        :return: identifier field for ``self.subagg``. None when there is no sub-aggregate,
            when it has no mapping, or when it maps to None ("metrics").
        """
        return self.subagg_mappings.get(self.subagg)

    @property
    def agg_mappings(self) -> Dict[str, str]:
        """
        :return: a fresh copy of the aggregate -> identifier field mapping.
        """
        return dict(self._AGG_MAPPINGS)

    @property
    def subagg_mappings(self) -> Dict[str, Optional[str]]:
        """
        :return: a fresh copy of the sub-aggregate -> identifier field mapping.
        """
        return dict(self._SUBAGG_MAPPINGS)

    @property
    def invalid_combinations(self) -> Dict[str, list]:
        """
        :return: a fresh copy of the aggregate -> forbidden sub-aggregates mapping.
        """
        # Copy the inner lists too so callers cannot mutate the class-level constant
        return {agg: list(subaggs) for agg, subaggs in self._INVALID_COMBINATIONS.items()}

    def set_alias(self) -> str:
        """Build the index alias from ``self.agg`` and ``self.subagg``.

        :return: ``ao-<agg>-<subagg>`` when a sub-aggregate is set, else ``ao-<agg>-unique-list``.
        :raises APIError: 403 if ``self.subagg`` may not be combined with ``self.agg``.
        :raises KeyError: if a sub-aggregate is given and ``self.agg`` is not a known aggregate.
        """
        if self.subagg:
            invalid_subaggs = self.invalid_combinations[self.agg]
            # Check if combination of aggregate and subaggregate is valid
            if self.subagg in invalid_subaggs:
                invalid_combinations_str = "\n".join(invalid_subaggs)
                raise APIError(
                    {
                        "code": "index_error",
                        "description": "Combination of agg and subagg is invalid.\nInvalid "
                        f"subaggregates for '{self.agg}': {invalid_combinations_str}",
                    },
                    status_code=403,
                )
            return f"ao-{self.agg}-{self.subagg}"
        return f"ao-{self.agg}-unique-list"
def query_elasticsearch(agg: str, subagg: Optional[str]) -> Union[dict, Tuple[str, int]]:
    """Search the index for ``agg``/``subagg`` using the current request's query arguments.

    :param agg: aggregate name (see ``ElasticsearchIndex``).
    :param subagg: optional sub-aggregate name.
    :return: a Flask JSON response with the search results and paging information
        (``pit``, ``search_after``, hit counts, ``took``).
    :raises APIError: 500 if the given point in time is not found (e.g. it timed out), plus any
        APIError raised while resolving the index.
    """
    (
        agg_ids,
        subagg_ids,
        index_date,
        from_date,
        to_date,
        size,
        search_after,
        pit_id,
        pretty_print,
    ) = parse_args()

    es = create_es_connection()
    es_index = ElasticsearchIndex(es, agg, subagg, index_date)
    search_body = create_search_body(
        es_index.agg_field, agg_ids, es_index.subagg_field, subagg_ids, from_date, to_date, size, search_after, pit_id
    )

    if pit_id:
        # No index is passed here; presumably create_search_body embeds the PIT that
        # already identifies the index — confirm in utils.
        try:
            res = es.search(body=search_body)
        except NotFoundError as e:
            raise APIError({"code": "invalid_pit", "description": "Point In Time timed out"}, 500) from e
    else:
        res = es.search(index=es_index.name, body=search_body)

    new_pit_id, search_after_text, results_data, took = process_response(res)
    number_total_results = res["hits"]["total"]["value"]

    results = {
        "version": "v1",
        "index": es_index.name,
        "pit": new_pit_id,
        "search_after": search_after_text,
        "returned_hits": len(results_data),
        "total_hits": number_total_results,
        "took": took,
        "results": results_data,
    }

    if not pretty_print:
        return jsonify(results)

    # JSONIFY_PRETTYPRINT_REGULAR is app-wide configuration. Leaving it set would make every later
    # response from this process pretty-printed, so it is set only for this jsonify call and then
    # restored. NOTE: the config is still shared between concurrent requests while it is set.
    config = current_app.config
    key = "JSONIFY_PRETTYPRINT_REGULAR"
    had_key = key in config
    previous = config.get(key)
    config[key] = pretty_print
    try:
        return jsonify(results)
    finally:
        if had_key:
            config[key] = previous
        else:
            config.pop(key, None)
def get_pit_id(agg: str, subagg: Optional[str]) -> Union[dict, Tuple[str, int]]:
    """Open an Elasticsearch point in time (PIT) on the index for ``agg``/``subagg``.

    Query arguments read from the current request:
      - ``index_date``: optional date string parsed with ``pendulum.parse`` and converted to
        ``YYYYMMDD``. When absent or empty, ``ElasticsearchIndex`` picks the index date.
      - ``keep_alive``: optional positive integer number of minutes to keep the PIT open
        (default 1).

    :param agg: aggregate name (see ``ElasticsearchIndex``).
    :param subagg: optional sub-aggregate name.
    :return: dict with ``pit_id``, ``keep_alive`` (e.g. ``"1m"``) and ``index`` (the index name).
    :raises APIError: 400 if ``index_date`` cannot be parsed or ``keep_alive`` is not a positive
        integer, plus any APIError raised while resolving the index.
    """
    raw_index_date = request.args.get("index_date")
    raw_keep_alive = request.args.get("keep_alive", 1)

    # Validate keep_alive before forwarding it to Elasticsearch as "<minutes>m"
    try:
        keep_alive_minutes = int(raw_keep_alive)
    except (TypeError, ValueError):
        keep_alive_minutes = 0
    if keep_alive_minutes < 1:
        raise APIError(
            {
                "code": "invalid_argument",
                "description": f"keep_alive must be a positive integer (minutes), got: {raw_keep_alive}",
            },
            400,
        )
    keep_alive = f"{keep_alive_minutes}m"

    # Convert index date to YYYYMMDD format
    index_date = None
    if raw_index_date:
        try:
            index_date = pendulum.parse(raw_index_date).strftime("%Y%m%d")
        except ValueError as e:
            # pendulum's ParserError subclasses ValueError
            raise APIError(
                {"code": "invalid_argument", "description": f"Invalid index_date: {raw_index_date}"}, 400
            ) from e

    es = create_es_connection()
    es_index = ElasticsearchIndex(es, agg, subagg, index_date)
    pit_id = es.open_point_in_time(index=es_index.name, keep_alive=keep_alive)["id"]
    return {"pit_id": pit_id, "keep_alive": keep_alive, "index": es_index.name}