# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE-SCHEMAS.
# Copyright (C) 2016, 2017 CERN.
#
# INSPIRE-SCHEMAS is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# INSPIRE-SCHEMAS is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE-SCHEMAS; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Public api for methods and functions to handle/verify the jsonschemas."""
import copy
import io
import json
import os
import re
from collections import defaultdict
from functools import partial, wraps

import idutils
import rfc3987
import six
from bleach.linkifier import LinkifyFilter
from bleach.sanitizer import Cleaner
from idutils import is_isni
from inspire_utils.date import PartialDate
from isbn import ISBN
from jsonschema import Draft4Validator, RefResolver, draft4_format_checker
from jsonschema import validate as jsonschema_validate
from pkg_resources import resource_filename
from pytz import UnknownTimeZoneError, timezone
from six.moves.urllib.parse import urlsplit
from unidecode import unidecode

from .errors import (SchemaKeyNotFound, SchemaNotFound, SchemaUIDConflict,
                     UnknownUIDSchema)
# Absolute path of the ``records`` resource directory of this package; schema
# files are looked up below it by ``get_schema_path``.
_schema_root_path = os.path.abspath(resource_filename(__name__, 'records'))

# Finds two ASCII letters (any case) with anything in between.
_RE_2_CHARS = re.compile(r'[a-z].*[a-z]', re.IGNORECASE)
# Matches a single ASCII letter (any case).
_RE_CHAR = re.compile(r'[a-z]', re.IGNORECASE)
# Matches the standalone word "and"; used to split several collaborations.
_RE_AND = re.compile(r'\band\b', re.IGNORECASE)
# Matches, at the start of the string, optional whitespace followed by an
# optional "for" / "on behalf of" / "representing" and an optional "the".
_RE_COLLABORATION_LEADING = re.compile(
    r'^\s*(\b(for|on behalf of|representing)\b)?\s*(\bthe\b)?', re.IGNORECASE
)
# Matches a trailing "collaboration" or "collaborations" word.
_RE_COLLABORATION_TRAILING = re.compile(
    r'\bcollaborations?\s*$', re.IGNORECASE
)
# Matches a URL path starting with ``/publicdomain/zero``, capturing an
# optional ``version`` made of digits and dots.
_RE_PUBLIC_DOMAIN_URL = re.compile(
    r'^/publicdomain/zero(?:/(?P<version>[\.\d]*))?'
)
# Matches a URL path starting with ``/licenses/``, capturing the ``sublicense``
# and an optional ``version``.
_RE_LICENSE_URL = re.compile(
    r'^/licenses/(?P<sublicense>[-\w]*)(?:/(?P<version>[\.\d]*))?'
)
# Matches a whole volume made of one ``letter`` followed by a ``volume`` that
# starts with a digit (e.g. ``D43``).
_RE_VOLUME_STARTS_WITH_A_LETTER = re.compile(
    r'^(?P<letter>[A-Z])(?P<volume>\d[\dA-Z-]*$)', re.IGNORECASE
)
# Matches a ``volume`` made of digits followed by one final ``letter``
# (e.g. ``43D``).
_RE_VOLUME_ENDS_WITH_A_LETTER = re.compile(
    r'(?P<volume>\d+)(?P<letter>[A-Z])$', re.IGNORECASE
)
# Matches a journal ``title`` ending with a dot or a space, followed by one
# final ``letter`` (e.g. ``Phys.Rev.D``).
_RE_TITLE_ENDS_WITH_A_LETTER = re.compile(
    r'(?P<title>.+(\.| ))(?P<letter>[A-Z])$', re.IGNORECASE
)
# Maps an author identifier schema name to a ``(regex, template)`` pair: the
# regex captures the bare identifier in its ``uid`` group and the template is
# filled with that group (via ``str.format``) to build the normalized UID.
_RE_AUTHORS_UID = {
    'CERN': (re.compile(r'^(CCID-|CERN-)?(?P<uid>\d+)$', flags=re.I), 'CERN-{}'),
    'JACOW': (re.compile(r'^(JACOW-)?(?P<uid>\d{8})$', flags=re.I), 'JACoW-{}'),
    'SLAC': (re.compile(r'^(SLAC-)?(?P<uid>\d+)$', flags=re.I), 'SLAC-{}'),
    'DESY': (re.compile(r'^(DESY-)?(?P<uid>\d+)$', flags=re.I), 'DESY-{}'),
    'INSPIRE ID': (re.compile(r'^(INSPIRE-)?(?P<uid>\d{8})$', flags=re.I), 'INSPIRE-{}'),
    'INSPIRE BAI': (re.compile(r'^(?P<uid>((\w|\-|\')+\.)+\d+)$'), '{}'),
}
# Matches new style arXiv ID, with an old-style class specification
# (Malformed, but appears in APS records)
RE_ARXIV_POST_2007 = r"((?P<category>(?:[a-z-]+)(?:\.[a-z]{2})?)/)?(?P<identifier>\d{4}\.\d{4,5})(v\d+)?\s*(\[(?:[a-z\-\.]+)\])?$"
RE_ARXIV_POST_2007_CLASS = re.compile(
    r"(arxiv:)?{}".format(RE_ARXIV_POST_2007),
    flags=re.I
)
# Matches old style arXiv ID: ``<category>/<digits>`` with an optional version
# and an optional bracketed class.
RE_ARXIV_PRE_2007 = r"(?P<category>(?P<extraidentifier>[a-z-]+)(?:\.[a-z]{2})?)/(?P<identifier>\d{4}\d+)(v\d+)?\s*(\[(?:[a-z\-\.]+)\])?$"
RE_ARXIV_PRE_2007_CLASS = re.compile(
    r"(arxiv:)?{}".format(RE_ARXIV_PRE_2007),
    flags=re.I
)
# arXiv DOI and URL forms of the same identifiers. The dots of the literal DOI
# prefix and of the host name are escaped so that they only match a real dot
# (an unescaped ``.`` would accept any character in those positions).
RE_ARXIV_DOI_POST_2007_CLASS = re.compile(r"(doi:)?10\.48550/arXiv\.{}".format(RE_ARXIV_POST_2007), re.I)
RE_ARXIV_DOI_PRE_2007_CLASS = re.compile(r"(doi:)?10\.48550/arXiv\.{}".format(RE_ARXIV_PRE_2007), re.I)
RE_ARXIV_URL_PRE_2007_CLASS = re.compile(r"https?://arXiv\.org/(abs|pdf)/{}.*".format(RE_ARXIV_PRE_2007), re.I)
RE_ARXIV_URL_POST_2007_CLASS = re.compile(r"https?://arXiv\.org/(abs|pdf)/{}.*".format(RE_ARXIV_POST_2007), re.I)
# Pattern groups, tried in order by the arXiv helpers of this module.
ARXIV_PATTERNS_PRE_2007 = [
    RE_ARXIV_PRE_2007_CLASS,
    RE_ARXIV_DOI_PRE_2007_CLASS,
    RE_ARXIV_URL_PRE_2007_CLASS,
]
ARXIV_PATTERNS_POST_2007 = [
    RE_ARXIV_POST_2007_CLASS,
    RE_ARXIV_DOI_POST_2007_CLASS,
    RE_ARXIV_URL_POST_2007_CLASS,
]
ARXIV_PATTERNS = ARXIV_PATTERNS_PRE_2007 + ARXIV_PATTERNS_POST_2007
# Lower-cased journal titles for which ``convert_old_publication_info_to_new``
# does not move a letter from the volume to the title.
JOURNALS_IGNORED_IN_OLD_TO_NEW = [
    'econf',
]
# Inclusive ``(start, end)`` ranges checked by ``is_orcid`` against the
# identifier converted to an integer without its check digit.
ORCID_ISNI_RANGES = [
    (15000000, 35000000),
    (900000000000, 900100000000),
]
# URL prefixes that ``is_orcid`` strips before inspecting the identifier.
ORCID_URLS = ["http://orcid.org/", "https://orcid.org/"]
# list produced from https://arxiv.org/archive/
# Maps an obsolete arXiv category to the category that replaces it.
_NEW_CATEGORIES = {
    'acc-phys': 'physics.acc-ph',
    'adap-org': 'nlin.AO',
    'alg-geom': 'math.AG',
    'ao-sci': 'physics.ao-ph',
    'atom-ph': 'physics.atom-ph',
    'bayes-an': 'physics.data-an',
    'chao-dyn': 'nlin.CD',
    'chem-ph': 'physics.chem-ph',
    'cmp-lg': 'cs.CL',
    'comp-gas': 'nlin.CG',
    'dg-ga': 'math.DG',
    'funct-an': 'math.FA',
    'mtrl-th': 'cond-mat.mtrl-sci',
    'patt-sol': 'nlin.PS',
    'plasm-ph': 'physics.plasm-ph',
    'q-alg': 'math.QA',
    'solv-int': 'nlin.SI',
    'supr-con': 'cond-mat.supr-con',
}
# Maps an arXiv category to the corresponding INSPIRE category.
# ``classify_field`` looks categories up here and falls back to ``'Other'``
# for anything that is not listed.
ARXIV_TO_INSPIRE_CATEGORY_MAPPING = {
    'astro-ph': 'Astrophysics',
    'astro-ph.CO': 'Astrophysics',
    'astro-ph.EP': 'Astrophysics',
    'astro-ph.GA': 'Astrophysics',
    'astro-ph.HE': 'Astrophysics',
    'astro-ph.IM': 'Instrumentation',
    'astro-ph.SR': 'Astrophysics',
    'cond-mat': 'Condensed Matter',
    'cond-mat.dis-nn': 'Condensed Matter',
    'cond-mat.mes-hall': 'Condensed Matter',
    'cond-mat.mtrl-sci': 'Condensed Matter',
    'cond-mat.other': 'Condensed Matter',
    'cond-mat.quant-gas': 'Condensed Matter',
    'cond-mat.soft': 'Condensed Matter',
    'cond-mat.stat-mech': 'Condensed Matter',
    'cond-mat.str-el': 'Condensed Matter',
    'cond-mat.supr-con': 'Condensed Matter',
    'cs': 'Computing',
    'cs.AI': 'Computing',
    'cs.AR': 'Computing',
    'cs.CC': 'Computing',
    'cs.CE': 'Computing',
    'cs.CG': 'Computing',
    'cs.CL': 'Computing',
    'cs.CR': 'Computing',
    'cs.CV': 'Computing',
    'cs.CY': 'Computing',
    'cs.DB': 'Computing',
    'cs.DC': 'Computing',
    'cs.DL': 'Computing',
    'cs.DM': 'Computing',
    'cs.DS': 'Computing',
    'cs.ET': 'Computing',
    'cs.FL': 'Computing',
    'cs.GL': 'Computing',
    'cs.GR': 'Computing',
    'cs.GT': 'Computing',
    'cs.HC': 'Computing',
    'cs.IR': 'Computing',
    'cs.IT': 'Computing',
    'cs.LG': 'Computing',
    'cs.LO': 'Computing',
    'cs.MA': 'Computing',
    'cs.MM': 'Computing',
    'cs.MS': 'Computing',
    'cs.NA': 'Computing',
    'cs.NE': 'Computing',
    'cs.NI': 'Computing',
    'cs.OH': 'Computing',
    'cs.OS': 'Computing',
    'cs.PF': 'Computing',
    'cs.PL': 'Computing',
    'cs.RO': 'Computing',
    'cs.SC': 'Computing',
    'cs.SD': 'Computing',
    'cs.SE': 'Computing',
    'cs.SI': 'Computing',
    'cs.SY': 'Computing',
    'gr-qc': 'Gravitation and Cosmology',
    'hep-ex': 'Experiment-HEP',
    'hep-lat': 'Lattice',
    'hep-ph': 'Phenomenology-HEP',
    'hep-th': 'Theory-HEP',
    'math': 'Math and Math Physics',
    'math-ph': 'Math and Math Physics',
    'math.AC': 'Math and Math Physics',
    'math.AG': 'Math and Math Physics',
    'math.AP': 'Math and Math Physics',
    'math.AT': 'Math and Math Physics',
    'math.CA': 'Math and Math Physics',
    'math.CO': 'Math and Math Physics',
    'math.CT': 'Math and Math Physics',
    'math.CV': 'Math and Math Physics',
    'math.DG': 'Math and Math Physics',
    'math.DS': 'Math and Math Physics',
    'math.FA': 'Math and Math Physics',
    'math.GM': 'Math and Math Physics',
    'math.GN': 'Math and Math Physics',
    'math.GR': 'Math and Math Physics',
    'math.GT': 'Math and Math Physics',
    'math.HO': 'Math and Math Physics',
    'math.IT': 'Math and Math Physics',
    'math.KT': 'Math and Math Physics',
    'math.LO': 'Math and Math Physics',
    'math.MG': 'Math and Math Physics',
    'math.MP': 'Math and Math Physics',
    'math.NA': 'Math and Math Physics',
    'math.NT': 'Math and Math Physics',
    'math.OA': 'Math and Math Physics',
    'math.OC': 'Math and Math Physics',
    'math.PR': 'Math and Math Physics',
    'math.QA': 'Math and Math Physics',
    'math.RA': 'Math and Math Physics',
    'math.RT': 'Math and Math Physics',
    'math.SG': 'Math and Math Physics',
    'math.SP': 'Math and Math Physics',
    'math.ST': 'Math and Math Physics',
    'nlin': 'General Physics',
    'nlin.AO': 'General Physics',
    'nlin.CD': 'General Physics',
    'nlin.CG': 'General Physics',
    'nlin.PS': 'Math and Math Physics',
    'nlin.SI': 'Math and Math Physics',
    'nucl-ex': 'Experiment-Nucl',
    'nucl-th': 'Theory-Nucl',
    'physics': 'General Physics',
    'physics.acc-ph': 'Accelerators',
    'physics.ao-ph': 'General Physics',
    'physics.atm-clus': 'General Physics',
    'physics.atom-ph': 'General Physics',
    'physics.bio-ph': 'Other',
    'physics.chem-ph': 'Other',
    'physics.class-ph': 'General Physics',
    'physics.comp-ph': 'Computing',
    'physics.data-an': 'Data Analysis and Statistics',
    'physics.ed-ph': 'Other',
    'physics.flu-dyn': 'General Physics',
    'physics.gen-ph': 'General Physics',
    'physics.geo-ph': 'General Physics',
    'physics.hist-ph': 'Other',
    'physics.ins-det': 'Instrumentation',
    'physics.med-ph': 'Other',
    'physics.optics': 'General Physics',
    'physics.plasm-ph': 'General Physics',
    'physics.pop-ph': 'Other',
    'physics.soc-ph': 'Other',
    'physics.space-ph': 'Astrophysics',
    'quant-ph': 'Quantum Physics',
    'stat.AP': 'Data Analysis and Statistics',
    'stat.CO': 'Data Analysis and Statistics',
    'stat.ML': 'Data Analysis and Statistics',
    'stat.ME': 'Data Analysis and Statistics',
    'stat.OT': 'Data Analysis and Statistics',
    'stat.TH': 'Data Analysis and Statistics'
}
# Journal titles whose final letter is part of the title itself:
# ``convert_new_publication_info_to_old`` leaves these titles and their
# volumes untouched instead of moving the letter to the volume.
_JOURNALS_ALREADY_ENDING_WITH_A_LETTER = {
    'Acta Cryst.A',
    'Acta Cryst.B',
    'Acta Cryst.D',
    'Acta Cryst.F',
    'Adv.Phys.X',
    'Annales Soc.Sci.Bruxelles A',
    'Appl.Catal.A',
    'Appl.Sci.Res.,Sect.A',
    'Bull.Okayama Univ.Sci.A',
    'Can.J.Res.A',
    'Cesk.Cas.Fys.A',
    'Chin.Ann.Math.B',
    'Colloids Surf.A',
    'Commun.Dublin Inst.Ser.A',
    'Concepts Magn.Reson.Part A',
    'Concepts Magn.Reson.Part B',
    'Global J.Sci.Front.Res.A',
    'ITB J.Sci.A',
    'Indian J.Phys.A',
    'Indian J.Phys.B',
    'Indian J.Statist.A',
    'Iran.J.Sci.Technol.A',
    'J.Chromatogr.A',
    'J.Mol.Catal.A',
    'J.Opt.A',
    'J.Opt.B',
    'J.Polymer Sci.B',
    'J.Res.Natl.Bur.Stand.A',
    'J.Res.Natl.Bur.Stand.B',
    'Kumamoto J.Sci.Ser.A',
    'NATO Sci.Peace Secur.B',
    'NATO Sci.Ser.B',
    'NATO Sci.Ser.C',
    'NATO Sci.Ser.F',
    'Nucl.Data Sheets A',
    'Nucl.Data Sheets B',
    'Nucl.Sci.Appl.A',
    'Phil.Trans.Roy.Soc.Lond.B',
    'Polymer Sci.B',
    'Proc.Rom.Acad.A',
    'Rev.Univ.Nac.Tucuman, Ser.A',
    'Sci.Rep.Nat Tsing Hua Univ.Ser.A',
    'Spectrochim.Acta A',
    'Tellus A',
    'Trans.Int.Astron.Union A',
}
# Maps a journal title to the set of volumes (as strings, here "24" to "170")
# for which ``convert_new_publication_info_to_old`` emits an additional hidden
# publication info.
_JOURNALS_THAT_NEED_A_HIDDEN_PUBNOTE = {
    'Phys.Lett.B': set(str(el) for el in range(24, 171)),
}
# Maps a journal title in the old format to its title in the new format.
_JOURNALS_RENAMED_OLD_TO_NEW = {
    'Ann.Inst.H.Poincare Anal.Non Lineaire': 'Ann.Inst.H.Poincare C Anal.Non Lineaire',
    'Annales Soc.Sci.Brux.Ser.I Sci.Math.Astron.Phys.': 'Annales Soc.Sci.Bruxelles.I',
    'Annales Soc.Sci.Bruxelles Ser.B Sci.Phys.Nat.': 'Annales Soc.Sci.Bruxelles B',
    'Diss.Abstr.Int.': 'Diss.Abstr.Int.B',
    'J.Comb.Theory Ser.': 'J.Comb.Theor.A',
    'J.Vac.Sci.Technol.A Vac.Surf.Films': 'J.Vac.Sci.Technol.A',
    'J.Vac.Sci.Technol.B Microelectron.Nanometer Struct.': 'J.Vac.Sci.Technol.B',
    'Nucl.Phys.Proc.Suppl.': 'Nucl.Phys.B Proc.Suppl.',
    'Proc.Roy.Irish Acad.(Sect.A)': 'Proc.Roy.Irish Acad.A',
    'Univ.Politech.Bucharest Sci.Bull.': 'Univ.Politech.Bucharest Sci.Bull.A',
}
# Inverse of the mapping above: new title -> old title.
_JOURNALS_RENAMED_NEW_TO_OLD = {v: k for (k, v) in six.iteritems(_JOURNALS_RENAMED_OLD_TO_NEW)}
# Journals whose old-format volume is prefixed with the two last digits of the
# publication year; titles are compared upper-cased against this set.
_JOURNALS_WITH_YEAR_ADDED_TO_VOLUME = {
    'JHEP',
    'JCAP',
}
# Values that ``filter_empty_parameters`` treats as "empty".
EMPTIES = [None, '', [], {}]
# Keyword arguments passed to the ``Cleaner`` behind ``sanitize_html``.
_BLEACH_CONFIG = {
    'tags': ['a', 'b', 'br', 'div', 'em', 'i', 'li', 'ol', 'p', 'strong', 'ul'],
    'attributes': {'a': ['href', 'title']},
    'strip': True,
    'filters': [partial(LinkifyFilter, callbacks=[])],
}
# Module-level cleaner instance, built once from the configuration above.
_bleach_cleaner = Cleaner(**_BLEACH_CONFIG)
# Names of the schemas that ``get_refs_to_schemas`` loads and scans.
SCHEMAS = [
    "hep",
    "authors",
    "experiments",
    "institutions",
    "conferences",
    "seminars",
    "jobs",
    "journals",
]
def _load_countries_data(filename):
    """Load a JSON file shipped in the ``countries`` resource directory.

    Args:
        filename(str): name of the file inside the ``countries`` directory.

    Returns:
        the deserialized JSON content of the file.
    """
    path = resource_filename(__name__, 'countries')
    # JSON text is UTF-8: decode it explicitly instead of relying on the
    # locale's preferred encoding. ``io.open`` accepts ``encoding`` on both
    # Python 2 and Python 3.
    with io.open(os.path.join(path, filename), encoding='utf-8') as json_fd:
        return json.load(json_fd)
