# -*- coding: utf-8 -*-
import logging
log = logging.getLogger(__name__)
from skosprovider.skos import (
Concept,
Collection
)
from language_tags import tags
from skosprovider_sqlalchemy.models import (
Thing as ThingModel,
Concept as ConceptModel,
Collection as CollectionModel,
Label as LabelModel,
Note as NoteModel,
Match as MatchModel,
Language as LanguageModel,
Source as SourceModel
)
from sqlalchemy.orm.exc import NoResultFound
[docs]def import_provider(provider, conceptscheme, session):
'''
Import a provider into a SQLAlchemy database.
:param provider: The :class:`skosprovider.providers.VocabularyProvider`
to import. Since the SQLAlchemy backend uses integers as
keys, this backend should have id values that can be cast to int.
:param conceptscheme: A
:class:`skosprovider_sqlalchemy.models.Conceptscheme` to import
the provider into. This should be an empty scheme so that there are
no possible id clashes.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
# Copy information about the scheme
cs = provider.concept_scheme
_add_labels(conceptscheme, cs.labels, session)
_add_notes(conceptscheme, cs.notes, session)
_add_sources(conceptscheme, cs.sources, session)
for l in cs.languages:
language = _check_language(l, session)
conceptscheme.languages.append(language)
# First pass: load all concepts and collections
for stuff in provider.get_all():
c = provider.get_by_id(stuff['id'])
if isinstance(c, Concept):
cm = ConceptModel(
concept_id=int(c.id),
uri=c.uri,
conceptscheme=conceptscheme
)
elif isinstance(c, Collection):
cm = CollectionModel(
concept_id=int(c.id),
uri=c.uri,
conceptscheme=conceptscheme
)
_add_labels(cm, c.labels, session)
_add_notes(cm, c.notes, session)
_add_sources(cm, c.sources, session)
if hasattr(c, 'matches'):
for mt in c.matches:
matchtype = mt + 'Match'
for m in c.matches[mt]:
match = MatchModel(matchtype_id=matchtype, uri=m)
cm.matches.append(match)
session.add(cm)
session.flush()
# Second pass: link
for stuff in provider.get_all():
c = provider.get_by_id(stuff['id'])
if isinstance(c, Concept):
cm = session.query(ConceptModel) \
.filter(ConceptModel.conceptscheme_id == conceptscheme.id) \
.filter(ConceptModel.concept_id == int(c.id)) \
.one()
if len(c.narrower) > 0:
for nc in c.narrower:
try:
nc = session.query(ConceptModel) \
.filter(ConceptModel.conceptscheme_id == conceptscheme.id) \
.filter(ConceptModel.concept_id == int(nc)) \
.one()
cm.narrower_concepts.add(nc)
except NoResultFound:
log.warning(
'Tried to add a relation %s narrower %s, but target \
does not exist. Relation will be lost.' % (c.id, nc))
if len(c.subordinate_arrays) > 0:
for sa in c.subordinate_arrays:
try:
sa = session.query(CollectionModel) \
.filter(CollectionModel.conceptscheme_id == conceptscheme.id) \
.filter(CollectionModel.concept_id == int(sa)) \
.one()
cm.narrower_collections.add(sa)
except NoResultFound:
log.warning(
'Tried to add a relation %s subordinate array %s, but target \
does not exist. Relation will be lost.' % (c.id, sa))
if len(c.related) > 0:
for rc in c.related:
try:
rc = session.query(ConceptModel) \
.filter(ConceptModel.conceptscheme_id == conceptscheme.id) \
.filter(ConceptModel.concept_id == int(rc)) \
.one()
cm.related_concepts.add(rc)
except NoResultFound:
log.warning(
'Tried to add a relation %s related %s, but target \
does not exist. Relation will be lost.' % (c.id, rc))
elif isinstance(c, Collection) and len(c.members) > 0:
cm = session.query(CollectionModel) \
.filter(ConceptModel.conceptscheme_id == conceptscheme.id) \
.filter(ConceptModel.concept_id == int(c.id)) \
.one()
for mc in c.members:
try:
mc = session.query(ThingModel) \
.filter(ConceptModel.conceptscheme_id == conceptscheme.id) \
.filter(ConceptModel.concept_id == int(mc)) \
.one()
cm.members.add(mc)
except NoResultFound:
log.warning(
'Tried to add a relation %s member %s, but target \
does not exist. Relation will be lost.' % (c.id, mc))
def _check_language(language_tag, session):
'''
Checks if a certain language is already present, if not import.
:param string language_tag: IANA language tag
:param session: Database session to use
:rtype: :class:`skosprovider_sqlalchemy.models.Language`
'''
if not language_tag:
language_tag = 'und'
l = session.query(LanguageModel).get(language_tag)
if not l:
if not tags.check(language_tag):
raise ValueError('Unable to import provider. Invalid language tag: %s' % language_tag)
descriptions = ', '.join(tags.description(language_tag))
l = LanguageModel(id=language_tag, name=descriptions)
session.add(l)
return l
def _add_labels(target, labels, session):
'''
Adds the labels to the target
:param target: Target to add the labels to
:param labels: A list of :class:`skosprovider.skos.Label` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for l in labels:
_check_language(l.language, session)
target.labels.append(LabelModel(
label=l.label,
labeltype_id=l.type,
language_id=l.language
))
return target
def _add_notes(target, notes, session):
'''
Adds the notes to the target
:param target: Target to add the notes to
:param notes: A list of :class:`skosprovider.skos.Note` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for n in notes:
_check_language(n.language, session)
target.notes.append(NoteModel(
note=n.note,
notetype_id=n.type,
language_id=n.language,
markup=n.markup
))
return target
def _add_sources(target, sources, session):
'''
Adds the sources to the target
:param target: Target to add the sources to
:param sources: A list of :class:`skosprovider.skos.Source` instances.
:param session: A :class:`sqlalchemy.orm.session.Session`.
'''
for s in sources:
target.sources.append(SourceModel(
citation=s.citation,
markup=s.markup
))
return target
[docs]class VisitationCalculator(object):
'''
Generates a nested set for a conceptscheme.
'''
def __init__(self, session):
'''
:param :class:`sqlalchemy.orm.session.Session` session: A database
session.
'''
self.session = session
[docs] def visit(self, conceptscheme):
'''
Visit a :class:`skosprovider_sqlalchemy.models.Conceptscheme` and
calculate a nested set representation.
:param conceptscheme: A
:class:`skosprovider_sqlalchemy.models.Conceptscheme` for which
the nested set will be calculated.
'''
self.count = 0
self.depth = 0
self.visitation = []
topc = self.session \
.query(ConceptModel) \
.filter(ConceptModel.conceptscheme == conceptscheme) \
.filter(ConceptModel.broader_concepts == None) \
.all()
for tc in topc:
self._visit_concept(tc)
self.visitation.sort(key=lambda v: v['lft'])
return self.visitation
def _visit_concept(self, concept):
log.debug('Visiting concept %s.' % concept.id)
self.depth += 1
self.count += 1
v = {
'id': concept.id,
'lft': self.count,
'depth': self.depth
}
if concept.type == 'concept':
for nc in concept.narrower_concepts:
self._visit_concept(nc)
self.count += 1
v['rght'] = self.count
self.visitation.append(v)
self.depth -= 1
def session_factory(session_maker_name):
def with_session(fn):
def go(parent_object, *args, **kw):
if hasattr(parent_object, session_maker_name):
root_call = True
session_maker = getattr(parent_object, session_maker_name)
if not hasattr(parent_object, 'session'):
parent_object.session = None
if parent_object.session is None:
session = session_maker()
parent_object.session = session
else:
root_call = False
try:
parent_object.session.begin(subtransactions=True)
ret = fn(parent_object, *args, **kw)
parent_object.session.commit()
return ret
except:
parent_object.session.rollback()
raise
finally:
if root_call:
parent_object.session.close()
parent_object.session = None
else:
raise Exception('session_maker %s not found' % session_maker_name)
return go
return with_session