rdf-canonize/rdf_canonize/canonize.py

49 lines
2.5 KiB
Python

"""
Canonicalization algorithms
* Initialization. Initialize the state needed for the rest of the algorithm using 4.2 Canonicalization State. Also initialize the canonicalized dataset using the input dataset (which remains immutable) the input blank node identifier map (retaining blank node identifiers from the input if possible, otherwise assigning them arbitrarily); the issued identifiers map from the canonical issuer is added upon completion of the algorithm.
* Compute first degree hashes. Compute the first degree hash for each blank node in the dataset using 4.6 Hash First Degree Quads.
* Canonically label unique nodes. Assign canonical identifiers via 4.5 Issue Identifier Algorithm, in Unicode code point order, to each blank node whose first degree hash is unique.
* Compute N-degree hashes for non-unique nodes. For each repeated first degree hash (proceeding in Unicode code point order), compute the N-degree hash via 4.8 Hash N-Degree Quads of every unlabeled blank node that corresponds to the given repeated hash.
* Canonically label remaining nodes. In Unicode code point order of the N-degree hashes, issue canonical identifiers to each corresponding blank node using 4.5 Issue Identifier Algorithm. If more than one node produces the same N-degree hash, the order in which these nodes receive a canonical identifier does not matter.
* Finish. Return the serialized canonical form of the canonicalized dataset. Alternatively, return the canonicalized dataset containing the input blank node identifier map and issued identifiers map.
"""
from typing import Union
from rdflib import Graph, Dataset, BNode
from rdf_canonize.state import CanonicalizationState
from rdf_canonize.types import CanonicalizedDataset, QuadType
def hash_first_degree(quad: QuadType):
pass
class Canonicalizer:
def __init__(self, dataset: Union[Graph, Dataset]):
if isinstance(dataset, Graph):
ds = Dataset()
ds.add_graph(ds)
dataset = ds
self.state = CanonicalizationState()
self.dataset = CanonicalizedDataset(init=dataset)
def init_state(self):
for quad in self.dataset.init.quads():
for node in [quad[0], quad[2]]:
if isinstance(node, BNode):
try:
self.state.bnode_quads[node.node_id].append(quad)
except KeyError:
self.state.bnode_quads[node.node_id] = [quad]
def canonicalize(self):
self.init_state()