def _get_country_name(country):
    """Return the preferred display name of a country entry.

    Args:
        country(dict): entry with a ``name`` key and, optionally, a
            ``common_name`` key.

    Returns:
        the ``common_name`` value when it is present and truthy, otherwise
        the ``name`` value.
    """
    common_name = country.get("common_name")
    if common_name:
        return common_name
    return country["name"]
# https://salsa.debian.org/iso-codes-team/iso-codes/-/blob/master/data/iso_3166-1.json
# NOTE(review): despite the name, this holds the raw ``'3166-1'`` entries of
# the file (iterated below as country dicts), not a name -> code mapping.
COUNTRY_NAME_TO_CODE_ISO_3166_1 = _load_countries_data('iso_3166-1.json')['3166-1']
# https://salsa.debian.org/iso-codes-team/iso-codes/-/blob/master/data/iso_3166-3.json
# Same remark: raw ``'3166-3'`` entries of the file.
COUNTRY_NAME_TO_CODE_ISO_3166_3 = _load_countries_data('iso_3166-3.json')['3166-3']
# ``alpha_2`` code -> country name. The 3166-1 entries come last, so they win
# over a 3166-3 entry that has the same ``alpha_2`` code.
COUNTRY_CODE_TO_NAME = {
    country['alpha_2']: _get_country_name(country)
    for country in COUNTRY_NAME_TO_CODE_ISO_3166_3 + COUNTRY_NAME_TO_CODE_ISO_3166_1
}
# Inverse of ``COUNTRY_CODE_TO_NAME``: country name -> ``alpha_2`` code.
COUNTRY_NAME_TO_CODE = {
    value: key
    for key, value in COUNTRY_CODE_TO_NAME.items()
}
def country_code_to_name(code):
    """Look up the name of a country from its code.

    :param code: an ``alpha_2`` country code.
    :returns: the name stored for ``code`` in ``COUNTRY_CODE_TO_NAME``.
    :raises KeyError: if ``code`` is not a known code.
    """
    name = COUNTRY_CODE_TO_NAME[code]
    return name
