1
use crate::{constants::{COMMENT_RDFS_IRI, LABEL_RDFS_IRI}, index::index_path::IndexPath, server_error::MyError, urn::blob_to_uri};
2
use chrono::{DateTime, Utc};
3
use once_cell::sync::Lazy;
4
use regex::Regex;
5
use serde::Serialize;
6
use sophia::{
7
    api::{
8
        dataset::{self, MutableDataset},
9
        graph::{CollectibleGraph, Graph},
10
        parser::TripleParser,
11
        prefix::{Prefix, PrefixMapPair},
12
        term::{matcher::Any, IriRef, SimpleTerm},
13
        triple::Triple,
14
        MownStr,
15
    },
16
    inmem::{dataset::FastDataset, graph::LightGraph},
17
    iri::Iri,
18
    turtle::parser::{nt::NTriplesParser, turtle::TurtleParser},
19
    xml::parser::RdfXmlParser,
20
};
21
use std::{
22
    collections::{BTreeSet, HashMap},
23
    error::Error,
24
    path::Path,
25
};
26

            
27
pub(crate) struct Dataset {
28
    pub id: String,
29
    pub versions: Vec<DatasetVersion>,
30
}
31

            
32
impl Dataset {
33
6
    pub fn latest_version(&self) -> Option<DateTime<Utc>> {
34
8
        self.versions.iter().map(|v| &v.date).max().copied()
35
6
    }
36
}
37

            
38
pub(crate) struct DatasetVersion {
39
    pub date: DateTime<Utc>,
40
    pub data: FastDataset,
41
}
42

            
43
#[derive(Serialize)]
44
pub(crate) struct PredicateObject<'a> {
45
    pub predicate: &'a IriRef<MownStr<'a>>,
46
    pub object_iri: Option<&'a IriRef<MownStr<'a>>>,
47
    pub object_literal: Option<&'a str>,
48
}
49

            
50
impl DatasetVersion {
51
4
    pub fn number_of_quads(&self) -> usize {
52
4
        sophia::api::prelude::Dataset::quads(&self.data).count()
53
4
    }
54
1
    pub fn subjects(&self) -> BTreeSet<&IriRef<MownStr>> {
55
1
        use dataset::Dataset;
56
1
        let subjects: BTreeSet<_> = self
57
1
            .data
58
1
            .subjects()
59
666
            .filter_map(|t| match t {
60
666
                Ok(SimpleTerm::Iri(iri)) => Some(iri),
61
                _ => None,
62
666
            })
63
1
            .collect();
64
1
        subjects
65
1
    }
66
    // return predicate and object for the given subject
67
124
    pub fn pos(&self, subject: &str) -> Vec<PredicateObject> {
68
124
        use dataset::Dataset;
69
124
        let subject = IriRef::new(subject.to_string()).unwrap();
70
124
        let pos: Vec<_> = self
71
124
            .data
72
124
            .quads_matching([subject], Any, Any, Any)
73
681
            .filter_map(|t| t.ok())
74
681
            .map(|t| (t.1[1], t.1[2]))
75
681
            .filter_map(|(p, o)| match p {
76
681
                SimpleTerm::Iri(p) => Some(PredicateObject {
77
681
                    predicate: p,
78
681
                    object_iri: match o {
79
438
                        SimpleTerm::Iri(o) => Some(o),
80
243
                        _ => None,
81
                    },
82
681
                    object_literal: match o {
83
243
                        SimpleTerm::LiteralDatatype(value, _) => Some(value),
84
                        SimpleTerm::LiteralLanguage(value, _) => Some(value),
85
438
                        _ => None,
86
                    },
87
                }),
88
                _ => None,
89
681
            })
90
124
            .collect();
91
124
        pos
92
124
    }
93
}
94

            
95
/**
96
 * Read each directory under `dir` as a dataset.
97
 */
98
17
pub(crate) fn load_datasets<P: AsRef<Path>>(dir: P) -> Result<Vec<Dataset>, Box<dyn Error>> {
99
17
    let paths = std::fs::read_dir(dir)?;
100
17
    let mut datasets = Vec::new();
101
34
    for path in paths.filter_map(|p| p.ok()) {
102
        // skip entries that are not directories
103
34
        if !path.file_type().map(|ft| ft.is_dir()).unwrap_or_default() {
104
            continue;
105
34
        }
106
34
        datasets.push(Dataset {
107
34
            id: path.file_name().to_string_lossy().to_string(),
108
34
            versions: load_dataset_versions(&path.path())?,
109
        });
110
    }
111
17
    Ok(datasets)
112
17
}
113

            
114
/**
115
 * Read each directory under `dir` as a version of the dataset.
116
 */
117
34
fn load_dataset_versions(dir: &Path) -> Result<Vec<DatasetVersion>, Box<dyn Error>> {
118
34
    let mut versions = Vec::new();
119
44
    for path in std::fs::read_dir(dir)?.filter_map(|p| p.ok()) {
120
44
        if let Ok(index_path) = IndexPath::try_from(path.path()) {
121
44
            versions.push(DatasetVersion {
122
44
                date: index_path.date(),
123
44
                data: load_dataset_version(&index_path)?,
124
            })
125
        }
126
    }
127
34
    versions.sort_unstable_by(|a, b| b.date.cmp(&a.date));
128
34
    Ok(versions)
129
34
}
130

            
131
/**
132
 * Parse a string into a graph with the given base.
133
 * The graph can be in either Turtle (ttl), RDF/XML or NTriples format.
134
 */
