// Coverage at time of capture: lines 15.15 %, functions 10 %.
use crate::{
index::{index_path::IndexPath, read_index, write_index, IndexEntry},
server::dataset::parse_graph,
server_config::ServerConfigDataset,
server_error::MyError,
urn::blob_to_sha512,
};
use chrono::Utc;
use sophia::{
api::{
graph::Graph,
ns::Namespace,
term::{matcher::Any, Term},
triple::Triple,
},
inmem::graph::LightGraph,
iri::Iri,
use std::{
error::Error,
path::{Path, PathBuf},
use tempfile::TempDir;
pub(crate) async fn fetch_all(
datasets: Vec<ServerConfigDataset>,
datasetsdir: &Path,
) -> Result<(), MyError> {
let mut results = Vec::new();
for dataset in &datasets {
let datasetdir = datasetsdir.join(&dataset.name);
results.push(fetch(dataset, datasetdir).await?);
}
Ok(())
async fn fetch_member(dir: &Path, member: String) -> Result<IndexEntry, MyError> {
let resp = reqwest::get(&member).await?.text().await?;
let base: Iri<String> = Iri::new(member.clone())?;
// try to parse the graph, if it cannot be parsed, stop fetching
let _graph = parse_graph(&resp, Some(base))?;
let filename = blob_to_sha512(resp.as_bytes());
let filepath = dir.join(&filename);
tokio::fs::write(&filepath, &resp)
.await
.map_err(|e| MyError::io_error(filepath, e))?;
Ok(IndexEntry {
name: filename,
base_uri: Some(member),
})
async fn fetch(
dataset_config: &ServerConfigDataset,
datasetdir: PathBuf,
) -> Result<Vec<IndexEntry>, MyError> {
let resp = reqwest::get(&dataset_config.url).await?.text().await?;
let base: Iri<String> = Iri::new(dataset_config.url.clone())?;
let graph = parse_graph(&resp, Some(base.clone()))?;
let members = list_members(&graph, &base)?;
let tempdir =
tempfile::tempdir_in(&datasetdir).map_err(|e| MyError::io_error(datasetdir.clone(), e))?;
let tempdirpath = tempdir.path();
let mut result = Vec::new();
for member in members {
if let Ok(index_entry) = fetch_member(tempdirpath, member).await {
result.push(index_entry);
result.sort_unstable();
if dataset_has_changed(&datasetdir, &result).await? {
save_dataset(datasetdir, tempdir, &result).await?;
Ok(result)
/// Find the directory with the name prepended by a date.
/// E.g. 2023-02-02
async fn get_newest_dir(datasetdir: &Path) -> Result<Option<IndexPath>, Box<dyn Error>> {
let mut read = tokio::fs::read_dir(datasetdir).await?;
let mut newest: Option<IndexPath> = None;
while let Ok(Some(entry)) = read.next_entry().await {
let entry = if let Ok(entry) = IndexPath::try_from(entry.path().to_path_buf()) {
entry
} else {
continue;
if let Some(n) = newest {
if n.dir_name() > entry.dir_name() {
newest = Some(n);
newest = Some(entry);
Ok(newest)
async fn dataset_has_changed(
datasetdir: &Path,
new_dataset: &[IndexEntry],
) -> Result<bool, MyError> {
if let Ok(Some(newest_index_path)) = get_newest_dir(datasetdir).await {
let prev_index = read_index(&newest_index_path)?;
return Ok(new_dataset != &prev_index[..]);
// There is no directory with a dataset yet, so the dataset has changed
// from not existing to existing.
Ok(true)
async fn save_dataset(
tempdir: TempDir,
dataset: &[IndexEntry],
) -> Result<(), Box<dyn Error>> {
// keep the temporary directory if retrieval went well
let saved_dir = tempdir.into_path();
let date = format!("{}", Utc::now().format("%+"));
let dir = datasetdir.join(date);
tokio::fs::rename(saved_dir, &dir).await?;
write_index(&dir, dataset)?;
fn list_members(graph: &LightGraph, base: &Iri<String>) -> Result<Vec<String>, Box<dyn Error>> {
let ldp = Namespace::new("http://www.w3.org/ns/ldp#")?;
let ldp_contains = ldp.get("contains")?;
let mut members = Vec::new();
for t in graph.triples_matching([base], [ldp_contains], Any) {
if let Some(iri) = t?.o().iri() {
members.push(iri.as_str().to_string());
Ok(members)