def country_name_to_code(name):
    """Look up the code of a country from its name.

    :param name: an ``ISO 3166-1`` or ``ISO 3166-3`` country name.
    :returns: the code stored for ``name`` in ``COUNTRY_NAME_TO_CODE``.
    :raises KeyError: if ``name`` is not a known country name.
    """
    code = COUNTRY_NAME_TO_CODE[name]
    return code
def filter_empty_parameters(func):
    """Decorate a method so that empty parameters are filtered out.

    Keyword arguments whose value is listed in ``EMPTIES`` are dropped. The
    wrapped method is not called at all (and ``None`` is returned) when every
    positional argument is empty and the remaining keyword arguments are
    limited to ``source`` and/or ``material`` (or there are none).

    :param func: method to wrap; it receives ``self`` first.
    :type func: function
    :returns: the wrapping function.
    """
    @wraps(func)
    def func_wrapper(self, *args, **kwargs):
        kept_kwargs = {}
        for name, value in kwargs.items():
            if value not in EMPTIES:
                kept_kwargs[name] = value

        no_positional_content = all(arg in EMPTIES for arg in args)
        only_bookkeeping_kwargs = (
            not kept_kwargs or {'source', 'material'}.issuperset(kept_kwargs)
        )

        if only_bookkeeping_kwargs and no_positional_content:
            # Nothing meaningful was passed: skip the call entirely.
            return None

        return func(self, *args, **kept_kwargs)

    return func_wrapper
