Lines
0 %
Functions
Branches
100 %
use anyhow::anyhow;
use sequoia_openpgp::packet::{Signature, Tag};
use sequoia_openpgp::parse::stream::{
DetachedVerifierBuilder, GoodChecksum, MessageLayer, MessageStructure, VerificationError,
VerificationHelper,
};
use sequoia_openpgp::policy::{AsymmetricAlgorithm, StandardPolicy};
use sequoia_openpgp::KeyHandle;
use sequoia_openpgp::{cert::raw::RawCertParser, parse::Parse, Cert};
use serde::Serialize;
use std::path::Path;
use std::process::{Command, Stdio};
type Result<T> = anyhow::Result<T>;
fn read_keys(gpgdir: &Path) -> Result<Vec<u8>> {
let gpgdir = gpgdir.to_str().unwrap();
let output = Command::new("gpg")
.args(["--batch", "--homedir", gpgdir, "--export", "-a"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()?;
if !output.status.success() {
return Err(anyhow!(
"Could load public keys: {}",
&String::from_utf8_lossy(&output.stderr)
));
}
Ok(output.stdout)
pub fn load_keys(gpgdir: &Path) -> Result<Vec<Cert>> {
let keys = read_keys(gpgdir)?;
let parser = RawCertParser::from_bytes(&keys)?;
let mut certs = Vec::new();
for cert in parser {
let cert = cert?;
certs.push(Cert::try_from(cert)?);
println!("Loaded {} keys", certs.len());
Ok(certs)
/// A detached signature together with the data it was made over.
#[derive(Serialize)]
pub struct PgpSignature {
/// The signed bytes; `check_signature` verifies the signature against them.
pub payload: Vec<u8>,
/// The signature bytes; `check_signature` parses them as a detached signature.
pub signature: Vec<u8>,
/// The value `check_signature` returns when verification succeeds.
pub struct ValidatedSignature {
/// The parsed signature
pub signature: Signature,
/// The certificates corresponding to keys from the signature, each paired
/// with the key handle it was looked up by
pub certificates: Vec<(KeyHandle, Cert)>,
impl ValidatedSignature {
fn from_helper(helper: Helper) -> sequoia_openpgp::Result<Self> {
if let Some(signature) = helper.signature {
signature.typ();
return Ok(Self {
signature,
certificates: helper
.used_certs
.into_iter()
.map(|(id, pos)| (id, helper.certs[pos].to_owned()))
.collect(),
});
todo!()
// Check a signature
// The required public key is retrieved from `certs`. If there is no
// matching key or the required certificate is missing, an error is
// returned.
pub fn check_signature(
certs: &[Cert],
signature: &PgpSignature,
) -> sequoia_openpgp::Result<ValidatedSignature> {
let mut p = StandardPolicy::new();
// accept all versions of the signature tag
// PGP 2.6.x only accepts version 3 signatures.
p.accept_packet_tag(Tag::Signature);
// git uses SHA1 which is insecure in the standard policy
p.accept_hash(sequoia_openpgp::types::HashAlgorithm::SHA1);
p.accept_asymmetric_algo(AsymmetricAlgorithm::DSA1024);
let builder = DetachedVerifierBuilder::from_bytes(&signature.signature)?;
let h = Helper {
certs,
signature: None,
used_certs: Vec::new(),
let mut v = builder.with_policy(&p, None, h)?;
v.verify_bytes(&signature.payload)?;
let helper = v.into_helper();
ValidatedSignature::from_helper(helper)
/// State handed to the verifier while one detached signature is checked.
struct Helper<'a> {
// The certificates that signatures may be checked against.
certs: &'a [Cert],
// The signature recorded by `check`; `None` until one has been accepted.
signature: Option<Signature>,
// Certificates that are used by the current signature, stored as
// (requested key handle, index into `certs`) pairs.
used_certs: Vec<(KeyHandle, usize)>,
impl<'a> VerificationHelper for Helper<'a> {
/// Collects the certificates from `self.certs` whose key handle equals one
/// of the requested `ids`, remembering each match in `self.used_certs`.
///
/// This method will be called at most once per message.
/// A missing certificate for an id is not an error.
fn get_certs(&mut self, ids: &[KeyHandle]) -> sequoia_openpgp::Result<Vec<Cert>> {
// Start from a clean slate so stale matches are never reported.
self.used_certs.clear();
let mut certs = Vec::with_capacity(ids.len());
for id in ids {
for (pos, cert) in self.certs.iter().enumerate() {
// NOTE(review): only the handle returned by `cert.key_handle()` is
// compared; confirm whether other keys of a certificate need matching too.
if cert.key_handle() == *id {
self.used_certs.push((id.clone(), pos));
certs.push(cert.to_owned());
// When debugging certs that cannot be found, use the next line:
// Ok(self.certs.to_vec())
// NOTE(review): the return expression of `get_certs` is not visible in
// this listing; confirm against the full file.
/// Inspects the verification results and records the accepted signature
/// in `self.signature`.
///
/// Fails with "Multiple signatures found." if a result is encountered
/// while a signature is already recorded, and with `SignatureErrors` if any
/// errors were collected from the results.
fn check(&mut self, structure: MessageStructure) -> sequoia_openpgp::Result<()> {
// in detached signatures, there is only one layer that is a
// SignatureGroup
for layer in structure {
if let MessageLayer::SignatureGroup { results } = layer {
let mut errors = Vec::new();
for result in results {
if self.signature.is_some() {
return Err(anyhow!("Multiple signatures found."));
// `check_result` yields the signature for a good result and otherwise
// appends to `errors`.
self.signature = check_result(&mut errors, result);
if !errors.is_empty() {
return Err(anyhow::Error::from(SignatureErrors(errors)));
return Ok(());
// Reaching this point means no `SignatureGroup` layer was present.
panic!("MessageStructure should only have one layer with a SignatureGroup when `check` is called for a detached signature.");
// Check the result of a signature match and place any errors into
// `errors`.
//
// Returns a clone of the signature for a good checksum; the fall-through
// value at the end of the function is `None`.
//
// NOTE(review): several match-arm bodies are not visible in this listing;
// confirm how `UnboundKey`, `BadKey` and `BadSignature` are handled against
// the full file.
fn check_result(
errors: &mut Vec<anyhow::Error>,
result: std::result::Result<GoodChecksum, VerificationError>,
) -> Option<Signature> {
match result {
Ok(checksum) => return Some(checksum.sig.clone()),
Err(VerificationError::MalformedSignature { sig: _, error }) => {
errors.push(error);
Err(VerificationError::MissingKey { sig }) => {
// Keep the signature so the missing issuer can be reported later.
errors.push(MissingKeyError(sig.clone()).into());
Err(VerificationError::UnboundKey {
sig: _,
cert: _,
error,
}) => {
Err(VerificationError::BadKey {
ka: _,
Err(VerificationError::BadSignature {
None
/// The errors collected while checking the results of a signature group.
#[derive(Debug)]
pub struct SignatureErrors(pub Vec<anyhow::Error>);
impl std::error::Error for SignatureErrors {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.first().and_then(|e| e.source())
impl std::fmt::Display for SignatureErrors {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for item in &self.0 {
item.fmt(f)?;
Ok(())
/// Error recorded when verification reports a missing key; wraps the
/// signature the key was needed for.
pub struct MissingKeyError(Signature);
impl std::error::Error for MissingKeyError {}
impl MissingKeyError {
pub fn missing_key(&self) -> Option<KeyHandle> {
self.0.get_issuers().get(0).map(|k| k.to_owned())
// Renders as `Missing key: <issuer>` using the first issuer of the wrapped
// signature, or as plain `Missing key` when the signature lists none.
// NOTE(review): the method header is not visible in this listing; confirm
// against the full file.
impl std::fmt::Display for MissingKeyError {
if let Some(issuer) = self.0.get_issuers().get(0) {
write!(f, "Missing key: {}", issuer)
} else {
write!(f, "Missing key")