Source code for compareEntities

import copy
from urllib.parse import urlparse
from collections import namedtuple


[docs]class EntityCoverage: """ A class that compute the overlap between two JSON schemas semantic values taken from context files. This operation is not commutative. Thus, to find out if the schema/context pairs are equivalent, we need to run both semDiff(s_a, c_a, s_b, c_b) and semDiff(s_b, c_b, s_a, c_a) :param schema_a: the content of the first schema :param context_a: the context content bound to the first schema :param schema_b: the content of the second schema :param context_b: the context content bound to the second schema """
[docs] def __init__(self, schema_a, context_a, schema_b, context_b): self.input1 = { "schema": schema_a, "context": context_a } self.input2 = { "schema": schema_b, "context": context_b } self.comparator1 = self.__build_context_dict(self.input1) self.comparator2 = self.__build_context_dict(self.input2) self.overlaps = self.__compute_context_coverage(self.comparator1[0], self.comparator2[0]) self.unmatched_with_sem = self.overlaps[2] self.unmatched_without_sem = self.comparator2[1] self.full_coverage = { "coverage": self.overlaps[0], "overlapping fields": self.overlaps[1], "ignored fields": self.comparator1[1] }
def __build_context_dict(self, schema_input): """ A private method that associate each field in a schema to it's semantic value in the context and reverse the result :param schema_input: :return sorted_values: a dictionary of semantic values and their corresponding field :return ignored_fields: a list of fields that were ignored due to having no semantic value in the context file """ sorted_values = {} ignored_keys = ["@id", "@context", "@type"] schema = copy.deepcopy(schema_input) ignored_fields = [] # for each field in the schema for field in schema['schema']['properties']: # Ignoring useless keys if field not in ignored_keys: # If the field can be found in the context, process it if field in schema["context"]["@context"].keys(): # This is the raw semantic value of the field, it might need some processing raw_semantic_value = schema["context"]["@context"][field] # If the field raw semantic value is a string if isinstance(raw_semantic_value, str): sorted_values = self.__process_field(field, raw_semantic_value, schema["context"]["@context"], sorted_values) # if the field raw semantic value is not a string else: sorted_values = self.__process_field(field, raw_semantic_value['@id'], schema["context"]["@context"], sorted_values) # if the field is absent from the context file, ignore it as it has no semantic # definition else: ignored_fields.append(field) return sorted_values, ignored_fields @staticmethod def __process_field(field_name, field_value, context, comparator): """ Private method that catches a given field semantic value from the given context and adds it to the output :param field_name: the name of the given field :param field_value: the value of the given field :param context: the context from which to retrieve the semantic value :param comparator: the output of __build_context_dict() :return comparator: a dictionary of semantic values and corresponding fields from the given schema and context """ base_url = urlparse(field_value).scheme # if the raw value is already an URL, it does not need processing if base_url in ('http', 'https'): if field_value not in comparator: comparator[field_value] = [field_name] else: comparator[field_value].append(field_name) # replacing semantic base to form an absolute IRI else: to_be_processed = True if ":" in field_value: if copy.deepcopy(field_value).split(":")[1] == "": to_be_processed = False if to_be_processed is not False: processed_semantic_value = field_value.replace(base_url + ":", context[base_url]) if processed_semantic_value not in comparator: comparator[processed_semantic_value] = [field_name] else: comparator[processed_semantic_value].append(field_name) return comparator @staticmethod def __compute_context_coverage(context1, context2): """ Private method that compares the fields from the two schemas based on their semantic values :param context1: the final output of __build_context_dict() for the first schema :param context2: the final output of __build_context_dict() for the second schema :return local_overlap_value: a namedtuple containing relative and absolute coverage :return overlap_output: a dictionary that associate fields in schema 1 with their semantic twins in schema 2 :return unmatched_fields: a dictionary of all fields of the second schema that haven't been matched in the first schema """ unmatched_fields = copy.deepcopy(context2) Overlap = namedtuple('Overlap', ['first_field', 'second_field']) OverlapValue = namedtuple('OverlapValue', ['relative_coverage', 'absolute_coverage']) overlap_number = 0 overlap_output = [] processed_field = 0 for field in context1: processed_field += 1 if field in context2: overlap_number += len(context1[field]) for first_field_val in context1[field]: for second_field_val in context2[field]: local_overlap = Overlap(first_field_val, second_field_val) overlap_output.append(local_overlap) if field in unmatched_fields: del unmatched_fields[field] absolute_coverage = namedtuple('AbsoluteCoverage', ['overlap_number', 'total_fields']) local_coverage = absolute_coverage(str(overlap_number), str(processed_field)) try: local_overlap_value = OverlapValue(str(round((overlap_number * 100) / len(context1), 2)), local_coverage) except ZeroDivisionError: local_overlap_value = OverlapValue(0, 0) return local_overlap_value, overlap_output, unmatched_fields