def is_orcid(val):
    """Tell whether ``val`` is an ORCID identifier.

    A leading ORCID URL (see ``ORCID_URLS``) is stripped, then dashes and
    spaces are removed. The result must pass ``is_isni`` and, once its check
    digit is removed, fall inside one of ``ORCID_ISNI_RANGES``.

    See http://support.orcid.org/knowledgebase/
    articles/116780-structure-of-the-orcid-identifier

    Returns:
        bool: ``True`` if ``val`` is an ORCID, ``False`` otherwise.
    """
    for prefix in ORCID_URLS:
        if val.startswith(prefix):
            val = val[len(prefix):]
            break

    compact = val.replace("-", "").replace(" ", "")
    if not is_isni(compact):
        return False

    # The last character is the check digit, not part of the number.
    number = int(compact[:-1], 10)
    return any(low <= number <= high for low, high in ORCID_ISNI_RANGES)
def author_id_normalize_and_schema(uid, schema=None):
    """Detect the schema of an author UID and normalize the UID.

    Args:
        uid (string): a UID string
        schema (string): schema to resolve the UID against, if known

    Returns:
        Tuple[string, string]: a tuple (uid, schema) where:
        - uid: the UID normalized for the schema; it is returned unchanged
          when ``schema`` was given but is not handled here, or when the UID
          does not match the pattern of the given schema
        - schema: the given schema, or the one that was guessed

    Raise:
        UnknownUIDSchema: if no schema was given and the UID matches none of
            the known schemas, or more than one of them
    """
    def _normalized_for(candidate_schema):
        """Return ``uid`` formatted for ``candidate_schema``, or ``None``."""
        pattern, template = _RE_AUTHORS_UID[candidate_schema]
        found = pattern.match(uid)
        return template.format(found.group('uid')) if found else None

    if idutils.is_orcid(uid) and schema in (None, 'ORCID'):
        return idutils.normalize_orcid(uid), 'ORCID'

    if schema:
        if schema not in _RE_AUTHORS_UID:
            # Schema explicitly specified, but this function can't handle it
            return uid, schema
        return (_normalized_for(schema) or uid), schema

    # No schema given: try them all and accept only an unambiguous answer.
    guesses = [
        (normalized, candidate)
        for candidate, normalized in (
            (name, _normalized_for(name)) for name in _RE_AUTHORS_UID
        )
        if normalized
    ]
    if len(guesses) != 1:
        # Either no guess at all, or valid against several candidate schemas
        raise UnknownUIDSchema(uid)
    return guesses[0]
