# Copyright 2022 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Aniek Roelofs
from typing import Optional, Tuple, Union
import pendulum
from connexion import request
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from flask import jsonify, current_app
from academic_observatory_api.server.utils import (
APIError,
create_es_connection,
create_search_body,
list_available_index_dates,
process_response,
parse_args,
)
from typing import Dict
class ElasticsearchIndex:
    """Resolve the Elasticsearch index that backs an ``agg``/``subagg`` API query.

    The alias is ``ao-<agg>-<subagg>`` (or ``ao-<agg>-unique-list`` when no
    sub-aggregate is given) and the concrete index name is ``<alias>-<index_date>``.
    """

    def __init__(self, es: Elasticsearch, agg: str, subagg: Optional[str], index_date: Optional[str]):
        """Build the alias and choose the index date.

        :param es: Elasticsearch client, passed to ``list_available_index_dates``.
        :param agg: the aggregate, e.g. ``"country"``.
        :param subagg: the optional sub-aggregate, e.g. ``"disciplines"``.
        :param index_date: the requested index date. When falsy, the first date returned
            by ``list_available_index_dates`` for the alias is used.
        :raises APIError: if the agg/subagg combination is invalid (403), the requested
            index date is not available (500), or no index dates exist for the alias (500).
        """
        self.es = es
        self.agg = agg
        self.subagg = subagg

        # Create alias from aggregate and subaggregate (also validates the combination)
        self.alias = self.set_alias()

        # Set index date
        index_dates = list_available_index_dates(es, self.alias)
        if index_date:
            self.index_date = index_date
            # Check if given index date is in available dates
            if self.index_date not in index_dates:
                index_dates_str = "\n".join(index_dates)
                raise APIError(
                    {
                        "code": "index_error",
                        "description": f"Index not available for given index date: {index_date}.\nAvailable "
                        f"index dates: {index_dates_str}",
                    },
                    500,
                )
        elif index_dates:
            # NOTE(review): assumes list_available_index_dates returns the most recent date first — confirm.
            self.index_date = index_dates[0]
        else:
            raise APIError({"code": "index_error", "description": "No dates available for given index"}, 500)

    @property
    def name(self) -> str:
        """Concrete index name: ``<alias>-<index_date>``.

        :return: the index name.
        """
        return f"{self.alias}-{self.index_date}"

    @property
    def agg_field(self) -> Optional[str]:
        """Field name used for the aggregate when building the search body.

        :return: the mapped field, or None if ``agg`` has no mapping.
        """
        return self.agg_mappings.get(self.agg)

    @property
    def subagg_field(self) -> Optional[str]:
        """Field name used for the sub-aggregate when building the search body.

        :return: the mapped field, or None if ``subagg`` is None, unmapped, or ``"metrics"``.
        """
        return self.subagg_mappings.get(self.subagg)

    @property
    def agg_mappings(self) -> Dict[str, str]:
        """Map each aggregate name to its aggregate field name.

        :return: a fresh dict on every call.
        """
        return {
            "author": "author_id",
            "country": "country_id",
            "funder": "funder_id",
            "group": "group_id",
            "institution": "institution_id",
            "journal": "journal_id",
            "publisher": "publisher_id",
            "region": "region_id",
            "subregion": "subregion_id",
        }

    @property
    def subagg_mappings(self) -> Dict[str, Optional[str]]:
        """Map each sub-aggregate name to its sub-aggregate field name.

        ``"metrics"`` maps to None, meaning it has no sub-aggregate field.

        :return: a fresh dict on every call.
        """
        return {
            "access_types": "access_types_access_type",
            "countries": "countries_id",
            "disciplines": "disciplines_field",
            "events": "events_source",
            "funders": "funders_id",
            "groupings": "groupings_id",
            "institutions": "institutions_id",
            "journals": "journals_id",
            "members": "members_id",
            "metrics": None,
            "output_types": "output_types_output_type",
            "publishers": "publishers_id",
        }

    @property
    def invalid_combinations(self) -> Dict[str, list]:
        """Sub-aggregates that may not be combined with each aggregate.

        :return: a fresh dict mapping aggregate name to its disallowed sub-aggregates.
        """
        return {
            "author": ["members"],
            "country": ["countries", "groupings", "institutions"],
            "funder": ["journals"],
            "group": ["countries", "groupings"],
            "institution": ["groupings", "members"],
            "journal": ["members", "publishers"],
            "publisher": ["journals", "members", "publishers"],
            "region": ["countries", "groupings", "institutions", "journals", "members"],
            "subregion": ["countries", "groupings", "institutions", "journals", "members"],
        }

    def set_alias(self) -> str:
        """Build the index alias from ``agg`` and ``subagg``.

        :return: ``ao-<agg>-<subagg>`` or, without a sub-aggregate, ``ao-<agg>-unique-list``.
        :raises APIError: (403) if ``subagg`` is listed as invalid for ``agg``.
        """
        if not self.subagg:
            return f"ao-{self.agg}-unique-list"

        # Unknown aggregates have no restrictions here. They are rejected later, when no
        # index dates can be found for the alias, instead of raising an uncaught KeyError.
        invalid_subaggs = self.invalid_combinations.get(self.agg, [])
        if self.subagg in invalid_subaggs:
            invalid_combinations_str = "\n".join(invalid_subaggs)
            raise APIError(
                {
                    "code": "index_error",
                    "description": "Combination of agg and subagg is invalid.\nInvalid "
                    f"subaggregates for '{self.agg}': {invalid_combinations_str}",
                },
                status_code=403,
            )
        return f"ao-{self.agg}-{self.subagg}"
