import logging
from language_tags import tags
from skosprovider.skos import Collection
from skosprovider.skos import Concept
from skosprovider.providers import VocabularyProvider
from sqlalchemy import select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm.session import Session
from skosprovider_sqlalchemy.models import ConceptScheme as ConceptSchemeModel
from skosprovider_sqlalchemy.models import Collection as CollectionModel
from skosprovider_sqlalchemy.models import Concept as ConceptModel
from skosprovider_sqlalchemy.models import Label as LabelModel
from skosprovider_sqlalchemy.models import Language as LanguageModel
from skosprovider_sqlalchemy.models import Match as MatchModel
from skosprovider_sqlalchemy.models import Note as NoteModel
from skosprovider_sqlalchemy.models import Source as SourceModel
from skosprovider_sqlalchemy.models import Thing as ThingModel
log = logging.getLogger(__name__)
[docs]def import_provider(provider: VocabularyProvider, session: Session, conceptscheme: ConceptSchemeModel = None) -> ConceptSchemeModel:
'''
Import a provider into a SQLAlchemy database.
:param provider: The :class:`skosprovider.providers.VocabularyProvider`
to import. Since the SQLAlchemy backend uses integers as
keys, this backend should have id values that can be cast to int.
:param conceptscheme: A
:class:`skosprovider_sqlalchemy.models.Conceptscheme` to import
the provider into. This should be an empty scheme so that there are
no possible id clashes. If no conceptscheme is provided, one will be
created.
:param session: A :class:`sqlalchemy.orm.session.Session`.
:return: The conceptscheme that holds the concepts and collections. Either
the same conceptscheme that was passed into the provider, or the one that
was created by this function.
:rtype: skosprovider_sqlalchemy.models.Conceptscheme
'''
# Copy information about the scheme
cs = provider.concept_scheme
if not conceptscheme:
conceptscheme = ConceptSchemeModel(
uri = cs.uri
)
session.add(conceptscheme)
if conceptscheme.uri != cs.uri:
log.warning('Conceptscheme model has different URI than conceptscheme attached to provider.')
_add_labels(conceptscheme, cs.labels, session)
_add_notes(conceptscheme, cs.notes, session)
_add_sources(conceptscheme, cs.sources, session)
for l in cs.languages:
language = _check_language(l, session)
conceptscheme.languages.append(language)
# First pass: load all concepts and collections
for stuff in provider.get_all():
c = provider.get_by_id(stuff['id'])
log.warning(c)
if isinstance(c, Concept):
cm = ConceptModel(
concept_id=c.id,
uri=c.uri,
conceptscheme=conceptscheme
)
elif isinstance(c, Collection):
cm = CollectionModel(
concept_id=c.id,
uri=c.uri,
conceptscheme=conceptscheme
)
session.add(cm)
_add_labels(cm, c.labels, session)
_add_notes(cm, c.notes, session)
_add_sources(cm, c.sources, session)
if hasattr(c, 'matches'):
for mt in c.matches:
matchtype = mt + 'Match'
for m in c.matches[mt]:
match = MatchModel(matchtype_id=matchtype, uri=m)
cm.matches.append(match)
session.flush()
# Second pass: link
for stuff in provider.get_all():
c = provider.get_by_id(stuff['id'])
log.warning(c)
if isinstance(c, Concept):
cm = session.execute(
select(ConceptModel)
.filter(
ConceptModel.conceptscheme_id == conceptscheme.id,
ConceptModel.concept_id == str(c.id)
)
).scalar_one()
if len(c.narrower) > 0:
for nc in c.narrower:
try:
nc = session.execute(
select(ConceptModel)
.filter(
ConceptModel.conceptscheme_id == conceptscheme.id,
ConceptModel.concept_id == str(nc)
)
).scalar_one()
cm.narrower_concepts.add(nc)
except NoResultFound:
log.warning(
'Tried to add a relation %s narrower %s, but target \
does not exist. Relation will be lost.' % (c.id, nc))
if len(c.subordinate_arrays) > 0:
for sa in c.subordinate_arrays:
try:
sa = session.execute(
select(CollectionModel)
.filter(
CollectionModel.conceptscheme_id == conceptscheme.id,
CollectionModel.concept_id == str(sa)
)
).scalar_one()
cm.narrower_collections.add(sa)
except NoResultFound:
log.warning(
'Tried to add a relation %s subordinate array %s, but target \
does not exist. Relation will be lost.' % (c.id, sa))
if len(c.related) > 0:
for rc in c.related:
try:
rc = session.execute(
select(ConceptModel)
.filter(
ConceptModel.conceptscheme_id == conceptscheme.id,
ConceptModel.concept_id == str(rc)
)
).scalar_one()
cm.related_concepts.add(rc)
except NoResultFound:
log.warning(
'Tried to add a relation %s related %s, but target \
does not exist. Relation will be lost.' % (c.id, rc))
elif isinstance(c, Collection) and len(c.members) > 0:
cm = session.execute(
select(CollectionModel)
.filter(
ConceptModel.conceptscheme_id == conceptscheme.id,
ConceptModel.concept_id == str(c.id)
)
).scalar_one()
for mc in c.members:
try:
mc = session.execute(
select(ThingModel)
.filter(
ConceptModel.conceptscheme_id == conceptscheme.id,
ConceptModel.concept_id == str(mc)
)
).scalar_one()
cm.members.add(mc)
except NoResultFound:
log.warning(
'Tried to add a relation %s member %s, but target \
does not exist. Relation will be lost.' % (c.id, mc))
return conceptscheme
def _check_language(language_tag, session):
'''
Checks if a certain language is already present, if not import.
:param string language_tag: IANA language tag
:param session: Database session to use
:rtype: :class:`skosprovider_sqlalchemy.models.Language`
'''
if not language_tag: # pragma: no cover
language_tag = 'und'
l = session.get(LanguageModel, language_tag)
if not l:
if not tags.check(language_tag):
raise ValueError('Unable to import provider. Invalid language tag: %s' % language_tag)
descriptions = ', '.join(tags.description(language_tag))
l = LanguageModel(id=language_tag, name=descriptions)
session.add(l)
return l
def _add_labels(target, labels, session):
'''
Adds the labels to the target
:param target: Target to add the labels to
:param labels: A list of :class:`skosprovider.skos.Label` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for l in labels:
_check_language(l.language, session)
target.labels.append(LabelModel(
label=l.label,
labeltype_id=l.type,
language_id=l.language
))
return target
def _add_notes(target, notes, session):
'''
Adds the notes to the target
:param target: Target to add the notes to
:param notes: A list of :class:`skosprovider.skos.Note` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for n in notes:
_check_language(n.language, session)
target.notes.append(NoteModel(
note=n.note,
notetype_id=n.type,
language_id=n.language,
markup=n.markup
))
return target
def _add_sources(target, sources, session):
'''
Adds the sources to the target
:param target: Target to add the sources to
:param sources: A list of :class:`skosprovider.skos.Source` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for s in sources:
target.sources.append(SourceModel(
citation=s.citation,
markup=s.markup
))
return target
[docs]class VisitationCalculator:
'''
Generates a nested set for a conceptscheme.
'''
def __init__(self, session):
'''
:param :class:`sqlalchemy.orm.session.Session` session: A database
session.
'''
self.session = session
self.count = 0
self.depth = 0
self.visitation = []
[docs] def visit(self, conceptscheme):
'''
Visit a :class:`skosprovider_sqlalchemy.models.Conceptscheme` and
calculate a nested set representation.
:param conceptscheme: A
:class:`skosprovider_sqlalchemy.models.Conceptscheme` for which
the nested set will be calculated.
'''
self.count = 0
self.depth = 0
self.visitation = []
# get all possible top concepts
topc = self.session.execute(
select(ConceptModel)
.filter(
ConceptModel.conceptscheme == conceptscheme,
~ConceptModel.broader_concepts.any()
)
).scalars().all()
# check if they have an indirect broader concept
def _has_higher_concept(c):
for coll in c.member_of:
if (
coll.infer_concept_relations
and coll.broader_concepts or _has_higher_concept(coll)
):
return True
return False
topc = [c for c in topc if not _has_higher_concept(c)]
for tc in topc:
self._visit_concept(tc)
self.visitation.sort(key=lambda v: v['lft'])
return self.visitation
def _visit_concept(self, concept):
if concept.type == 'concept':
log.debug('Visiting concept %s.' % concept.id)
self.depth += 1
self.count += 1
v = {
'id': concept.id,
'lft': self.count,
'depth': self.depth
}
for nc in concept.narrower_concepts:
self._visit_concept(nc)
for ncol in concept.narrower_collections:
if ncol.infer_concept_relations:
self._visit_concept(ncol)
self.count += 1
v['rght'] = self.count
self.visitation.append(v)
self.depth -= 1
elif concept.type == 'collection':
log.debug('Visiting collection %s.' % concept.id)
for m in concept.members:
self._visit_concept(m)