def normalize_arxiv_category(category):
    """Make an arXiv category schema compliant.

    An obsolete category is first replaced by its current equivalent. The
    result is then compared, ignoring case and treating a dash as a possible
    dot, with every valid category; the first one that matches is returned
    with its proper capitalization.

    Example:
        >>> from inspire_schemas.utils import normalize_arxiv_category
        >>> normalize_arxiv_category('funct-an')  # doctest: +SKIP
        u'math.FA'
    """
    category = _NEW_CATEGORIES.get(category.lower(), category)

    lowered = category.lower()
    accepted_spellings = (lowered, lowered.replace('-', '.'))
    for valid_category in valid_arxiv_categories():
        if valid_category.lower() in accepted_spellings:
            return valid_category

    return category  # XXX: will fail validation and be logged
def valid_arxiv_categories():
    """List of all arXiv categories that ever existed.

    Returns:
        list: a new list made of the categories of the
        ``elements/arxiv_categories`` schema followed by the obsolete
        categories (the keys of ``_NEW_CATEGORIES``).

    Example:
        >>> from inspire_schemas.utils import valid_arxiv_categories
        >>> 'funct-an' in valid_arxiv_categories()
        True
    """
    schema = load_schema('elements/arxiv_categories')
    # ``load_schema`` hands back its cached schema object, so work on a copy:
    # extending ``schema['enum']`` in place would add the obsolete categories
    # to the cached schema itself, once more on every call.
    categories = list(schema['enum'])
    categories.extend(_NEW_CATEGORIES)
    return categories
def classify_field(value):
    """Normalize ``value`` to an Inspire category.

    Args:
        value(str): an Inspire category to properly case, or an arXiv category
            to translate to the corresponding Inspire category.

    Returns:
        str: ``None`` if ``value`` is not a non-empty string, otherwise the
        corresponding Inspire category (``'Other'`` when the arXiv category
        has no known translation).
    """
    if not isinstance(value, six.string_types) or not value:
        return None

    known_terms = load_schema('elements/inspire_field')['properties']['term']['enum']
    uppercased = value.upper()
    for inspire_category in known_terms:
        if inspire_category.upper() == uppercased:
            return inspire_category

    arxiv_category = normalize_arxiv_category(value)
    return ARXIV_TO_INSPIRE_CATEGORY_MAPPING.get(arxiv_category, 'Other')
def split_page_artid(page_artid):
    """Split ``page_artid`` into a page range and/or an article ID.

    Returns:
        tuple: ``(page_start, page_end, artid)``; the parts that do not apply
        are ``None``. A falsy input gives ``(None, None, None)``.
    """
    if not page_artid:
        return None, None, None

    # normalize unicode dashes
    text = unidecode(six.text_type(page_artid))

    if '-' in text:
        # a dash means a page range, unless it does not split in two parts
        bounds = text.replace('--', '-').split('-')
        if len(bounds) == 2:
            return bounds[0], bounds[1], None
        return None, None, text

    # two or more letters, or at least 5 characters once the letters are
    # removed: it's an article ID
    if _RE_2_CHARS.search(text) or len(_RE_CHAR.sub('', text)) >= 5:
        return None, None, text

    # can't tell: use the value both as the start page and as the article ID
    return text, None, text
def split_pubnote(pubnote_str):
    """Split a comma-separated pubnote into journal information.

    Returns:
        dict: an empty dict when the pubnote has fewer than three parts,
        otherwise the ``journal_title``, ``journal_volume`` and those of
        ``page_start``, ``page_end`` and ``artid`` that are not ``None``.
    """
    parts = pubnote_str.split(',')
    if len(parts) <= 2:
        return {}

    page_start, page_end, artid = split_page_artid(parts[2])
    fields = {
        'journal_title': parts[0],
        'journal_volume': parts[1],
        'page_start': page_start,
        'page_end': page_end,
        'artid': artid,
    }
    return {name: value for (name, value) in six.iteritems(fields) if value is not None}
def build_pubnote(title, volume, page_start=None, page_end=None, artid=None):
    """Build a pubnote string from its parts (reverse of split_pubnote).

    The third component is, in order of preference: the article ID (when it
    differs from the start page), the page range, the start page alone, or
    nothing at all.

    Returns:
        the pubnote string, or ``None`` if ``title`` or ``volume`` is missing.
    """
    if not (title and volume):
        return None

    if artid and artid != page_start:
        template = u'{title},{volume},{artid}'
    elif page_start and page_end:
        template = u'{title},{volume},{page_start}-{page_end}'
    elif page_start:
        template = u'{title},{volume},{page_start}'
    else:
        template = u'{title},{volume}'

    values = dict(
        title=title,
        volume=volume,
        page_start=page_start,
        page_end=page_end,
        artid=artid,
    )
    return template.format(**values)
class LocalRefResolver(RefResolver):
    """Resolver that also understands non-uri relative paths."""

    def resolve_remote(self, uri):
        """Resolve a uri or a relative path to a schema.

        The regular resolution is tried first; if it raises ``ValueError``,
        ``uri`` (without its ``.json`` suffix) is looked up among the
        installed schemas and resolved as a ``file://`` URL.
        """
        parent = super(LocalRefResolver, self)
        try:
            return parent.resolve_remote(uri)
        except ValueError:
            schema_name = uri.rsplit('.json', 1)[0]
            local_uri = 'file://' + get_schema_path(schema_name)
            return parent.resolve_remote(local_uri)
def get_schema_path(schema, resolved=False):
    """Retrieve the installed path for the given schema.

    The path part of ``schema`` is searched below the schema root directory;
    if it is not found, its first path element is dropped and the search is
    repeated until something is found or nothing is left.

    Args:
        schema(str): relative or absolute url of the schema to validate, for
            example, 'records/authors.json' or 'jobs.json', or just the name of the
            schema, like 'jobs'.
        resolved(bool): accepted for compatibility; both values currently
            search the same root directory.

    Returns:
        str: path to the given schema name.

    Raises:
        SchemaNotFound: if no schema could be found.
    """
    def _without_first_elem(path):
        """Drop the first element of ``path``.

        'something/other' becomes 'other', and 'other' becomes ''.
        """
        return path.partition(os.path.sep)[2]

    def _normalized_relative_path(schema):
        """Turn a schema url or name into a relative ``.json`` path.

        Extracts the path from the url, gets rid of any '..' in it and adds
        the json extension if it is not there.
        """
        normalized = os.path.normpath(os.path.sep + urlsplit(schema).path)
        if normalized.startswith(os.path.sep):
            normalized = normalized[1:]
        if normalized.endswith('.json'):
            return normalized
        return normalized + '.json'

    remaining = _normalized_relative_path(schema)
    while remaining:
        candidate = os.path.abspath(os.path.join(_schema_root_path, remaining))
        if os.path.exists(candidate):
            return os.path.abspath(candidate)
        remaining = _without_first_elem(remaining)

    raise SchemaNotFound(schema=schema)
