1
use crate::{
2
    constants::{COMMENT_RDFS_IRI, LABEL_RDFS_IRI},
3
    index::index_path::IndexPath,
4
    server_error::MyError,
5
    urn::blob_to_uri,
6
};
7
use chrono::{DateTime, Utc};
8
use once_cell::sync::Lazy;
9
use regex::Regex;
10
use serde::Serialize;
11
use sophia::{
12
    api::{
13
        dataset::{self, MutableDataset},
14
        graph::{CollectibleGraph, Graph},
15
        parser::TripleParser,
16
        prefix::{Prefix, PrefixMapPair},
17
        term::{matcher::Any, IriRef, SimpleTerm},
18
        triple::Triple,
19
        MownStr,
20
    },
21
    inmem::{dataset::FastDataset, graph::LightGraph},
22
    iri::Iri,
23
    turtle::parser::{nt::NTriplesParser, turtle::TurtleParser},
24
    xml::parser::RdfXmlParser,
25
};
26
use std::{
27
    collections::{BTreeSet, HashMap},
28
    error::Error,
29
    path::Path,
30
};
31

            
32
/// A dataset on disk: one directory under the data root, holding one
/// sub-directory per version.
pub(crate) struct Dataset {
    /// Directory name of the dataset (taken from the file system by
    /// `load_datasets`).
    pub id: String,
    /// All loaded versions; `load_dataset_versions` sorts these
    /// newest-first by date.
    pub versions: Vec<DatasetVersion>,
}
36

            
37
impl Dataset {
38
6
    pub fn latest_version(&self) -> Option<DateTime<Utc>> {
39
8
        self.versions.iter().map(|v| &v.date).max().copied()
40
6
    }
41
}
42

            
43
/// One version of a dataset: a timestamp plus the RDF quads loaded from
/// the version directory's files.
pub(crate) struct DatasetVersion {
    /// Version date, derived from the directory name via `IndexPath::date`.
    pub date: DateTime<Utc>,
    /// All quads of this version; each source file becomes one named graph
    /// (see `load_dataset_version`).
    pub data: FastDataset,
}
47

            
48
/// One (predicate, object) pair for a fixed subject, as produced by
/// `DatasetVersion::pos`. At most one of `object_iri` / `object_literal`
/// is `Some`: IRI objects fill `object_iri`, literal objects fill
/// `object_literal`, and any other term kind leaves both `None`.
#[derive(Serialize)]
pub(crate) struct PredicateObject<'a> {
    pub predicate: &'a IriRef<MownStr<'a>>,
    pub object_iri: Option<&'a IriRef<MownStr<'a>>>,
    pub object_literal: Option<&'a str>,
}
54

            
55
impl DatasetVersion {
56
4
    pub fn number_of_quads(&self) -> usize {
57
4
        sophia::api::prelude::Dataset::quads(&self.data).count()
58
4
    }
59
1
    pub fn subjects(&self) -> BTreeSet<&IriRef<MownStr>> {
60
1
        use dataset::Dataset;
61
1
        let subjects: BTreeSet<_> = self
62
1
            .data
63
1
            .subjects()
64
666
            .filter_map(|t| match t {
65
666
                Ok(SimpleTerm::Iri(iri)) => Some(iri),
66
                _ => None,
67
666
            })
68
1
            .collect();
69
1
        subjects
70
1
    }
71
    // return predicate and object for the given subject
72
124
    pub fn pos(&self, subject: &str) -> Vec<PredicateObject> {
73
124
        use dataset::Dataset;
74
124
        let subject = IriRef::new(subject.to_string()).unwrap();
75
124
        let pos: Vec<_> = self
76
124
            .data
77
124
            .quads_matching([subject], Any, Any, Any)
78
681
            .filter_map(|t| t.ok())
79
681
            .map(|t| (t.1[1], t.1[2]))
80
681
            .filter_map(|(p, o)| match p {
81
681
                SimpleTerm::Iri(p) => Some(PredicateObject {
82
681
                    predicate: p,
83
681
                    object_iri: match o {
84
438
                        SimpleTerm::Iri(o) => Some(o),
85
243
                        _ => None,
86
                    },
87
681
                    object_literal: match o {
88
243
                        SimpleTerm::LiteralDatatype(value, _) => Some(value),
89
                        SimpleTerm::LiteralLanguage(value, _) => Some(value),
90
438
                        _ => None,
91
                    },
92
                }),
93
                _ => None,
94
681
            })
95
124
            .collect();
96
124
        pos
97
124
    }
98
}
99

            
100
/**
101
 * Read each directory under `dir` as a dataset.
102
 */
