1
use crate::{index::index_path::IndexPath, server_error::MyError, urn::blob_to_uri};
2
use chrono::{DateTime, Utc};
3
use serde::Serialize;
4
use sophia::{
5
    api::{
6
        dataset::{self, MutableDataset},
7
        graph::{CollectibleGraph, Graph},
8
        parser::TripleParser,
9
        term::{matcher::Any, IriRef, SimpleTerm},
10
        triple::Triple,
11
        MownStr,
12
    },
13
    inmem::{dataset::FastDataset, graph::LightGraph},
14
    iri::Iri,
15
    turtle::parser::{nt::NTriplesParser, turtle::TurtleParser},
16
    xml::parser::RdfXmlParser,
17
};
18
use std::{collections::BTreeSet, error::Error, path::Path};
19

            
20
/// A named dataset together with every version of it found on disk.
pub(crate) struct Dataset {
    // Directory name the dataset was loaded from (see `load_datasets`).
    pub id: String,
    // All versions found on disk; the loader sorts these newest-first.
    pub versions: Vec<DatasetVersion>,
}
24

            
25
impl Dataset {
26
7
    pub fn latest_version(&self) -> Option<DateTime<Utc>> {
27
10
        self.versions.iter().map(|v| &v.date).max().copied()
28
7
    }
29
}
30

            
31
/// One dated snapshot of a dataset's RDF contents.
pub(crate) struct DatasetVersion {
    // Timestamp of this version, derived from its `IndexPath` on disk.
    pub date: DateTime<Utc>,
    // The parsed quads for this version; graph names identify the source blob.
    pub data: FastDataset,
}
35

            
36
4
/// A (predicate, object) pair for a single subject, ready for serialization.
///
/// As populated by [`DatasetVersion::pos`], at most one of `object_iri` /
/// `object_literal` is `Some`; both are `None` for non-IRI, non-literal
/// objects (e.g. blank nodes).
#[derive(Serialize)]
pub(crate) struct PredicateObject<'a> {
    predicate: &'a IriRef<MownStr<'a>>,
    object_iri: Option<&'a IriRef<MownStr<'a>>>,
    object_literal: Option<&'a str>,
}
42

            
43
impl DatasetVersion {
44
4
    pub fn number_of_quads(&self) -> usize {
45
4
        sophia::api::prelude::Dataset::quads(&self.data).count()
46
4
    }
47
1
    pub fn subjects(&self) -> BTreeSet<&IriRef<MownStr>> {
48
1
        use dataset::Dataset;
49
1
        let subjects: BTreeSet<_> = self
50
1
            .data
51
1
            .subjects()
52
666
            .filter_map(|t| match t {
53
666
                Ok(SimpleTerm::Iri(iri)) => Some(iri),
54
                _ => None,
55
666
            })
56
1
            .collect();
57
1
        subjects
58
1
    }
59
    // return predicate and object for the given subject
60
1
    pub fn pos(&self, subject: &str) -> Vec<PredicateObject> {
61
1
        use dataset::Dataset;
62
1
        let subject = IriRef::new(subject.to_string()).unwrap();
63
1
        let pos: Vec<_> = self
64
1
            .data
65
1
            .quads_matching([subject], Any, Any, Any)
66
4
            .filter_map(|t| t.ok())
67
4
            .map(|t| (t.1[1], t.1[2]))
68
4
            .filter_map(|(p, o)| match p {
69
4
                SimpleTerm::Iri(p) => Some(PredicateObject {
70
4
                    predicate: p,
71
4
                    object_iri: match o {
72
1
                        SimpleTerm::Iri(o) => Some(o),
73
3
                        _ => None,
74
                    },
75
4
                    object_literal: match o {
76
3
                        SimpleTerm::LiteralDatatype(value, _) => Some(value),
77
                        SimpleTerm::LiteralLanguage(value, _) => Some(value),
78
1
                        _ => None,
79
                    },
80
                }),
81
                _ => None,
82
4
            })
83
1
            .collect();
84
1
        pos
85
1
    }
86
}
87

            
88
/**
89
 * Read each directory under `dir` as a dataset.
90
 */
91
17
pub(crate) fn load_datasets<P: AsRef<Path>>(dir: P) -> Result<Vec<Dataset>, Box<dyn Error>> {
92
17
    let paths = std::fs::read_dir(dir)?;
93
17
    let mut datasets = Vec::new();
94
34
    for path in paths.filter_map(|p| p.ok()) {
95
        // skip entries that are not directories
96
34
        if !path.file_type().map(|ft| ft.is_dir()).unwrap_or_default() {
97
            continue;
98
34
        }
99
34
        datasets.push(Dataset {
100
34
            id: path.file_name().to_string_lossy().to_string(),
101
34
            versions: load_dataset_versions(&path.path())?,
102
        });
103
    }
104
17
    Ok(datasets)
105
17
}
106

            
107
/**
108
 * Read each directory under `dir` as a version of the dataset.
109
 */
110
34
fn load_dataset_versions(dir: &Path) -> Result<Vec<DatasetVersion>, Box<dyn Error>> {
111
34
    let mut versions = Vec::new();
112
44
    for path in std::fs::read_dir(dir)?.filter_map(|p| p.ok()) {
113
44
        if let Ok(index_path) = IndexPath::try_from(path.path()) {
114
44
            versions.push(DatasetVersion {
115
44
                date: index_path.date(),
116
44
                data: load_dataset_version(&index_path)?,
117
            })
118
        }
119
    }
120
34
    versions.sort_unstable_by(|a, b| b.date.cmp(&a.date));
121
34
    Ok(versions)
122
34
}
123

            
124
/**
125
 * Parse a string into a graph with the given base.
126
 * The graph can be in either Turtle (ttl), RDF/XML or NTriples format.
127
 */
128
181
pub(crate) fn parse_graph(rdf: &str, base: Option<Iri<String>>) -> Result<LightGraph, MyError> {
129
181
    let parser = TurtleParser { base: base.clone() };
130
181
    let source = parser.parse_str(rdf);
131
181
    if let Ok(graph) = LightGraph::from_triple_source(source) {
132
181
        return Ok(graph);
133
    }
134
    let parser = RdfXmlParser { base };
135
    let source = parser.parse_str(rdf);
136
    if let Ok(graph) = LightGraph::from_triple_source(source) {
137
        return Ok(graph);
138
    }
139
    let parser = NTriplesParser {};
140
    let source = parser.parse_str(rdf);
141
    Ok(LightGraph::from_triple_source(source)?)
142
181
}
143

            
144
/**
145
 * Load the dataset for a version.
146
 * This is done by reading all the files in the directory and inserting them
147
 * in a [`dataset::Dataset`].
148
 */
149
44
fn load_dataset_version(dir: &IndexPath) -> Result<FastDataset, Box<dyn Error>> {
150
44
    let mut dataset = FastDataset::new();
151
176
    for path in dir.file_paths()? {
152
176
        let rdf = std::fs::read_to_string(path)?;
153
176
        let urn = blob_to_uri(rdf.as_bytes());
154
176
        let base_ref = IriRef::new(urn.clone())?;
155
176
        let base: Iri<String> = Iri::new(urn)?;
156
176
        let graph = parse_graph(&rdf, Some(base))?;
157
29304
        for t in graph.triples() {
158
29304
            let t = t?;
159
29304
            dataset.insert(t.s(), t.p(), t.o(), Some(base_ref.clone()))?;
160
        }
161
    }
162
44
    Ok(dataset)
163
44
}