def load_schema(schema_name, resolved=False, _cache={}):
    """Load the given schema from wherever it's installed.

    Loaded schemas are memoized, both under the requested name and under the
    path of the file, so the same object is returned by later calls.

    Args:
        schema_name(str): Name of the schema to load, for example 'authors'.
        resolved(bool): passed through to ``get_schema_path``.
        _cache(dict): Private argument used for memoization; its default
            value is intentionally shared between calls.

    Returns:
        dict: the schema with the given name.

    Raises:
        SchemaNotFound: if no schema could be found.
    """
    if schema_name in _cache:
        return _cache[schema_name]

    schema_path = get_schema_path(schema_name, resolved)
    if schema_path not in _cache:
        # JSON text is UTF-8: decode it explicitly instead of relying on the
        # locale's preferred encoding. ``io.open`` accepts ``encoding`` on
        # both Python 2 and Python 3.
        with io.open(schema_path, encoding='utf-8') as schema_fd:
            _cache[schema_path] = json.load(schema_fd)

    schema_data = _cache[schema_path]
    _cache[schema_name] = schema_data
    return schema_data
# Format checker used by ``validate`` and ``get_validation_errors``.
# NOTE: this binds a second name to jsonschema's ``draft4_format_checker``
# object (no copy is made), so the checks registered below are added to that
# same object.
inspire_format_checker = draft4_format_checker
# ``date``: checked by calling ``PartialDate.loads``; ``ValueError`` is
# declared as the exception signalling an invalid value.
inspire_format_checker.checks('date', raises=ValueError)(PartialDate.loads)
# ``uri-reference``: checked by parsing with the rfc3987 ``URI_reference`` rule.
inspire_format_checker.checks('uri-reference', raises=ValueError)(
    partial(rfc3987.parse, rule='URI_reference')
)
# ``orcid``: checked with ``is_orcid``, which returns a boolean.
inspire_format_checker.checks('orcid')(is_orcid)
# ``timezone``: checked by calling pytz's ``timezone``; ``UnknownTimeZoneError``
# is declared as the exception signalling an invalid value.
inspire_format_checker.checks('timezone', raises=UnknownTimeZoneError)(timezone)
def _load_schema_for_record(data, schema=None):
    """Work out and load the schema to use for a record.

    Args:
        data (dict): record data.
        schema (Union[dict, str]): schema to validate against. ``None`` means
            "use ``data['$schema']``"; a string is loaded with
            ``load_schema``; anything else is returned as it is.

    Returns:
        dict: the loaded schema.

    Raises:
        SchemaNotFound: if the given schema was not found.
        SchemaKeyNotFound: if ``schema`` is ``None`` and no ``$schema`` key was
            found in ``data``.
    """
    if schema is None:
        try:
            schema = data['$schema']
        except KeyError:
            raise SchemaKeyNotFound(data=data)

    if not isinstance(schema, six.string_types):
        return schema
    return load_schema(schema_name=schema)
def validate(data, schema=None):
    """Validate the given dictionary against the given schema.

    Args:
        data (dict): record to validate.
        schema (Union[dict, str]): schema to validate against. If it is a
            string, it is interpreted as the name of the schema to load (e.g.
            ``authors`` or ``jobs``). If it is ``None``, the schema is taken
            from ``data['$schema']``. If it is a dictionary, it is used
            directly.

    Raises:
        SchemaNotFound: if the given schema was not found.
        SchemaKeyNotFound: if ``schema`` is ``None`` and no ``$schema`` key was
            found in ``data``.
        jsonschema.SchemaError: if the schema is invalid.
        jsonschema.ValidationError: if the data is invalid.
    """
    loaded_schema = _load_schema_for_record(data, schema)
    resolver = LocalRefResolver.from_schema(loaded_schema)

    return jsonschema_validate(
        instance=data,
        schema=loaded_schema,
        resolver=resolver,
        format_checker=inspire_format_checker,
    )
def get_validation_errors(data, schema=None):
    """Validation errors for a given record.

    Args:
        data (dict): record to validate.
        schema (Union[dict, str]): schema to validate against. If it is a
            string, it is interpreted as the name of the schema to load (e.g.
            ``authors`` or ``jobs``). If it is ``None``, the schema is taken
            from ``data['$schema']``. If it is a dictionary, it is used
            directly.

    Returns:
        the iterator of validation errors produced by
        ``Draft4Validator.iter_errors`` for ``data``.

    Raises:
        SchemaNotFound: if the given schema was not found.
        SchemaKeyNotFound: if ``schema`` is ``None`` and no ``$schema`` key was
            found in ``data``.
    """
    loaded_schema = _load_schema_for_record(data, schema)
    resolver = LocalRefResolver.from_schema(loaded_schema)

    validator = Draft4Validator(
        loaded_schema,
        resolver=resolver,
        format_checker=inspire_format_checker
    )
    return validator.iter_errors(data)
def normalize_collaboration(collaboration):
    """Normalize collaboration string.

    Surrounding parentheses are removed, the string is split on the word
    "and", and every part is stripped of its leading "for the"-like words and
    of its trailing "collaboration(s)".

    Args:
        collaboration: a string containing collaboration(s) or None

    Returns:
        list: List of extracted and normalized collaborations

    Examples:
        >>> from inspire_schemas.utils import normalize_collaboration
        >>> normalize_collaboration('for the CMS and ATLAS Collaborations')
        ['CMS', 'ATLAS']
    """
    if not collaboration:
        return []

    text = collaboration.strip()
    if text.startswith('(') and text.endswith(')'):
        text = text[1:-1]

    names = []
    for chunk in _RE_AND.split(text):
        chunk = _RE_COLLABORATION_LEADING.sub('', chunk)
        chunk = _RE_COLLABORATION_TRAILING.sub('', chunk)
        names.append(chunk.strip())
    return names