103
17
pub(crate) fn load_datasets<P: AsRef<Path>>(dir: P) -> Result<Vec<Dataset>, Box<dyn Error>> {
104
17
    let paths = std::fs::read_dir(dir)?;
105
17
    let mut datasets = Vec::new();
106
34
    for path in paths.filter_map(|p| p.ok()) {
107
        // skip entries that are not directories
108
34
        if !path.file_type().map(|ft| ft.is_dir()).unwrap_or_default() {
109
            continue;
110
34
        }
111
34
        datasets.push(Dataset {
112
34
            id: path.file_name().to_string_lossy().to_string(),
113
34
            versions: load_dataset_versions(&path.path())?,
114
        });
115
    }
116
17
    Ok(datasets)
117
17
}
118

            
119
/**
120
 * Read each directory under `dir` as a version of the dataset.
121
 */
122
34
fn load_dataset_versions(dir: &Path) -> Result<Vec<DatasetVersion>, Box<dyn Error>> {
123
34
    let mut versions = Vec::new();
124
44
    for path in std::fs::read_dir(dir)?.filter_map(|p| p.ok()) {
125
44
        if let Ok(index_path) = IndexPath::try_from(path.path()) {
126
44
            versions.push(DatasetVersion {
127
44
                date: index_path.date(),
128
44
                data: load_dataset_version(&index_path)?,
129
            })
130
        }
131
    }
132
34
    versions.sort_unstable_by(|a, b| b.date.cmp(&a.date));
133
34
    Ok(versions)
134
34
}
135

            
136
/**
137
 * Parse a string into a graph with the given base.
138
 * The graph can be in either Turtle (ttl), RDF/XML or NTriples format.
139
 */
140
181
pub(crate) fn parse_graph(rdf: &str, base: Option<Iri<String>>) -> Result<LightGraph, MyError> {
141
181
    let parser = TurtleParser { base: base.clone() };
142
181
    let source = parser.parse_str(rdf);
143
181
    if let Ok(graph) = LightGraph::from_triple_source(source) {
144
181
        return Ok(graph);
145
    }
146
    let parser = RdfXmlParser { base };
147
    let source = parser.parse_str(rdf);
148
    if let Ok(graph) = LightGraph::from_triple_source(source) {
149
        return Ok(graph);
150
    }
151
    let parser = NTriplesParser {};
152
    let source = parser.parse_str(rdf);
153
    Ok(LightGraph::from_triple_source(source)?)
154
181
}
155

            
156
/**
157
 * Load the dataset for a version.
158
 * This is done by reading all the files in the directory and inserting them
159
 * in a [`dataset::Dataset`].
160
 */
161
44
fn load_dataset_version(dir: &IndexPath) -> Result<FastDataset, Box<dyn Error>> {
162
44
    let mut dataset = FastDataset::new();
163
176
    for path in dir.file_paths()? {
164
176
        let rdf = std::fs::read_to_string(path)?;
165
176
        let urn = blob_to_uri(rdf.as_bytes());
166
176
        let base_ref = IriRef::new(urn.clone())?;
167
176
        let base: Iri<String> = Iri::new(urn)?;
168
176
        let graph = parse_graph(&rdf, Some(base))?;
169
29304
        for t in graph.triples() {
170
29304
            let t = t?;
171
29304
            dataset.insert(t.s(), t.p(), t.o(), Some(base_ref.clone()))?;
172
        }
173
    }
174
44
    Ok(dataset)
175
44
}
176

            
177
/// Regex to find prefix definitions
178
static PREFIX_RE: Lazy<Regex> =
179
11
    Lazy::new(|| Regex::new(r"@prefix\s([a-zA-Z0-9]+):\s+<(.+)>\s\.").unwrap());
