import copy
import json
import os
from jsonschema.validators import Draft4Validator
from semDiff.compareEntities import EntityCoverage
from utils.schema2context import process_schema_name
[docs]class EntityMerge:
""" A class that merge two schemas based on their semantic annotations
:param schema1: dictionary of the first schema
:param context1: dictionary of the first context as {"@context":{}}
:param schema2: dictionary of the second schema
:param context2: dictionary of the second context as {"@context":{}}
"""
[docs] def __init__(self, schema1, context1, schema2, context2):
# Initiate output as a copy of the first schema and its context
self.output_schema = copy.deepcopy(schema1)
self.output_context = copy.deepcopy(context1)
coverage = EntityCoverage(schema1, context1, schema2, context2)
# for each unmatched field of the second schema that have a semantic value in the second
# context
for field_semantic_value in coverage.unmatched_with_sem.keys():
# field are organized in an array
for field_name in coverage.unmatched_with_sem[field_semantic_value]:
self.output_context["@context"][field_name] = field_semantic_value
self.output_schema['properties'][field_name] = schema2['properties'][field_name]
# for each unmatched field that doesn't have a semantic value
for field_name in coverage.unmatched_without_sem:
# if that field isn't already in the first schema
if field_name not in schema1["properties"]:
self.output_schema["properties"][field_name] = schema2["properties"][field_name]
[docs]class MergeEntityFromDiff:
""" A class that merges network2 into network1 based on overlaps from FullDiff
:param overlaps: a variable containing
"""
[docs] def __init__(self, overlaps):
self.overlaps = overlaps["overlaps"]
self.output = {
"schemas": copy.deepcopy(overlaps["network1"]['schemas']),
"contexts": copy.deepcopy(overlaps["network1"]['contexts'])
}
self.content = overlaps
self.name_mapping = {} # {"oldName":"newName"}
self.output_name = \
self.content['network1']['name'].lower() + \
"_" + self.content['network2']['name'].lower() + "_merge"
self.output_dir = os.path.join(os.path.dirname(__file__),
"../tests/fullDiffOutput/merges/" + self.output_name + "/")
self.errors = {}
self.main_schema_name = overlaps['network1']['name'].lower().replace(' ', '_').capitalize()
if "fields_to_merge" not in overlaps:
print("Nothing to merge for current setup")
exit()
# Process mergings
for schemaName in overlaps['fields_to_merge']:
merging_schema_name = schemaName.replace('_schema.json', '')
merge_with_schema_name = overlaps['fields_to_merge'][schemaName][
'merge_with'].replace('_schema.json', '')
if merge_with_schema_name != merging_schema_name:
merged_schema_name = merge_with_schema_name + "_" \
+ merging_schema_name \
+ "_merged_schema.json"
merged_type = merge_with_schema_name.capitalize(
) + merging_schema_name.capitalize()
else:
merged_schema_name = merge_with_schema_name + "_merged_schema.json"
merged_type = merge_with_schema_name.capitalize() + 'Merged'
self.name_mapping[overlaps['fields_to_merge'][schemaName][
'merge_with']] = merged_schema_name
self.name_mapping[schemaName] = merged_schema_name
merged_title = overlaps["network1"]['schemas'][overlaps[
'fields_to_merge'][schemaName]['merge_with']]['title'] + " - " + \
overlaps["network2"]['schemas'][schemaName]['title'] + " merging"
merged_description = "Merge between the " + overlaps["network1"]['schemas'][overlaps[
'fields_to_merge'][schemaName]['merge_with']]['title'] + " and the " + \
overlaps["network2"]['schemas'][schemaName]['title']
merged_schema = copy.deepcopy(
overlaps["network1"]['schemas'][
overlaps['fields_to_merge'][schemaName]['merge_with']])
merged_context = copy.deepcopy(
overlaps["network1"]['contexts'][overlaps[
'fields_to_merge'][schemaName]['merge_with']])
del self.output['schemas'][overlaps['fields_to_merge'][schemaName]['merge_with']]
del self.output['contexts'][overlaps['fields_to_merge'][schemaName]['merge_with']]
# process the fields to merge
for field in overlaps['fields_to_merge'][schemaName]['fields']:
merged_schema['properties'][field] = overlaps['network2'][
'schemas'][schemaName]['properties'][field]
merged_schema['title'] = merged_title
merged_schema['description'] = merged_description
merged_context[field] = overlaps['network2']['contexts'][schemaName][field]
self.find_references(
overlaps['network2']['schemas'][schemaName]['properties'][field])
if 'enum' in merged_schema['properties']['@type']:
type_iterator = 0
for schema_type in merged_schema['properties']['@type']['enum']:
if schema_type == merge_with_schema_name.capitalize():
del merged_schema['properties']['@type']['enum'][type_iterator]
merged_schema['properties']['@type']['enum'].append(merged_type)
type_iterator += 1
self.output['schemas'][merged_schema_name] = merged_schema
self.output['contexts'][merged_schema_name] = merged_context
# processing main schema name
for overlap in self.overlaps:
if self.main_schema_name in overlap[0] and float(overlap[1]['coverage'][0]) >= 100:
old_schema1_name = self.main_schema_name.lower()
old_schema2_name = overlap[0][1].lower()
new_schema_name = old_schema1_name \
+ "_" + old_schema2_name + '_merged_schema.json'
new_description = "A merge between " \
+ old_schema1_name + " and " \
+ old_schema2_name + " schemas"
new_title = "Merge between " + old_schema1_name + " and " + old_schema2_name
new_schema = self.content['network1']['schemas'][old_schema1_name
+ "_schema.json"]
new_schema['description'] = new_description
new_schema['title'] = new_title
if 'enum' in new_schema['properties']['@type']:
print()
type_iterator = 0
for schema_type in new_schema['properties']['@type']['enum']:
if schema_type == self.main_schema_name:
del new_schema['properties']['@type']['enum'][type_iterator]
new_schema['properties']['@type']['enum'].append(
process_schema_name(new_schema_name))
self.output['schemas'][new_schema_name] = new_schema
del self.output['schemas'][old_schema1_name + "_schema.json"]
# Context
self.output['contexts'][new_schema_name] = self.content['network1'][
'contexts'][old_schema1_name + '_schema.json']
self.output['contexts'][new_schema_name][
process_schema_name(new_schema_name)] = self.output['contexts'][
old_schema1_name + '_schema.json'][process_schema_name(old_schema1_name)]
del self.output['contexts'][new_schema_name][process_schema_name(old_schema1_name)]
del self.output['contexts'][old_schema1_name + '_schema.json']
self.modify_references()
[docs] def find_references(self, field):
""" Find $ref at root, in items or in allOf, anyOf, oneOf, adds the schema/context
to the merge and change reference names
:param field: a schema field
:type field: dict
:return:
"""
look_for = ["anyOf", "oneOf", "allOf"]
# $ref at root
if '$ref' in field:
sub_schema_name = field['$ref'].replace("#", '')
self.add_schema(sub_schema_name)
# $ref in anyOf, oneOf or allOf
for item in look_for:
if item in field:
for sub_item in field[item]:
if '$ref' in sub_item:
sub_schema_name = sub_item['$ref'].replace("#", '')
self.add_schema(sub_schema_name)
# $ref in items
if 'items' in field:
if '$ref' in field['items']:
sub_schema_name = field['items']['$ref'].replace('#', '')
self.add_schema(sub_schema_name)
for item in look_for:
if item in field['items']:
for sub_item in field['items'][item]:
if '$ref' in sub_item:
sub_schema_name = sub_item['$ref']
self.add_schema(sub_schema_name)
[docs] def add_schema(self, schema_name):
""" Adds the schema to the merge
:param schema_name:
:return:
"""
if schema_name not in self.name_mapping:
if schema_name is not None and schema_name not in self.output['schemas']:
schema_name = schema_name.replace("#", '')
self.output['schemas'][schema_name] = \
self.content['network2']['schemas'][schema_name]
"""if schema_name in self.content['network2']['contexts']:
self.output['contexts'][schema_name] = \
self.content['network2']['contexts'][schema_name]"""
self.find_references(self.content['network2']['schemas'][schema_name])
[docs] def modify_references(self):
""" Modify the $ref names
:return:
"""
look_for = ["anyOf", "oneOf", "allOf"]
delete_schemas = []
for schema in self.output['schemas']:
if schema in self.name_mapping:
delete_schemas.append(schema)
else:
if 'properties' in self.output['schemas'][schema]:
for item in self.output['schemas'][schema]['properties']:
field = self.output['schemas'][schema]['properties'][item]
if '$ref' in field:
field_ref = field['$ref'].replace('#', '')
if field_ref in self.name_mapping:
self.output['schemas'][schema]['properties'][item]['$ref'] = \
self.name_mapping[field_ref] + '#'
for reference in look_for:
if reference in field:
sub_item_iterator = 0
for sub_item in field[reference]:
if '$ref' in sub_item:
field_ref = sub_item['$ref']
if field_ref in self.name_mapping:
self.output['schemas'][schema]['properties'][
reference][sub_item_iterator]['$ref'] = \
self.name_mapping[field_ref] + "#"
sub_item_iterator += 1
if 'items' in field:
if '$ref' in field['items']:
field_ref = field['items']['$ref'].replace('#', '')
if field_ref in self.name_mapping:
self.output['schemas'][
schema]['properties'][item]['items']['$ref'] = \
self.name_mapping[field_ref] + '#'
for reference in look_for:
if reference in field['items']:
sub_item_iterator = 0
for sub_item in field['items'][reference]:
if '$ref' in sub_item:
field_ref = sub_item['$ref']
if field_ref in self.name_mapping:
self.output['schemas'][schema]['properties'][
reference]['items'][
sub_item_iterator]['$ref'] = \
self.name_mapping[field_ref] + "#"
sub_item_iterator += 1
for schema in delete_schemas:
del self.output['schemas'][schema]
change_names = {v: k for k, v in self.name_mapping.items()}
for context in self.output['contexts']:
new_field_base_name = process_schema_name(context)
if context in change_names:
old_schema_name = change_names[context]
old_field_base_name = process_schema_name(old_schema_name)
# set the new context field
self.output["contexts"][context][new_field_base_name] = \
copy.copy(self.content['network2'][
"contexts"][old_schema_name][old_field_base_name])
[docs] def save(self, base_url):
""" Saves the merge to disk and replace "id" attribute with the given base url
+ schema name
:param base_url:
:return:
"""
directory_system = [
os.path.join(self.output_dir, 'schema'),
os.path.join(self.output_dir, 'context')
]
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
for directory in directory_system:
if not os.path.exists(directory):
os.makedirs(directory)
for schemaName in self.output["schemas"]:
schema = self.output["schemas"][schemaName]
schema["id"] = base_url + "schema/" + schemaName
schema_file_name = os.path.join(os.path.join(self.output_dir, 'schema/'), schemaName)
context_name = schemaName.replace("_schema.json", '_context.jsonld')
context_file_name = \
os.path.join(os.path.join(self.output_dir, 'context/'), context_name)
with open(schema_file_name, "w") as schemaFile:
schemaFile.write(json.dumps(schema, indent=4))
schemaFile.close()
if schemaName in self.output['contexts'].keys():
with open(context_file_name, "w") as contextFile:
contextFile.write(json.dumps({
"@context": self.output['contexts'][schemaName]
}, indent=4))
contextFile.close()
[docs] def validate_output(self):
""" Validates the output of the merge
:return:
"""
for schema in self.output['schemas']:
try:
Draft4Validator.check_schema(self.output['schemas'][schema])
except Exception as e:
if schema not in self.errors:
self.errors[schema] = []
self.errors[schema].append(str(e))