def get_license_from_url(url):
    """Get the license abbreviation from an URL.

    Creative Commons and arXiv license URLs are recognized; the host name is
    compared case-insensitively.

    Args:
        url(str): canonical url of the license.

    Returns:
        str: the corresponding license abbreviation, or ``None`` if ``url``
        is empty.

    Raises:
        ValueError: when the url is not recognized, including when the host
            is known but the path does not look like a license path.
    """
    if not url:
        return None

    split_url = urlsplit(url, scheme='http')
    netloc = split_url.netloc.lower()

    if netloc == 'creativecommons.org':
        if 'publicdomain' in split_url.path:
            match = _RE_PUBLIC_DOMAIN_URL.match(split_url.path)
            if match is None:
                parts = ['public domain']
            else:
                parts = ['CC0']
                parts.extend(part for part in match.groups() if part)
        else:
            match = _RE_LICENSE_URL.match(split_url.path)
            if match is None:
                # Known host but not a ``/licenses/...`` path: report it as
                # documented instead of failing on ``None.groups()``.
                raise ValueError('Unknown license URL')
            parts = ['CC']
            parts.extend(part.upper() for part in match.groups() if part)
    elif netloc == 'arxiv.org':
        match = _RE_LICENSE_URL.match(split_url.path)
        if match is None:
            raise ValueError('Unknown license URL')
        parts = ['arXiv']
        parts.extend(part for part in match.groups() if part)
    else:
        raise ValueError('Unknown license URL')

    return u' '.join(parts)
def convert_old_publication_info_to_new(publication_infos):
    """Convert a ``publication_info`` value from the old format to the new.

    On Legacy different series of the same journal were modeled by adding the
    letter part of the name to the journal volume. For example, a paper published
    in Physical Review D contained::

        {
            'publication_info': [
                {
                    'journal_title': 'Phys.Rev.',
                    'journal_volume': 'D43',
                },
            ],
        }

    On Labs we instead represent each series with a different journal record. As
    a consequence, the above example becomes::

        {
            'publication_info': [
                {
                    'journal_title': 'Phys.Rev.D',
                    'journal_volume': '43',
                },
            ],
        }

    This function handles this translation from the old format to the new. Please
    also see the tests for various edge cases that this function also handles.

    The input is not modified: every element is deep-copied first. Elements
    flagged as ``hidden`` are put at the end of the result, and only if the
    converted element is not already in it.

    Args:
        publication_infos: a ``publication_info`` in the old format.

    Returns:
        list(dict): a ``publication_info`` in the new format.
    """
    converted = []
    hidden_entries = []

    for old_entry in publication_infos:
        entry = copy.deepcopy(old_entry)
        journal_title = entry.get('journal_title')

        # Journals that were renamed get their new title first.
        if journal_title in _JOURNALS_RENAMED_OLD_TO_NEW:
            journal_title = _JOURNALS_RENAMED_OLD_TO_NEW[journal_title]
            entry['journal_title'] = journal_title

        journal_volume = entry.get('journal_volume')

        year_is_in_volume = (
            journal_title
            and journal_title.upper() in _JOURNALS_WITH_YEAR_ADDED_TO_VOLUME
            and journal_volume
            and len(journal_volume) == 4
        )
        if year_is_in_volume:
            year_digits = journal_volume[:2]
            try:
                was_last_century = int(year_digits) > 50
            except ValueError:
                # Not a year prefix after all: keep the volume as it is.
                pass
            else:
                century = '19' if was_last_century else '20'
                entry['year'] = int(century + year_digits)
                entry['journal_volume'] = journal_volume[2:]
            converted.append(entry)
            continue

        if journal_title and journal_volume and journal_title.lower() not in JOURNALS_IGNORED_IN_OLD_TO_NEW:
            match = (
                _RE_VOLUME_STARTS_WITH_A_LETTER.match(journal_volume)
                or _RE_VOLUME_ENDS_WITH_A_LETTER.match(journal_volume)
            )
            if match:
                # The link to the journal record is dropped with the old title.
                entry.pop('journal_record', None)
                if journal_title in _JOURNALS_RENAMED_OLD_TO_NEW.values():
                    entry['journal_title'] = journal_title
                else:
                    separator = '' if journal_title.endswith('.') else ' '
                    entry['journal_title'] = ''.join(
                        [journal_title, separator, match.group('letter')]
                    )
                entry['journal_volume'] = match.group('volume')

        if entry.pop('hidden', None):
            hidden_entries.append(entry)
        else:
            converted.append(entry)

    for entry in hidden_entries:
        if entry not in converted:
            entry['hidden'] = True
            converted.append(entry)

    return converted
def convert_new_publication_info_to_old(publication_infos):
    """Convert back a ``publication_info`` value from the new format to the old.

    Does the inverse transformation of :func:`convert_old_publication_info_to_new`,
    to be used whenever we are sending back records from Labs to Legacy.

    The input is not modified: every element is deep-copied first.

    Args:
        publication_infos: a ``publication_info`` in the new format.

    Returns:
        list(dict): a ``publication_info`` in the old format.
    """
    def _is_hidden_pubnote_needed(title, volume):
        """Tell whether this volume of this journal needs a hidden pubnote."""
        if title not in _JOURNALS_THAT_NEED_A_HIDDEN_PUBNOTE:
            return False
        return volume in _JOURNALS_THAT_NEED_A_HIDDEN_PUBNOTE[title]

    converted = []

    for new_entry in publication_infos:
        entry = copy.deepcopy(new_entry)
        journal_title = entry.get('journal_title')

        # Renamed journals only get their old title back, nothing else.
        if journal_title in _JOURNALS_RENAMED_NEW_TO_OLD:
            entry['journal_title'] = _JOURNALS_RENAMED_NEW_TO_OLD[journal_title]
            converted.append(entry)
            continue

        journal_volume = entry.get('journal_volume')
        year = entry.get('year')

        if (journal_title and journal_title.upper() in _JOURNALS_WITH_YEAR_ADDED_TO_VOLUME and
                year and journal_volume and len(journal_volume) == 2):
            # Put the two last digits of the year back in front of the volume.
            entry['journal_volume'] = str(year)[2:] + journal_volume
            converted.append(entry)
            continue

        if journal_title and journal_volume:
            match = _RE_TITLE_ENDS_WITH_A_LETTER.match(journal_title)
            if match and _is_hidden_pubnote_needed(journal_title, journal_volume):
                short_title = match.group('title').strip()
                letter = match.group('letter')
                # Visible entry: the letter goes after the volume...
                entry['journal_title'] = short_title
                entry['journal_volume'] = journal_volume + letter
                converted.append(entry)
                # ...and a hidden one has it before the volume.
                entry = copy.deepcopy(new_entry)
                entry['hidden'] = True
                entry['journal_title'] = short_title
                entry['journal_volume'] = letter + journal_volume
            elif match and journal_title not in _JOURNALS_ALREADY_ENDING_WITH_A_LETTER:
                entry['journal_title'] = match.group('title').strip()
                entry['journal_volume'] = match.group('letter') + journal_volume

        converted.append(entry)

    return converted
