1
use crate::{
2
    index::{index_path::IndexPath, read_index, write_index, IndexEntry},
3
    server::dataset::parse_graph,
4
    server_config::ServerConfigDataset,
5
    server_error::MyError,
6
    urn::blob_to_sha512,
7
};
8
use chrono::Utc;
9
use sophia::{
10
    api::{
11
        graph::Graph,
12
        ns::Namespace,
13
        term::{matcher::Any, Term},
14
        triple::Triple,
15
    },
16
    inmem::graph::LightGraph,
17
    iri::Iri,
18
};
19
use std::{
20
    error::Error,
21
    path::{Path, PathBuf},
22
};
23
use tempfile::TempDir;
24

            
25
1
pub(crate) async fn fetch_all(
26
1
    datasets: Vec<ServerConfigDataset>,
27
1
    datasetsdir: &Path,
28
1
) -> Result<(), MyError> {
29
1
    let mut results = Vec::new();
30
2
    for dataset in &datasets {
31
1
        let datasetdir = datasetsdir.join(&dataset.name);
32
29
        results.push(fetch(dataset, datasetdir).await?);
33
    }
34
1
    Ok(())
35
1
}
36

            
37
4
async fn fetch_member(dir: &Path, member: String) -> Result<IndexEntry, MyError> {
38
16
    let resp = reqwest::get(&member).await?.text().await?;
39
4
    let base: Iri<String> = Iri::new(member.clone())?;
40
    // try to parse the graph, if it cannot be parsed, stop fetching
41
4
    let _graph = parse_graph(&resp, Some(base))?;
42
4
    let filename = blob_to_sha512(resp.as_bytes());
43
4
    let filepath = dir.join(&filename);
44
4
    tokio::fs::write(&filepath, &resp)
45
4
        .await
46
4
        .map_err(|e| MyError::io_error(filepath, e))?;
47
4
    Ok(IndexEntry {
48
4
        name: filename,
49
4
        base_uri: Some(member),
50
4
    })
51
4
}
52

            
53
1
async fn fetch(
54
1
    dataset_config: &ServerConfigDataset,
55
1
    datasetdir: PathBuf,
56
1
) -> Result<Vec<IndexEntry>, MyError> {
57
4
    let resp = reqwest::get(&dataset_config.url).await?;
58
1
    let resp = resp.text().await?;
59
1
    let base: Iri<String> = Iri::new(dataset_config.url.clone())?;
60
1
    let graph = parse_graph(&resp, Some(base.clone()))?;
61
1
    let members = list_members(&graph, &base)?;
62
1
    let tempdir =
63
1
        tempfile::tempdir_in(&datasetdir).map_err(|e| MyError::io_error(datasetdir.clone(), e))?;
64
1
    let tempdirpath = tempdir.path();
65
1
    let mut result = Vec::new();
66
5
    for member in members {
67
23
        if let Ok(index_entry) = fetch_member(tempdirpath, member).await {
68
4
            result.push(index_entry);
69
4
        }
70
    }
71
1
    result.sort_unstable();
72
1
    if dataset_has_changed(&datasetdir, &result).await? {
73
1
        save_dataset(datasetdir, tempdir, &result).await?;
74
    }
75
1
    Ok(result)
76
1
}
77

            
78
/// Find the directory with the name prepended by a date.
79
/// E.g. 2023-02-02
80
1
async fn get_newest_dir(datasetdir: &Path) -> Result<Option<IndexPath>, Box<dyn Error>> {
81
1
    let mut read = tokio::fs::read_dir(datasetdir).await?;
82
1
    let mut newest: Option<IndexPath> = None;
83
4
    while let Ok(Some(entry)) = read.next_entry().await {
84
3
        let entry = if let Ok(entry) = IndexPath::try_from(entry.path().to_path_buf()) {
85
2
            entry
86
        } else {
87
1
            continue;
88
        };
89
2
        if let Some(n) = newest {
90
1
            if n.dir_name() > entry.dir_name() {
91
1
                newest = Some(n);
92
1
            } else {
93
                newest = Some(entry);
94
            }
95
1
        } else {
96
1
            newest = Some(entry);
97
1
        }
98
    }
99
1
    Ok(newest)
100
1
}
101

            
102
1
async fn dataset_has_changed(
103
1
    datasetdir: &Path,
104
1
    new_dataset: &[IndexEntry],
105
1
) -> Result<bool, MyError> {
106
1
    if let Ok(Some(newest_index_path)) = get_newest_dir(datasetdir).await {
107
1
        let prev_index = read_index(&newest_index_path)?;
108
1
        return Ok(new_dataset != &prev_index[..]);
109
    }
110
    // There is no directory with a dataset yet, so the dataset has changed
111
    // from not existing to existing.
112
    Ok(true)
113
1
}
114

            
115
1
async fn save_dataset(
116
1
    datasetdir: PathBuf,
117
1
    tempdir: TempDir,
118
1
    dataset: &[IndexEntry],
119
1
) -> Result<(), Box<dyn Error>> {
120
1
    // keep the temporary directory if retrieval went well
121
1
    let saved_dir = tempdir.into_path();
122
1
    let date = format!("{}", Utc::now().format("%+"));
123
1
    let dir = datasetdir.join(date);
124
1
    tokio::fs::rename(saved_dir, &dir).await?;
125
1
    write_index(&dir, dataset)?;
126
1
    Ok(())
127
1
}
128

            
129
1
fn list_members(graph: &LightGraph, base: &Iri<String>) -> Result<Vec<String>, Box<dyn Error>> {
130
1
    let ldp = Namespace::new("http://www.w3.org/ns/ldp#")?;
131
1
    let ldp_contains = ldp.get("contains")?;
132
1
    let mut members = Vec::new();
133
4
    for t in graph.triples_matching([base], [ldp_contains], Any) {
134
4
        if let Some(iri) = t?.o().iri() {
135
4
            members.push(iri.as_str().to_string());
136
4
        }
137
    }
138
1
    Ok(members)
139
1
}