mirror of
https://git.sr.ht/~tsileo/microblog.pub
synced 2024-12-22 13:14:28 +00:00
Add LD sig from little-boxes
This commit is contained in:
parent
0c1b273398
commit
b846c397ab
1 changed files with 77 additions and 0 deletions
77
app/ldsig.py
Normal file
77
app/ldsig.py
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import typing
|
||||||
|
from datetime import datetime
|
||||||
|
from functools import lru_cache
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from Crypto.Hash import SHA256
|
||||||
|
from Crypto.Signature import PKCS1_v1_5
|
||||||
|
from pyld import jsonld # type: ignore
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
from app.key import Key
|
||||||
|
|
||||||
|
|
||||||
|
_LOADER = jsonld.requests_document_loader()
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(256)
|
||||||
|
def _caching_document_loader(url: str) -> Any:
|
||||||
|
return _LOADER(url)
|
||||||
|
|
||||||
|
|
||||||
|
jsonld.set_document_loader(_caching_document_loader)
|
||||||
|
|
||||||
|
|
||||||
|
def _options_hash(doc):
|
||||||
|
doc = dict(doc["signature"])
|
||||||
|
for k in ["type", "id", "signatureValue"]:
|
||||||
|
if k in doc:
|
||||||
|
del doc[k]
|
||||||
|
doc["@context"] = "https://w3id.org/identity/v1"
|
||||||
|
normalized = jsonld.normalize(
|
||||||
|
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
|
||||||
|
)
|
||||||
|
h = hashlib.new("sha256")
|
||||||
|
h.update(normalized.encode("utf-8"))
|
||||||
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def _doc_hash(doc):
|
||||||
|
doc = dict(doc)
|
||||||
|
if "signature" in doc:
|
||||||
|
del doc["signature"]
|
||||||
|
normalized = jsonld.normalize(
|
||||||
|
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
|
||||||
|
)
|
||||||
|
h = hashlib.new("sha256")
|
||||||
|
h.update(normalized.encode("utf-8"))
|
||||||
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def verify_signature(doc, key: "Key"):
|
||||||
|
to_be_signed = _options_hash(doc) + _doc_hash(doc)
|
||||||
|
signature = doc["signature"]["signatureValue"]
|
||||||
|
signer = PKCS1_v1_5.new(key.pubkey or key.privkey) # type: ignore
|
||||||
|
digest = SHA256.new()
|
||||||
|
digest.update(to_be_signed.encode("utf-8"))
|
||||||
|
return signer.verify(digest, base64.b64decode(signature)) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
def generate_signature(doc, key: "Key"):
|
||||||
|
options = {
|
||||||
|
"type": "RsaSignature2017",
|
||||||
|
"creator": doc["actor"] + "#main-key",
|
||||||
|
"created": datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
|
||||||
|
}
|
||||||
|
doc["signature"] = options
|
||||||
|
to_be_signed = _options_hash(doc) + _doc_hash(doc)
|
||||||
|
if not key.privkey:
|
||||||
|
raise ValueError(f"missing privkey on key {key!r}")
|
||||||
|
|
||||||
|
signer = PKCS1_v1_5.new(key.privkey)
|
||||||
|
digest = SHA256.new()
|
||||||
|
digest.update(to_be_signed.encode("utf-8"))
|
||||||
|
sig = base64.b64encode(signer.sign(digest)) # type: ignore
|
||||||
|
options["signatureValue"] = sig.decode("utf-8")
|
Loading…
Reference in a new issue