180

            
181
/// Load prefixes for all datasets and versions
182
17
pub(crate) fn load_prefixes<P: AsRef<Path>>(dir: P) -> Result<Vec<PrefixMapPair>, Box<dyn Error>> {
183
17
    let mut prefixes: HashMap<String, String> = HashMap::new();
184
17
    let mut extra_namespace_count = 1;
185

            
186
17
    let mut paths_dataset: Vec<_> = std::fs::read_dir(dir)?
187
34
        .filter_map(|p| p.ok())
188
34
        .filter(|p| p.file_type().map(|ft| ft.is_dir()).unwrap_or_default())
189
17
        .collect();
190
34
    paths_dataset.sort_by_key(|p| p.path());
191
    // Dataset directories
192
51
    for path_dataset in paths_dataset {
193
        // Dataset version directories
194
34
        let mut paths_version: Vec<_> = std::fs::read_dir(&path_dataset.path())?
195
44
            .filter_map(|p| p.ok())
196
34
            .collect();
197
74
        paths_version.sort_by_key(|p| p.path());
198
78
        for path in &paths_version {
199
44
            let path = IndexPath::try_from(path.path())?;
200
            // RDF files
201
176
            for path in path.file_paths()? {
202
176
                let rdf = std::fs::read_to_string(path)?;
203

            
204
                // Find prefixes in RDF files
205
660
                for (_, [prefix, url]) in PREFIX_RE.captures_iter(&rdf).map(|c| c.extract()) {
206
660
                    match prefixes.insert(prefix.to_string(), url.to_string()) {
207
119
                        None => (),
208
541
                        Some(old_url) => {
209
541
                            // If the prefix was previously assigned to a different URL,
210
541
                            // assign the previous URL to a new namespace
211
541
                            if url != old_url {
212
                                prefixes.insert(
213
                                    format!("ns{extra_namespace_count}"),
214
                                    old_url.to_string(),
215
                                );
216
                                extra_namespace_count += 1;
217
541
                            }
218
                        }
219
                    }
220
                }
221
            }
222
        }
223
    }
224
17
    Ok(prefixes
225
17
        .into_iter()
226
119
        .map(|(prefix, url)| {
227
119
            (
228
119
                Prefix::new_unchecked(prefix.into()),
229
119
                Iri::new_unchecked(url.into()),
230
119
            )
231
119
        })
232
17
        .collect())
233
17
}
234

            
235
/// Find the first label resource for a subject
236
1
pub(crate) fn find_first_label_for_subject<'a>(pos: &'a Vec<PredicateObject<'a>>) -> Option<&str> {
237
1
    find_predicate_object_literals_for_subject(pos, LABEL_RDFS_IRI)
238
1
        .first()
239
1
        .copied()
240
1
}
241

            
242
/// Find the first comment resource for a subject
243
1
pub(crate) fn find_first_comment_for_subject<'a>(
244
1
    pos: &'a Vec<PredicateObject<'a>>,
245
1
) -> Option<&str> {
246
1
    find_predicate_object_literals_for_subject(pos, COMMENT_RDFS_IRI)
247
1
        .first()
248
1
        .copied()
249
1
}
250

            
251
/// Find all predicate objects with literals matching a resource for a subject
252
2
pub(crate) fn find_predicate_object_literals_for_subject<'a>(
253
2
    pos: &'a Vec<PredicateObject<'a>>,
254
2
    target: &'a str,
255
2
) -> Vec<&'a str> {
256
2
    find_predicate_objects_for_subject(pos, target)
257
2
        .into_iter()
258
2
        .filter(|po| po.object_literal.is_some())
259
2
        .map(|po| po.object_literal.unwrap_or_default())
260
2
        .collect()
261
2
}
262

            
263
/// Find all predicate objects matching a resource for a subject
264
2
pub(crate) fn find_predicate_objects_for_subject<'a>(
265
2
    pos: &'a Vec<PredicateObject<'a>>,
266
2
    target: &'a str,
267
2
) -> Vec<&'a PredicateObject<'a>> {
268
2
    pos.into_iter()
269
8
        .filter(|po| po.predicate.as_str() == target)
270
2
        .collect()
271
2
}