// Coverage report header (extraction residue): Lines 78.13 %, Functions 35.29 %
use anyhow::anyhow;
use sequoia_openpgp::packet::{Signature, Tag};
use sequoia_openpgp::parse::stream::{
DetachedVerifierBuilder, GoodChecksum, MessageLayer, MessageStructure, VerificationError,
VerificationHelper,
};
use sequoia_openpgp::policy::{AsymmetricAlgorithm, StandardPolicy};
use sequoia_openpgp::KeyHandle;
use sequoia_openpgp::{cert::raw::RawCertParser, parse::Parse, Cert};
use serde::Serialize;
use std::path::Path;
use std::process::{Command, Stdio};
type Result<T> = anyhow::Result<T>;
fn read_keys(gpgdir: &Path) -> Result<Vec<u8>> {
let gpgdir = gpgdir.to_str().unwrap();
let exe = "gpg";
let output = Command::new(exe)
.args(["--batch", "--homedir", gpgdir, "--export", "-a"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()
.map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => anyhow!("Executable {} was not found.", exe),
_ => anyhow::Error::from(e),
})?;
if !output.status.success() {
return Err(anyhow!(
"Could load public keys: {}",
&String::from_utf8_lossy(&output.stderr)
));
}
Ok(output.stdout)
pub fn load_keys(gpgdir: &Path) -> Result<Vec<Cert>> {
let keys = read_keys(gpgdir)?;
let parser = RawCertParser::from_bytes(&keys)?;
let mut certs = Vec::new();
for cert in parser {
let cert = cert?;
certs.push(Cert::try_from(cert)?);
println!("Loaded {} keys", certs.len());
Ok(certs)
#[derive(Serialize, PartialEq)]
pub struct PgpSignature {
pub payload: Vec<u8>,
pub signature: Vec<u8>,
pub struct ValidatedSignature {
/// The parsed signature
pub signature: Signature,
/// The certificates corresponding to keys from the signature
pub certificates: Vec<(KeyHandle, Cert)>,
/// Errors that occurred during the validation
pub errors: Vec<anyhow::Error>,
impl ValidatedSignature {
fn from_helper(helper: Helper) -> sequoia_openpgp::Result<Self> {
if let Some(signature) = helper.signature {
signature.typ();
return Ok(Self {
signature,
certificates: helper
.used_certs
.into_iter()
.map(|(id, pos)| (id, helper.certs[pos].to_owned()))
.collect(),
errors: helper.errors,
});
Err(anyhow!("no (valid) signature"))
// Check a signature
// The required public key is retrieved from `certs`. If there is no
// matching key or the required certificate is missing, an error is
// returned.
pub fn check_signature(
certs: &[Cert],
signature: &PgpSignature,
) -> sequoia_openpgp::Result<ValidatedSignature> {
let mut p = StandardPolicy::new();
// accept all versions of the signature tag
// PGP 2.6.x only accepts version 3 signatures.
p.accept_packet_tag(Tag::Signature);
// git uses SHA1 which is insecure in the standard policy
p.accept_hash(sequoia_openpgp::types::HashAlgorithm::SHA1);
p.accept_asymmetric_algo(AsymmetricAlgorithm::DSA1024);
let builder = DetachedVerifierBuilder::from_bytes(&signature.signature)?;
let h = Helper {
certs,
signature: None,
used_certs: Vec::new(),
errors: Vec::new(),
let mut v = builder.with_policy(&p, None, h)?;
v.verify_bytes(&signature.payload)?;
let helper = v.into_helper();
ValidatedSignature::from_helper(helper)
struct Helper<'a> {
certs: &'a [Cert],
signature: Option<Signature>,
// certificates that are used by the current signature
// They are index into `certs`
used_certs: Vec<(KeyHandle, usize)>,
errors: Vec<anyhow::Error>,
impl<'a> VerificationHelper for Helper<'a> {
/// This method will be called at most once per message.
/// a missing certificate for an id is not an error
fn get_certs(&mut self, ids: &[KeyHandle]) -> sequoia_openpgp::Result<Vec<Cert>> {
self.used_certs.clear();
let mut certs = Vec::with_capacity(ids.len());
for id in ids {
for (pos, cert) in self.certs.iter().enumerate() {
if cert.keys().any(|key| key.key().key_handle() == *id) {
self.used_certs.push((id.clone(), pos));
certs.push(cert.to_owned());
// when debugging certs that cannot be found use the next line
// Ok(self.certs.to_vec())
fn check(&mut self, structure: MessageStructure) -> sequoia_openpgp::Result<()> {
// in detached signatures, there is only one layer that is a
// SignatureGroup
for layer in structure {
if let MessageLayer::SignatureGroup { results } = layer {
self.errors.clear();
for result in results {
if self.signature.is_some() {
return Err(anyhow!("Multiple signatures found."));
self.signature = Some(check_result(&mut self.errors, result));
return Ok(());
panic!("MessageStructure should only have one layer with a SignatureGroup when `check` is called for a detached signature.");
// Check the result of a signature match and place any errors into
// `result`.
fn check_result(
errors: &mut Vec<anyhow::Error>,
result: std::result::Result<GoodChecksum, VerificationError>,
) -> Signature {
match result {
Ok(checksum) => checksum.sig.clone(),
Err(e) => match e {
VerificationError::MalformedSignature { sig, error } => {
errors.push(error);
sig.clone()
VerificationError::MissingKey { sig } => {
errors.push(MissingKeyError(sig.clone()).into());
VerificationError::UnboundKey {
sig,
cert: _,
error,
} => {
VerificationError::BadKey { sig, ka: _, error } => {
VerificationError::BadSignature { sig, ka: _, error } => {
},
/// Error recorded when no key is available to verify a signature.
///
/// Wraps the signature whose key is missing, so that the issuer can be
/// reported.
#[derive(Debug)]
pub struct MissingKeyError(Signature);
// No overrides of the `Error` trait's default methods are needed.
impl std::error::Error for MissingKeyError {}
impl MissingKeyError {
pub fn missing_key(&self) -> Option<KeyHandle> {
self.0.get_issuers().get(0).map(|k| k.to_owned())
impl std::fmt::Display for MissingKeyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(issuer) = self.0.get_issuers().get(0) {
write!(f, "Missing key: {}", issuer)
} else {
write!(f, "Missing key")