135
181
pub(crate) fn parse_graph(rdf: &str, base: Option<Iri<String>>) -> Result<LightGraph, MyError> {
136
181
    let parser = TurtleParser { base: base.clone() };
137
181
    let source = parser.parse_str(rdf);
138
181
    if let Ok(graph) = LightGraph::from_triple_source(source) {
139
181
        return Ok(graph);
140
    }
141
    let parser = RdfXmlParser { base };
142
    let source = parser.parse_str(rdf);
143
    if let Ok(graph) = LightGraph::from_triple_source(source) {
144
        return Ok(graph);
145
    }
146
    let parser = NTriplesParser {};
147
    let source = parser.parse_str(rdf);
148
    Ok(LightGraph::from_triple_source(source)?)
149
181
}
150

            
151
/**
152
 * Load the dataset for a version.
153
 * This is done by reading all the files in the directory and inserting them
154
 * in a [`dataset::Dataset`].
155
 */
156
44
fn load_dataset_version(dir: &IndexPath) -> Result<FastDataset, Box<dyn Error>> {
157
44
    let mut dataset = FastDataset::new();
158
176
    for path in dir.file_paths()? {
159
176
        let rdf = std::fs::read_to_string(path)?;
160
176
        let urn = blob_to_uri(rdf.as_bytes());
161
176
        let base_ref = IriRef::new(urn.clone())?;
162
176
        let base: Iri<String> = Iri::new(urn)?;
163
176
        let graph = parse_graph(&rdf, Some(base))?;
164
29304
        for t in graph.triples() {
165
29304
            let t = t?;
166
29304
            dataset.insert(t.s(), t.p(), t.o(), Some(base_ref.clone()))?;
167
        }
168
    }
169
44
    Ok(dataset)
170
44
}
171

            
172
/// Regex to find prefix definitions
173
11
static PREFIX_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"@prefix\s([a-zA-Z0-9]+):\s+<(.+)>\s\.").unwrap());
174

            
175
/// Load prefixes for all datasets and versions
176
17
pub(crate) fn load_prefixes<P: AsRef<Path>>(dir: P) -> Result<Vec<PrefixMapPair>, Box<dyn Error>> {
177
17
    let mut prefixes: HashMap<String, String> = HashMap::new();
178
17
    let mut extra_namespace_count = 1;
179

            
180
34
    let mut paths_dataset: Vec<_> = std::fs::read_dir(dir)?.filter_map(|p| p.ok()).collect();
181
34
    paths_dataset.sort_by_key(|p| p.path());
182
    // Dataset directories
183
51
    for path_dataset in paths_dataset {
184
34
        if !path_dataset.file_type().map(|ft| ft.is_dir()).unwrap_or_default() {
185
            continue;
186
34
        }
187
        // Dataset version directories
188
44
        let mut paths_version: Vec<_> = std::fs::read_dir(&path_dataset.path())?.filter_map(|p| p.ok()).collect();
189
74
        paths_version.sort_by_key(|p| p.path());
190
44
        for path in paths_version.iter().filter_map(|p|IndexPath::try_from(p.path()).ok()) {
191
            // RDF files
192
176
            for path in path.file_paths()? {
193
176
                let rdf = std::fs::read_to_string(path)?;
194

            
195
                // Find prefixes in RDF files
196
660
                for (_, [prefix, url]) in PREFIX_RE.captures_iter(&rdf).map(|c| c.extract()) {
197
660
                    match prefixes.insert(prefix.to_string(), url.to_string()) {
198
119
                        None => (),
199
541
                        Some(old_url) => {
200
541
                            // If the prefix was previously assigned to a different URL,
201
541
                            // assign the previous URL to a new namespace
202
541
                            if url != old_url {
203
                                prefixes.insert(format!("ns{extra_namespace_count}"), old_url.to_string());
204
                                extra_namespace_count += 1;
205
541
                            }
206
                        }
207
                    }
208
                }
209
            }
210
        }
211
    }
212
17
    Ok(prefixes
213
17
        .into_iter()
214
119
        .map(|(prefix, url)| {
215
119
            (
216
119
                Prefix::new_unchecked(prefix.into()),
217
119
                Iri::new_unchecked(url.into()),
218
119
            )
219
119
        })
220
17
        .collect())
221
17
}
222

            
223
/// Find the first label resource for a subject
224
1
pub(crate) fn find_first_label_for_subject<'a>(pos: &'a Vec<PredicateObject<'a>>) -> Option<&str> {
225
1
    find_predicate_object_literals_for_subject(pos, LABEL_RDFS_IRI)
226
1
        .first()
227
1
        .copied()
228
1
}
229

            
230
/// Find the first comment resource for a subject
231
1
pub(crate) fn find_first_comment_for_subject<'a>(pos: &'a Vec<PredicateObject<'a>>) -> Option<&str> {
232
1
    find_predicate_object_literals_for_subject(pos, COMMENT_RDFS_IRI)
233
1
        .first()
234
1
        .copied()
235
1
}
236

            
237
/// Find all predicate objects with literals matching a resource for a subject
238
2
pub(crate) fn find_predicate_object_literals_for_subject<'a>(
239
2
    pos: &'a Vec<PredicateObject<'a>>,
240
2
    target: &'a str,
241
2
) -> Vec<&'a str> {
242
2
    find_predicate_objects_for_subject(pos, target)
243
2
        .into_iter()
244
2
        .filter(|po| po.object_literal.is_some())
245
2
        .map(|po| po.object_literal.unwrap_or_default())
246
2
        .collect()
247
2
}
248

            
249
/// Find all predicate objects matching a resource for a subject
250
2
pub(crate) fn find_predicate_objects_for_subject<'a>(
251
2
    pos: &'a Vec<PredicateObject<'a>>,
252
2
    target: &'a str,
253
2
) -> Vec<&'a PredicateObject<'a>> {
254
2
    pos.into_iter()
255
8
        .filter(|po| po.predicate.as_str() == target)
256
2
        .collect()
257
2
}