1
use crate::{index::index_path::IndexPath, server_error::MyError, urn::blob_to_uri};
2
use chrono::{DateTime, Utc};
3
use serde::Serialize;
4
use sophia::{
5
    api::{
6
        dataset::{self, MutableDataset},
7
        graph::{CollectibleGraph, Graph},
8
        parser::TripleParser,
9
        term::{matcher::Any, IriRef, SimpleTerm},
10
        triple::Triple,
11
        MownStr,
12
    },
13
    inmem::{dataset::FastDataset, graph::LightGraph},
14
    iri::Iri,
15
    turtle::parser::{nt::NTriplesParser, turtle::TurtleParser},
16
    xml::parser::RdfXmlParser,
17
};
18
use std::{collections::BTreeSet, error::Error, path::Path};
19

            
20
pub(crate) struct Dataset {
21
    pub id: String,
22
    pub versions: Vec<DatasetVersion>,
23
}
24

            
25
impl Dataset {
26
    pub fn latest_version(&self) -> Option<DateTime<Utc>> {
27
        self.versions.iter().map(|v| &v.date).max().copied()
28
    }
29
}
30

            
31
pub(crate) struct DatasetVersion {
32
    pub date: DateTime<Utc>,
33
    pub data: FastDataset,
34
}
35

            
36
#[derive(Serialize)]
37
pub(crate) struct PredicateObject<'a> {
38
    predicate: &'a IriRef<MownStr<'a>>,
39
    object_iri: Option<&'a IriRef<MownStr<'a>>>,
40
    object_literal: Option<&'a str>,
41
}
42

            
43
impl DatasetVersion {
44
    pub fn number_of_quads(&self) -> usize {
45
        sophia::api::prelude::Dataset::quads(&self.data).count()
46
    }
47
    pub fn subjects(&self) -> BTreeSet<&IriRef<MownStr>> {
48
        use dataset::Dataset;
49
        let subjects: BTreeSet<_> = self
50
            .data
51
            .subjects()
52
            .filter_map(|t| match t {
53
                Ok(SimpleTerm::Iri(iri)) => Some(iri),
54
                _ => None,
55
            })
56
            .collect();
57
        subjects
58
    }
59
    // return predicate and object for the given subject
60
    pub fn pos(&self, subject: &str) -> Vec<PredicateObject> {
61
        use dataset::Dataset;
62
        let subject = IriRef::new(subject.to_string()).unwrap();
63
        let pos: Vec<_> = self
64
            .data
65
            .quads_matching([subject], Any, Any, Any)
66
            .filter_map(|t| t.ok())
67
            .map(|t| (t.1[1], t.1[2]))
68
            .filter_map(|(p, o)| match p {
69
                SimpleTerm::Iri(p) => Some(PredicateObject {
70
                    predicate: p,
71
                    object_iri: match o {
72
                        SimpleTerm::Iri(o) => Some(o),
73
                        _ => None,
74
                    },
75
                    object_literal: match o {
76
                        SimpleTerm::LiteralDatatype(value, _) => Some(value),
77
                        SimpleTerm::LiteralLanguage(value, _) => Some(value),
78
                        _ => None,
79
                    },
80
                }),
81
                _ => None,
82
            })
83
            .collect();
84
        pos
85
    }
86
}
87

            
88
1
pub(crate) fn load_datasets<P: AsRef<Path>>(dir: P) -> Result<Vec<Dataset>, Box<dyn Error>> {
89
1
    let paths = std::fs::read_dir(dir)?;
90
1
    let mut datasets = Vec::new();
91
1
    for path in paths.filter_map(|p| p.ok()) {
92
        // skip entries that are not directories
93
1
        if !path.file_type().map(|ft| ft.is_dir()).unwrap_or_default() {
94
1
            continue;
95
        }
96
        datasets.push(Dataset {
97
            id: path.file_name().to_string_lossy().to_string(),
98
            versions: load_dataset_versions(&path.path())?,
99
        });
100
    }
101
1
    Ok(datasets)
102
1
}
103

            
104
fn load_dataset_versions(dir: &Path) -> Result<Vec<DatasetVersion>, Box<dyn Error>> {
105
    let mut versions = Vec::new();
106
    for path in std::fs::read_dir(dir)?.filter_map(|p| p.ok()) {
107
        if let Ok(index_path) = IndexPath::try_from(path.path()) {
108
            versions.push(DatasetVersion {
109
                date: index_path.date(),
110
                data: load_dataset_version(&index_path)?,
111
            })
112
        }
113
    }
114
    versions.sort_unstable_by(|a, b| b.date.cmp(&a.date));
115
    Ok(versions)
116
}
117

            
118
pub(crate) fn parse_graph(rdf: &str, base: Option<Iri<String>>) -> Result<LightGraph, MyError> {
119
    let parser = TurtleParser { base: base.clone() };
120
    let source = parser.parse_str(rdf);
121
    if let Ok(graph) = LightGraph::from_triple_source(source) {
122
        return Ok(graph);
123
    }
124
    let parser = RdfXmlParser { base };
125
    let source = parser.parse_str(rdf);
126
    if let Ok(graph) = LightGraph::from_triple_source(source) {
127
        return Ok(graph);
128
    }
129
    let parser = NTriplesParser {};
130
    let source = parser.parse_str(rdf);
131
    Ok(LightGraph::from_triple_source(source)?)
132
}
133

            
134
fn load_dataset_version(dir: &IndexPath) -> Result<FastDataset, Box<dyn Error>> {
135
    let mut dataset = FastDataset::new();
136
    for path in dir.file_paths()? {
137
        let rdf = std::fs::read_to_string(path)?;
138
        let urn = blob_to_uri(rdf.as_bytes());
139
        let base_ref = IriRef::new(urn.clone())?;
140
        let base: Iri<String> = Iri::new(urn)?;
141
        let graph = parse_graph(&rdf, Some(base))?;
142
        for t in graph.triples() {
143
            let t = t?;
144
            let _ = dataset.insert(t.s(), t.p(), t.o(), Some(base_ref.clone()));
145
        }
146
    }
147
    Ok(dataset)
148
}