1
use crate::{
2
    index::{index_path::IndexPath, read_index, write_index, IndexEntry},
3
    server::dataset::parse_graph,
4
    server_config::ServerConfigDataset,
5
    server_error::MyError,
6
    urn::blob_to_sha512,
7
};
8
use chrono::Utc;
9
use sophia::{
10
    api::{
11
        graph::Graph,
12
        ns::Namespace,
13
        term::{matcher::Any, Term},
14
        triple::Triple,
15
    },
16
    inmem::graph::LightGraph,
17
    iri::Iri,
18
};
19
use std::{
20
    error::Error,
21
    path::{Path, PathBuf},
22
};
23
use tempfile::TempDir;
24

            
25
pub(crate) async fn fetch_all(
26
    datasets: Vec<ServerConfigDataset>,
27
    datasetsdir: &Path,
28
) -> Result<(), MyError> {
29
    let mut results = Vec::new();
30
    for dataset in &datasets {
31
        let datasetdir = datasetsdir.join(&dataset.name);
32
        results.push(fetch(dataset, datasetdir).await?);
33
    }
34
    Ok(())
35
}
36

            
37
async fn fetch_member(dir: &Path, member: String) -> Result<IndexEntry, MyError> {
38
    let resp = reqwest::get(&member).await?.text().await?;
39
    let base: Iri<String> = Iri::new(member.clone())?;
40
    // try to parse the graph, if it cannot be parsed, stop fetching
41
    let _graph = parse_graph(&resp, Some(base))?;
42
    let filename = blob_to_sha512(resp.as_bytes());
43
    let filepath = dir.join(&filename);
44
    tokio::fs::write(&filepath, &resp)
45
        .await
46
        .map_err(|e| MyError::io_error(filepath, e))?;
47
    Ok(IndexEntry {
48
        name: filename,
49
        base_uri: Some(member),
50
    })
51
}
52

            
53
async fn fetch(
54
    dataset_config: &ServerConfigDataset,
55
    datasetdir: PathBuf,
56
) -> Result<Vec<IndexEntry>, MyError> {
57
    let resp = reqwest::get(&dataset_config.url).await?.text().await?;
58
    let base: Iri<String> = Iri::new(dataset_config.url.clone())?;
59
    let graph = parse_graph(&resp, Some(base.clone()))?;
60
    let members = list_members(&graph, &base)?;
61
    let tempdir =
62
        tempfile::tempdir_in(&datasetdir).map_err(|e| MyError::io_error(datasetdir.clone(), e))?;
63
    let tempdirpath = tempdir.path();
64
    let mut result = Vec::new();
65
    for member in members {
66
        if let Ok(index_entry) = fetch_member(tempdirpath, member).await {
67
            result.push(index_entry);
68
        }
69
    }
70
    result.sort_unstable();
71
    if dataset_has_changed(&datasetdir, &result).await? {
72
        save_dataset(datasetdir, tempdir, &result).await?;
73
    }
74
    Ok(result)
75
}
76

            
77
/// Find the directory with the name prepended by a date.
78
/// E.g. 2023-02-02
79
async fn get_newest_dir(datasetdir: &Path) -> Result<Option<IndexPath>, Box<dyn Error>> {
80
    let mut read = tokio::fs::read_dir(datasetdir).await?;
81
    let mut newest: Option<IndexPath> = None;
82
    while let Ok(Some(entry)) = read.next_entry().await {
83
        let entry = if let Ok(entry) = IndexPath::try_from(entry.path().to_path_buf()) {
84
            entry
85
        } else {
86
            continue;
87
        };
88
        if let Some(n) = newest {
89
            if n.dir_name() > entry.dir_name() {
90
                newest = Some(n);
91
            } else {
92
                newest = Some(entry);
93
            }
94
        } else {
95
            newest = Some(entry);
96
        }
97
    }
98
    Ok(newest)
99
}
100

            
101
async fn dataset_has_changed(
102
    datasetdir: &Path,
103
    new_dataset: &[IndexEntry],
104
) -> Result<bool, MyError> {
105
    if let Ok(Some(newest_index_path)) = get_newest_dir(datasetdir).await {
106
        let prev_index = read_index(&newest_index_path)?;
107
        return Ok(new_dataset != &prev_index[..]);
108
    }
109
    // There is no directory with a dataset yet, so the dataset has changed
110
    // from not existing to existing.
111
    Ok(true)
112
}
113

            
114
async fn save_dataset(
115
    datasetdir: PathBuf,
116
    tempdir: TempDir,
117
    dataset: &[IndexEntry],
118
) -> Result<(), Box<dyn Error>> {
119
    // keep the temporary directory if retrieval went well
120
    let saved_dir = tempdir.into_path();
121
    let date = format!("{}", Utc::now().format("%+"));
122
    let dir = datasetdir.join(date);
123
    tokio::fs::rename(saved_dir, &dir).await?;
124
    write_index(&dir, dataset)?;
125
    Ok(())
126
}
127

            
128
fn list_members(graph: &LightGraph, base: &Iri<String>) -> Result<Vec<String>, Box<dyn Error>> {
129
    let ldp = Namespace::new("http://www.w3.org/ns/ldp#")?;
130
    let ldp_contains = ldp.get("contains")?;
131
    let mut members = Vec::new();
132
    for t in graph.triples_matching([base], [ldp_contains], Any) {
133
        if let Some(iri) = t?.o().iri() {
134
            members.push(iri.as_str().to_string());
135
        }
136
    }
137
    Ok(members)
138
}