def fix_url_bars_instead_of_slashes(string):
    """Put back the ``/`` of a url in which they were all changed for ``|``.

    Only strings starting with ``http:||`` or ``https:||`` are modified; in
    those, every ``|`` is replaced.
    """
    if string.startswith(('http:||', 'https:||')):
        return string.replace('|', '/')
    return string
def fix_url_add_http_if_missing(string):
    """Prepend ``http://`` to a url that starts with ``www``."""
    if string.startswith('www'):
        return 'http://' + string
    return string
def fix_url_replace_tilde(string):
    """Replace tilde look-alike characters by a plain ``~``."""
    for lookalike in ('\u223c', '\u02dc'):
        string = string.replace(lookalike, '~')
    return string
def fix_reference_url(url):
    """Try to fix an incorrect url by undoing the most common errors.

    The fixes are applied in this order: bars instead of slashes, missing
    ``http``, tilde look-alikes. The result is kept only if it parses as a
    URI.

    Returns:
        String containing the fixed url or the original one if it could not be fixed.
    """
    candidate = fix_url_bars_instead_of_slashes(url)
    candidate = fix_url_add_http_if_missing(candidate)
    candidate = fix_url_replace_tilde(candidate)

    try:
        rfc3987.parse(candidate, rule="URI")
    except ValueError:
        return url
    return candidate
def normalize_isbn(isbn):
    """Normalize an ISBN in order to be schema-compliant.

    Returns:
        the string form of ``ISBN(isbn)``, or ``isbn`` itself, unchanged, if
        building or formatting it fails for any reason (best effort).
    """
    try:
        normalized = str(ISBN(isbn))
    except Exception:
        return isbn
    return normalized
def _get_first_regex_match(regex_list, obj_to_match):
    """Return the first successful ``match`` of ``obj_to_match``.

    The compiled patterns of ``regex_list`` are tried in order; ``None`` is
    returned if none of them matches.
    """
    attempts = (pattern.match(obj_to_match) for pattern in regex_list)
    return next((found for found in attempts if found), None)
def is_arxiv(obj):
    """Return ``True`` if ``obj`` contains an arXiv identifier.

    Only the first whitespace-separated token of ``obj`` is examined. When
    the identifier comes with a category, that category must be a valid arXiv
    category (ignoring case, and accepting a dash in place of the dot).

    The ``idutils`` library's ``is_arxiv`` function has been
    modified here to work with two regular expressions instead
    of three and adding a check for valid arxiv categories only
    """
    tokens = obj.split()
    if not tokens:
        return False

    found = _get_first_regex_match(ARXIV_PATTERNS, tokens[0])
    if not found:
        return False

    category = found.group('category')
    if not category:
        # A bare identifier needs no category check.
        return True

    category = category.lower()
    known_categories = [valid.lower() for valid in valid_arxiv_categories()]
    if category in known_categories:
        return True
    return category.replace('-', '.') in known_categories
def normalize_arxiv(obj):
    """Return a normalized arXiv identifier from ``obj``.

    Only the first whitespace-separated token of ``obj`` is examined.

    Returns:
        the lower-cased ``<category>/<number>`` for an old-style identifier,
        the bare ``<number>`` for a new-style one, or ``None`` when ``obj``
        is empty (or only whitespace) or holds no arXiv identifier.
    """
    tokens = obj.split()
    if not tokens:
        # Nothing to look at: same answer as for any non-arXiv value, rather
        # than an IndexError.
        return None
    token = tokens[0]

    matched_arxiv_pre = _get_first_regex_match(ARXIV_PATTERNS_PRE_2007, token)
    if matched_arxiv_pre:
        return ('/'.join(matched_arxiv_pre.group("extraidentifier", "identifier"))).lower()

    matched_arxiv_post = _get_first_regex_match(ARXIV_PATTERNS_POST_2007, token)
    if matched_arxiv_post:
        return matched_arxiv_post.group("identifier")

    return None
def sanitize_html(text):
    """Sanitize HTML for use inside records fields.

    The text is passed through the module-level bleach cleaner, which only
    keeps a whitelisted subset of tags and attributes.
    """
    cleaned = _bleach_cleaner.clean(text)
    return cleaned
def get_paths(schema, previous_node=None):
    """Yield the paths that lead to the ``description`` of a ``$ref`` node.

    The nested dictionaries of ``schema`` are walked recursively. Each yielded
    list holds the keys crossed on the way, without the ``properties``,
    ``items`` and ``description`` keys, followed by the description itself,
    which is the non-dict ``description`` value found directly under a
    ``$ref`` key.

    Args:
        schema(dict): the (sub)schema to walk.
        previous_node: key under which ``schema`` was found, if any.
    """
    transparent_keys = ("properties", "items", "description")

    for key, node in schema.items():
        if not isinstance(node, dict):
            if key == "description" and previous_node == "$ref":
                yield [node]
            continue

        for tail in get_paths(node, key):
            if key in transparent_keys:
                yield tail
            else:
                yield [key] + tail
def get_refs_to_schemas(references=defaultdict(list)):
    """For every schema return path and index name for every referenced record.

    The default ``references`` object is shared between calls, so it works as
    a cache: once it has been filled, it is returned as it is.

    Args:
        references: mapping to fill; it is returned untouched if it already
            holds something.

    Returns:
        dict(list(tuple)): index and path to the referenced record
    """
    if references:
        return references

    skipped_roots = {"deleted_records", "self", "new_record"}

    for schema_name in SCHEMAS:
        schema = load_schema(schema_name=schema_name)
        for path_nodes in get_paths(schema):
            if path_nodes[0] in skipped_roots:
                continue

            # The last node is the description: its first word lists the
            # referenced indexes, separated by slashes.
            description = path_nodes.pop()
            index_names = description.split(" ")[0].split('/')
            entry = (schema_name, '.'.join(path_nodes))

            if path_nodes[0] == "related_records":
                references[schema_name].append(entry)
            else:
                for index_name in index_names:
                    references[index_name].append(entry)

    return references
def normalize_collaboration_name(full_collaboration_string):
    """Remove generic words from a collaboration name.

    The whole words 'group', 'community', 'consortium', 'concept group',
    'experiment' and 'team' are removed, whatever their case, and the
    whitespace that is left is collapsed to single spaces.
    """
    ignored_words = ('group', 'community', 'consortium', 'concept group', 'experiment', 'team')
    pattern = r'\b(' + '|'.join(ignored_words) + r')\b'
    without_ignored = re.sub(pattern, '', full_collaboration_string, flags=re.IGNORECASE)
    return ' '.join(without_ignored.split())