def query_elasticsearch(agg: str, subagg: Optional[str]) -> Union[dict, Tuple[str, int]]:
    """Search the index for ``agg``/``subagg`` using the current request's query parameters.

    :param agg: the aggregate, e.g. ``"country"``.
    :param subagg: the optional sub-aggregate.
    :return: a ``flask.jsonify`` response containing the version, index name,
        point-in-time id, ``search_after`` cursor, hit counts, ``took`` and the results.
    :raises APIError: (500) if the supplied point in time no longer exists, or any
        error raised while resolving the index in ``ElasticsearchIndex``.
    """
    agg_ids, subagg_ids, index_date, from_date, to_date, size, search_after, pit_id, pretty_print = parse_args()

    es = create_es_connection()
    es_index = ElasticsearchIndex(es, agg, subagg, index_date)
    search_body = create_search_body(
        es_index.agg_field, agg_ids, es_index.subagg_field, subagg_ids, from_date, to_date, size, search_after, pit_id
    )

    if pit_id:
        # A point-in-time search must not name an index; the PIT already targets one.
        try:
            res = es.search(body=search_body)
        except NotFoundError as e:
            raise APIError({"code": "invalid_pit", "description": "Point In Time timed out"}, 500) from e
    else:
        res = es.search(index=es_index.name, body=search_body)

    new_pit_id, search_after_text, results_data, took = process_response(res)
    number_total_results = res["hits"]["total"]["value"]
    results = {
        "version": "v1",
        "index": es_index.name,
        "pit": new_pit_id,
        "search_after": search_after_text,
        "returned_hits": len(results_data),
        "total_hits": number_total_results,
        "took": took,
        "results": results_data,
    }

    if not pretty_print:
        return jsonify(results)

    # JSONIFY_PRETTYPRINT_REGULAR is app-wide config. Restore it once the response is
    # serialised so that one pretty-printed request does not make every later response
    # pretty-printed too.
    # NOTE(review): other threads can still see the change while this response is being built.
    pretty_print_key = "JSONIFY_PRETTYPRINT_REGULAR"
    config = current_app.config
    had_setting = pretty_print_key in config
    previous_value = config.get(pretty_print_key)
    config[pretty_print_key] = pretty_print
    try:
        return jsonify(results)
    finally:
        if had_setting:
            config[pretty_print_key] = previous_value
        else:
            del config[pretty_print_key]
def get_pit_id(agg: str, subagg: Optional[str]) -> Union[dict, Tuple[str, int]]:
    """Open an Elasticsearch point in time (PIT) on the index for ``agg``/``subagg``.

    Reads the optional ``index_date`` and ``keep_alive`` (whole minutes, default 1)
    query parameters from the current request.

    :param agg: the aggregate, e.g. ``"country"``.
    :param subagg: the optional sub-aggregate.
    :return: dict with the new ``pit_id``, the ``keep_alive`` duration (e.g. ``"1m"``)
        and the ``index`` name.
    :raises APIError: (400) if ``index_date`` cannot be parsed or ``keep_alive`` is not a
        positive integer, or any error raised while resolving the index in
        ``ElasticsearchIndex``.
    """
    raw_index_date = request.args.get("index_date")
    raw_keep_alive = request.args.get("keep_alive", 1)

    # Validate before building the duration string; otherwise inputs like "5m" or "abc"
    # would be sent to Elasticsearch as the invalid durations "5mm" / "abcm".
    try:
        keep_alive_minutes = int(raw_keep_alive)
    except (TypeError, ValueError):
        keep_alive_minutes = 0
    if keep_alive_minutes <= 0:
        raise APIError(
            {
                "code": "invalid_keep_alive",
                "description": f"keep_alive must be a positive integer number of minutes, got: {raw_keep_alive}",
            },
            400,
        )
    keep_alive = f"{keep_alive_minutes}m"

    # Convert index date to YYYYMMDD format; a malformed date is a client error, not a 500.
    index_date = None
    if raw_index_date:
        try:
            index_date = pendulum.parse(raw_index_date).strftime("%Y%m%d")
        except ValueError:
            raise APIError(
                {"code": "invalid_index_date", "description": f"Could not parse index_date: {raw_index_date}"},
                400,
            ) from None

    es = create_es_connection()
    es_index = ElasticsearchIndex(es, agg, subagg, index_date)
    pit_id = es.open_point_in_time(index=es_index.name, keep_alive=keep_alive)["id"]
    return {"pit_id": pit_id, "keep_alive": keep_alive, "index": es_index.name}