"""
A module containing some data container base classes.
"""
from __future__ import absolute_import
import json
import logging
import sys
from copy import deepcopy
from jsonschema.exceptions import best_match
from six import string_types
import jsonpickle
import jsonschema
from jsonschema import ValidationError
from jsonschema import SchemaError
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)s: %(asctime)s {%(filename)s:%(lineno)d}: %(message)s "
)
JSONPICKLE_TYPE_FIELD = "py/object"
TYPE_FIELD = "jsonClass"
#: Maps jsonClass values to the containing Python module object. Useful for (de)serialization. Updated using :func:`update_json_class_index` calls at the end of each module file (such as this one) whose classes may be serialized.
json_class_index = {}
[docs]def update_json_class_index(module_in):
"""Call this function to enable (de)serialization.
Usage example: common.update_json_class_index(sys.modules[__name__]).
"""
import inspect
for name, obj in inspect.getmembers(module_in):
if inspect.isclass(obj):
json_class_index[name] = obj
[docs]def check_class(obj, allowed_types):
results = [isinstance(obj, some_type) for some_type in allowed_types]
# logging.debug(results)
return True in results
[docs]def check_list_item_types(some_list, allowed_types):
check_class_results = [check_class(item, allowed_types=allowed_types) for item in some_list]
# logging.debug(check_class_results)
return not (False in check_class_results)
[docs]def recursively_merge_json_schemas(a, b, json_path=""):
assert a.__class__ == b.__class__, str(a.__class__) + " vs " + str(b.__class__)
if isinstance(b, dict) and isinstance(a, dict):
a_and_b = set(a.keys()) & set(b.keys())
every_key = set(a.keys()) | set(b.keys())
merged_dict = {}
for k in every_key:
if k in a_and_b:
merged_dict[k] = recursively_merge_json_schemas(a[k], b[k], json_path=json_path + "/" + k)
else:
merged_dict[k] = deepcopy(a[k] if k in a else b[k])
return merged_dict
elif isinstance(b, list) and isinstance(a, list) and not json_path.endswith(TYPE_FIELD + "/enum"):
# TODO: What if we have a list of dicts?
return list(set(a + b))
else:
return deepcopy(b)
[docs]class JsonObject(object):
"""The base class of all Json-serializable data container classes, with many utility methods."""
schema = {
"type": "object",
"properties": {
TYPE_FIELD: {
"type": "string",
"description": "A hint used by json libraries to deserialize json data to an object of the appropriate type."
" This is necessary for sub-objects to have as well (to ensure that the deserialization functions as expected)."
},
},
"required": [TYPE_FIELD]
}
def __init__(self):
# Dont do: self._id = None . You'll get "_id": null when the object is serialized.
self.set_type()
[docs] @classmethod
def make_from_dict(cls, input_dict):
"""Defines *our* canonical way of constructing a JSON object from a dict.
All other deserialization methods should use this.
Note that this assumes that json_class_index is populated properly!
- ``from sanskrit_data.schema import *`` before using this should take care of it.
:param input_dict:
:return: A subclass of JsonObject
"""
if input_dict is None:
return None
assert TYPE_FIELD in input_dict, "no type field: " + str(input_dict)
dict_without_id = deepcopy(input_dict)
_id = dict_without_id.pop("_id", None)
def recursively_set_jsonpickle_type(some_dict):
"""Translates jsonClass fields to py/object"""
wire_type = some_dict.pop(TYPE_FIELD, None)
if wire_type:
some_dict[JSONPICKLE_TYPE_FIELD] = json_class_index[wire_type].__module__ + "." + wire_type
for key, value in iter(some_dict.items()):
if isinstance(value, dict):
recursively_set_jsonpickle_type(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
recursively_set_jsonpickle_type(item)
recursively_set_jsonpickle_type(dict_without_id)
new_obj = jsonpickle.decode(json.dumps(dict_without_id))
# logging.debug(new_obj.__class__)
if _id:
new_obj._id = str(_id)
new_obj.set_type_recursively()
return new_obj
[docs] @classmethod
def make_from_dict_list(cls, input_dict_list):
assert isinstance(input_dict_list, list)
return [cls.make_from_dict(input_dict=input_dict) for input_dict in input_dict_list]
[docs] @classmethod
def make_from_pickledstring(cls, pickle):
input_str = pickle
if not isinstance(pickle, str):
input_str = str(pickle,'utf-8')
if input_str.strip().startswith("["):
return cls.make_from_dict_list(jsonpickle.decode(pickle))
else:
obj = cls.make_from_dict(jsonpickle.decode(pickle))
return obj
[docs] @classmethod
def read_from_file(cls, filename):
try:
with open(filename) as fhandle:
obj = cls.make_from_dict(jsonpickle.decode(fhandle.read()))
return obj
except Exception as e:
try:
with open(filename) as fhandle:
obj = cls.make_from_dict_list(jsonpickle.decode(fhandle.read()))
return obj
except Exception as e:
logging.error("Error reading " + filename + " : ".format(e))
raise e
[docs] def dump_to_file(self, filename):
try:
import os
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "w") as f:
f.write(str(self))
except Exception as e:
logging.error("Error writing " + filename + " : ".format(e))
raise e
[docs] @classmethod
def get_wire_typeid(cls):
return cls.__name__
[docs] @classmethod
def get_jsonpickle_typeid(cls):
return cls.__module__ + "." + cls.__name__
[docs] @classmethod
def get_json_map_list(cls, some_list):
return [item.to_json_map() for item in some_list]
[docs] def get_external_storage_path(self, db_interface):
"""Get the directory path where files associated with this object are to be stored."""
import os
return os.path.join(db_interface.external_file_store, self._id)
[docs] def list_files(self, db_interface, suffix_pattern="*"):
import glob
import os
file_list = glob.glob(pathname=os.path.join(self.get_external_storage_path(db_interface=db_interface), suffix_pattern))
return [os.path.basename(f) for f in file_list]
[docs] def set_type(self):
# self.class_type = str(self.__class__.__name__)
setattr(self, TYPE_FIELD, self.__class__.get_wire_typeid())
# setattr(self, TYPE_FIELD, self.__class__.__name__)
[docs] def set_type_recursively(self):
self.set_type()
for key, value in iter(self.__dict__.items()):
if isinstance(value, JsonObject):
value.set_type_recursively()
elif isinstance(value, list):
for item in value:
if isinstance(item, JsonObject):
item.set_type_recursively()
[docs] def set_jsonpickle_type_recursively(self):
self.set_type()
for key, value in iter(self.__dict__.items()):
if isinstance(value, JsonObject):
value.set_type_recursively()
elif isinstance(value, list):
for item in value:
if isinstance(item, JsonObject):
item.set_jsonpickle_type_recursively()
def __str__(self):
return json.dumps(self.to_json_map(), sort_keys=True, indent=2)
[docs] def set_from_dict(self, input_dict):
if input_dict:
for key, value in iter(input_dict.items()):
if isinstance(value, list):
setattr(self, key,
[JsonObject.make_from_dict(item) if isinstance(item, dict) else item for item in value])
elif isinstance(value, dict):
setattr(self, key, JsonObject.make_from_dict(value))
else:
setattr(self, key, value)
# noinspection PyShadowingBuiltins
[docs] def set_from_id(self, db_interface, id):
return self.set_from_dict(db_interface.find_by_id(id=id))
[docs] def to_json_map(self):
"""One convenient way of 'serializing' the object.
So, the type must be properly set.
Many functions accept such json maps, just as they accept strings.
"""
self.set_type_recursively()
json_map = {}
for key, value in iter(self.__dict__.items()):
# logging.debug("%s %s", key, value)
if isinstance(value, JsonObject):
json_map[key] = value.to_json_map()
elif isinstance(value, list):
json_map[key] = [item.to_json_map() if isinstance(item, JsonObject) else item for item in value]
else:
json_map[key] = value
return json_map
[docs] def equals_ignore_id(self, other):
# Makes a unicode copy.
def to_unicode(text):
if isinstance(text, dict):
return {key: to_unicode(value) for key, value in iter(text.items())}
elif isinstance(text, list):
return [to_unicode(element) for element in text]
elif isinstance(text, string_types):
return text.encode('utf-8')
else:
return text
dict1 = to_unicode(self.to_json_map())
dict1.pop("_id", None)
# logging.debug(self.__dict__)
# logging.debug(dict1)
dict2 = to_unicode(other.to_json_map())
dict2.pop("_id", None)
# logging.debug(other.__dict__)
# logging.debug(dict2)
return dict1 == dict2
[docs] def update_collection(self, db_interface, user=None):
"""Do JSON validation and write to database."""
self.set_type_recursively()
if hasattr(self, "schema"):
self.validate(db_interface=db_interface, user=user)
updated_doc = db_interface.update_doc(self.to_json_map())
updated_obj = JsonObject.make_from_dict(updated_doc)
return updated_obj
[docs] def validate_deletion(self, db_interface, user=None):
if not hasattr(self, "_id"):
raise ValidationError("_id not present!")
[docs] def delete_in_collection(self, db_interface, user=None):
"""
To delete referrent items also, use appropriate method in JsonObjectNode.
:param db_interface:
:param user:
:return:
"""
self.validate_deletion(db_interface=db_interface, user=user)
import shutil
db_interface.delete_doc(self._id)
shutil.rmtree(path=self.get_external_storage_path(db_interface=db_interface), ignore_errors=True)
[docs] def validate(self, db_interface=None, user=None):
"""Validate the JSON serialization of this object using the schema member. Called before database writes.
:param user:
:param db_interface: Potentially useful in subclasses to perform validations (eg. is the target_id valid).
This value may not be available: for example when called from the from_details methods.
:return: a boolean.
"""
self.validate_schema()
# Override and call this method to add extra validations.
[docs] def validate_schema(self):
json_map = self.to_json_map()
json_map.pop("_id", None)
# logging.debug(str(self))
try:
jsonschema.validate(json_map, self.schema)
# Subobjects could have specialized validation rules, specified using validate_schema overrides. Hence we specially call those methods.
for key, value in iter(self.__dict__.items()):
# logging.debug("%s %s", key, value)
if isinstance(value, JsonObject):
value.validate_schema()
elif isinstance(value, list):
json_map[key] = [item.validate_schema() if isinstance(item, JsonObject) else item for item in value]
else:
pass
except SchemaError as e:
logging.error("Exception message: " + e.message)
logging.error("Schema is: " + jsonpickle.dumps(self.schema))
logging.error("Context is: " + str(e.context))
logging.error("Best match is: " + str(best_match(errors=[e])))
raise e
except ValidationError as e:
logging.error("Exception message: " + e.message)
logging.error("self is: " + str(self))
logging.error("Schema is: " + jsonpickle.dumps(self.schema))
logging.error("Context is: " + str(e.context))
logging.error("Best match is: " + str(best_match(errors=[e])))
logging.error("json_map is: " + jsonpickle.dumps(json_map))
raise e
# noinspection PyShadowingBuiltins
[docs] @classmethod
def from_id(cls, id, db_interface):
"""Returns None if nothing is found."""
item_dict = db_interface.find_by_id(id=id)
item = None
if item_dict is not None:
item = cls.make_from_dict(item_dict)
return item
[docs] @classmethod
def add_indexes(cls, db_interface):
db_interface.add_index(keys_dict={
"jsonClass": 1
}, index_name="jsonClass")
[docs]class TargetValidationError(Exception):
def __init__(self, allowed_types, target_obj, targeting_obj):
self.allowed_types = allowed_types
self.target_obj = target_obj
self.targeting_obj = targeting_obj
self.message = str(self)
def __str__(self):
return "%s\n targets object \n" \
"%s,\n" \
"which does not belong to \n" \
"%s" % (self.targeting_obj, self.target_obj, str(self.allowed_types))
# noinspection PyProtectedMember,PyUnresolvedReferences
[docs]class Target(JsonObject):
schema = recursively_merge_json_schemas(JsonObject.schema, {
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["Target"]
},
"container_id": {
"type": "string"
}
},
"required": ["container_id"]
})
[docs] def get_target_entity(self, db_interface):
"""Returns null if db_interface doesnt have any such entity."""
return JsonObject.from_id(id=self.container_id, db_interface=db_interface)
[docs] def check_target_class(self, db_interface, allowed_types, targeting_obj):
if db_interface is not None:
target_entity = self.get_target_entity(db_interface=db_interface)
if not check_class(obj=target_entity, allowed_types=allowed_types):
raise TargetValidationError(allowed_types=allowed_types, targeting_obj=targeting_obj,
target_obj=target_entity)
[docs] @classmethod
def check_target_classes(cls, targets_to_check, db_interface, allowed_types, targeting_obj):
for target in targets_to_check:
target.check_target_class(db_interface=db_interface, allowed_types=allowed_types, targeting_obj=targeting_obj)
[docs] @classmethod
def from_details(cls, container_id):
target = Target()
target.container_id = container_id
target.validate()
return target
[docs] @classmethod
def from_ids(cls, container_ids):
return [Target.from_details(str(container_id)) for container_id in container_ids]
[docs] @classmethod
def from_containers(cls, containers):
return Target.from_ids(container_ids=[container._id for container in containers])
[docs]class DataSource(JsonObject):
schema = recursively_merge_json_schemas(JsonObject.schema, ({
"type": "object",
"description": "Source of the json-data which contains this node. Eg. Uploader details in case of books, annotator in case of annotations."
" Consider naming the field that contains this object `source` to make querying uniform.",
TYPE_FIELD: {
"enum": ["DataSource"]
},
"properties": {
"source_type": {
"type": "string",
"enum": ["system_inferred", "user_supplied"],
"description": "Does this data come from a machine, or a human? source_ prefix avoids keyword conflicts in some languages.",
"default": "system_inferred"
},
"id": {
"type": "string",
"description": "Something to identify the particular data source.",
},
"by_admin": {
"type": "boolean",
"description": "Was the creator of this data an admin at the time it was created or updated?"
}
},
"required": ["source_type"]
}))
def __init__(self):
"""Set the default properties"""
super().__init__()
# noinspection PyTypeChecker
self.source_type = self.schema["properties"]["source_type"]["default"]
# noinspection PyShadowingBuiltins
[docs] @classmethod
def from_details(cls, source_type, id):
source = DataSource()
source.source_type = source_type
source.id = id
source.validate_schema()
return source
[docs] def infer_by_admin(self, db_interface=None, user=None):
if not hasattr(self, "by_admin"):
# source_type is a compulsory attribute, because that validation is done separately and a suitable error is thrown.
if hasattr(self, "source_type") and self.source_type == "user_supplied":
if user is not None and db_interface is not None:
if not hasattr(self, "id") or self.id in user.get_user_ids():
self.by_admin = user.is_admin(service=db_interface.db_name_frontend)
[docs] def setup_source(self, db_interface=None, user=None):
if not hasattr(self, "source_type"):
self.source_type = "user_supplied" if (user is not None and user.is_human()) else "system_inferred"
if not hasattr(self, "id") and user is not None and user.get_first_user_id_or_none() is not None:
self.id = user.get_first_user_id_or_none()
[docs] def is_id_impersonated_by_non_admin(self, db_interface=None, user=None):
"""A None user is assumed to be a valid authorized backend script."""
if hasattr(self, "id") and user is not None and db_interface is not None:
if self.id not in user.get_user_ids() and not user.is_admin(service=db_interface.db_name_frontend):
return True
return False
[docs] def validate(self, db_interface=None, user=None):
if self.is_id_impersonated_by_non_admin(db_interface=db_interface, user=user):
raise ValidationError("Impersonation by %(id_1)s as %(id_2)s not allowed for this user." % dict(id_1=user.get_first_user_id_or_none(), id_2=self.id))
if "user" in self.source_type and not hasattr(self, "id"):
raise ValidationError("User id compulsary for user sources.")
if hasattr(self, "source_type") and self.source_type == "system_inferred":
if user is not None and user.is_human() and not user.is_admin(service=db_interface.db_name_frontend):
raise ValidationError("Impersonation by %(id_1)s as a bot not allowed for this user." % dict(id_1=user.get_first_user_id_or_none()))
super(DataSource, self).validate(db_interface=db_interface, user=user)
# Only if the writer user is an admin or None, allow by_admin to be set to true (even when the admin is impersonating another user).
if hasattr(self, "by_admin") and self.by_admin:
if user is not None and db_interface is not None and not user.is_admin(service=db_interface.db_name_frontend):
raise ValidationError("Impersonation by %(id_1)s of %(id_2)s not allowed for this user." % dict(id_1=user.get_first_user_id_or_none(), id_2=self.id))
# source_type is a compulsory attribute, because that validation is done separately and a suitable error is thrown.
if hasattr(self, "source_type") and self.source_type != "user_supplied":
if user is not None and db_interface is not None:
raise ValidationError("non user_supplied source_type cannot be an admin.")
[docs]class UllekhanamJsonObject(JsonObject):
"""The archetype JsonObject for use with the Ullekhanam project. See description.schema field"""
schema = recursively_merge_json_schemas(JsonObject.schema, ({
"type": "object",
"description": "Some JsonObject which can be saved as a document in the ullekhanam database.",
"properties": {
"source": DataSource.schema,
"editable_by_others": {
"type": "boolean",
"description": "Can this annotation be taken over by others for wiki-style editing or deleting?",
"default": True
},
"targets": {
"type": "array",
"items": Target.schema,
"description": "This field lets us define a directed graph involving JsonObjects stored in a database."
}
},
"required": [TYPE_FIELD]
}))
target_class = Target
[docs] def is_editable_by_others(self):
# noinspection PyTypeChecker
return self.editable_by_others if hasattr(self, "editable_by_others") else self.schema["properties"]["editable_by_others"]["default"]
def __init__(self):
super(UllekhanamJsonObject, self).__init__()
self.source = DataSource()
[docs] def detect_illegal_takeover(self, db_interface=None, user=None):
if hasattr(self, "_id") and db_interface is not None:
old_obj = JsonObject.from_id(id=self._id, db_interface=db_interface)
if old_obj is not None and not old_obj.is_editable_by_others():
if hasattr(self.source, "id") and hasattr(old_obj.source, "id") and self.source.id != old_obj.source.id:
if user is not None and not user.is_admin(service=db_interface.db_name_frontend):
raise ValidationError("{} cannot take over {}'s annotation for editing or deleting under a non-admin user {}'s authority".format(self.source.id, old_obj.source.id, user.get_first_user_id_or_none))
[docs] def update_collection(self, db_interface, user=None):
self.source.setup_source(db_interface=db_interface, user=user)
return super(UllekhanamJsonObject, self).update_collection(db_interface=db_interface, user=user)
[docs] def validate_deletion_ignoring_targetters(self, db_interface, user=None):
super(UllekhanamJsonObject, self).validate_deletion(db_interface=db_interface, user=user)
if user is not None:
self.source.id = user.get_first_user_id_or_none()
self.detect_illegal_takeover(db_interface=db_interface, user=user)
[docs] def validate_deletion(self, db_interface, user=None):
# Not calling: super(UllekhanamJsonObject, self).validate_deletion(db_interface=db_interface, user=user) as it's called inside the below.
self.validate_deletion_ignoring_targetters(db_interface=db_interface, user=user)
targetting_entities = self.get_targetting_entities(db_interface=db_interface)
if len(targetting_entities) > 0:
raise ValidationError("Unsafe deletion of %s: %d entities refer to this entity. Delete them first" % (self._id, len(targetting_entities)))
[docs] @classmethod
def get_allowed_target_classes(cls):
return []
[docs] def validate_targets(self, db_interface):
allowed_types = self.get_allowed_target_classes()
targets_to_check = self.targets if hasattr(self, "targets") else []
Target.check_target_classes(targets_to_check=targets_to_check, db_interface=db_interface, allowed_types=allowed_types, targeting_obj=self)
[docs] def validate(self, db_interface=None, user=None):
super(UllekhanamJsonObject, self).validate(db_interface=db_interface, user=user)
self.validate_targets(db_interface=db_interface)
self.source.validate(db_interface=db_interface, user=user)
self.detect_illegal_takeover(db_interface=db_interface, user=user)
# noinspection PyTypeHints
[docs] def get_targetting_entities(self, db_interface, entity_type=None):
"""
:type entity_type: str
"""
# Alas, the below shows that no index is used:
# curl -sg vedavaapi.org:5984/vedavaapi_ullekhanam_db/_explain -H content-type:application/json -d '{"selector": {"targets": {"$elemMatch": {"container_id": "4b9f454f5aa5414e82506525d015ac68"}}}}'|jq
# TODO: Use index.
find_filter = {
"targets": {
"$elemMatch": {
"container_id": str(self._id)
}
}
}
targetting_objs = [JsonObject.make_from_dict(item) for item in db_interface.find(find_filter)]
if entity_type is not None:
targetting_objs = list(filter(lambda obj: isinstance(obj, json_class_index[entity_type]), targetting_objs))
return targetting_objs
[docs] @classmethod
def add_indexes(cls, db_interface):
super(UllekhanamJsonObject, cls).add_indexes(db_interface=db_interface)
db_interface.add_index(keys_dict={
"targets.container_id": 1
}, index_name="targets_container_id")
# noinspection PyProtectedMember,PyAttributeOutsideInit,PyAttributeOutsideInit,PyTypeChecker
[docs]class JsonObjectNode(JsonObject):
"""Represents a tree (not a general Directed Acyclic Graph) of UllekhanamJsonObject.
`A video describing its use <https://youtu.be/neVeKcxzeQI>`_.
"""
schema = recursively_merge_json_schemas(
JsonObject.schema, {
"id": "JsonObjectNode",
"properties": {
TYPE_FIELD: {
"enum": ["JsonObjectNode"]
},
"content": JsonObject.schema,
"children": {
"type": "array",
"items": {
'type': 'object',
'$ref': "JsonObjectNode"
}
}
},
"required": [TYPE_FIELD]
}
)
[docs] def setup_source(self, source):
assert self.content is not None
self.content.source = source
for child in self.children:
child.setup_source(source=source)
[docs] def validate_children_types(self):
"""Recursively valdiate target-types."""
for child in self.children:
if not check_class(self.content, child.content.get_allowed_target_classes()):
raise TargetValidationError(targeting_obj=child, allowed_types=child.content.get_allowed_target_classes(),
target_obj=self.content)
for child in self.children:
child.validate_children_types()
[docs] def validate(self, db_interface=None, user=None):
# Note that the below recursively validates ALL members (including content and children).
super(JsonObjectNode, self).validate(db_interface=db_interface, user=user)
self.validate_children_types()
[docs] @classmethod
def from_details(cls, content, children=None):
if children is None:
children = []
node = JsonObjectNode()
# logging.debug(content)
# Strangely, without the backend.data_containers, the below test failed on 20170501
node.content = content
# logging.debug(check_list_item_types(children, [JsonObjectNode]))
node.children = children
node.validate(db_interface=None)
return node
[docs] def update_collection(self, db_interface, user=None):
"""Special info: Mutates this object."""
# But we don't call self.validate() as child.content.targets (required of Annotations) mayn't be set.
self.validate_children_types()
# The content is validated within the below call.
self.content = self.content.update_collection(db_interface=db_interface, user=user)
for child in self.children:
# Initialize the target array if it does not already exist.
if (not hasattr(child.content, "targets")) or child.content.targets is None or len(child.content.targets) == 0:
child.content.targets = [child.content.target_class()]
assert len(child.content.targets) == 1
child.content.targets[0].container_id = str(self.content._id)
child.update_collection(db_interface=db_interface, user=user)
[docs] def affected_user_ids(self):
if not hasattr(self, "content"):
raise ValidationError("This is a node with no content! Not allowed.")
user_ids = []
if hasattr(self.content.source, "id"):
user_ids = [self.content.source.id]
for child in self.children:
user_ids = user_ids + child.affected_user_ids()
return user_ids
[docs] def validate_deletion(self, db_interface, user=None):
# Deliberately not calling super.validate_deletion - the node does not exist in the database.
if not hasattr(self, "content"):
raise ValidationError("This is a node with no content! Not allowed.")
self.content.validate_deletion_ignoring_targetters(db_interface=db_interface, user=user)
for child in self.children:
child.validate_deletion(db_interface=db_interface, user=user)
self.content = JsonObject.from_id(id = self.content._id, db_interface=db_interface)
affected_users = self.affected_user_ids()
# logging.debug(affected_users)
if len(set(affected_users)) > 2 and not user.is_admin(service=db_interface.db_name_frontend):
raise ValidationError("This deletion affects more than 2 other users. Only admins can do that.")
[docs] def delete_in_collection(self, db_interface, user=None):
self.validate_deletion(db_interface=db_interface, user=user)
self.fill_descendents(db_interface=db_interface, depth=100)
for child in self.children:
child.delete_in_collection(db_interface=db_interface, user=user)
# Delete or disconnect children before deleting oneself.
self.content.delete_in_collection(db_interface=db_interface, user=user)
[docs] def fill_descendents(self, db_interface, depth=10, entity_type=None):
targetting_objs = self.content.get_targetting_entities(db_interface=db_interface, entity_type=entity_type)
self.children = []
if depth > 0:
for targetting_obj in targetting_objs:
child = JsonObjectNode.from_details(content=targetting_obj)
child.fill_descendents(db_interface=db_interface, depth=depth - 1, entity_type=entity_type)
self.children.append(child)
[docs] def recursively_delete_attr(self, field_name):
"""Rarely useful method: example when the schema of a Class changes to omit a field.
Limitation: Only useful with direct members.
"""
if hasattr(self.content, field_name):
delattr(self.content, field_name)
for child in self.children:
child.recursively_delete_attr(field_name)
[docs]class ScriptRendering(JsonObject):
schema = recursively_merge_json_schemas(JsonObject.schema, ({
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["ScriptRendering"]
},
"text": {
"type": "string",
},
"encoding_scheme": {
"type": "string",
},
},
"required": ["text"]
}))
[docs] @classmethod
def from_details(cls, text, encoding_scheme=None):
obj = ScriptRendering()
obj.text = text
if encoding_scheme is not None:
obj.encoding_scheme = encoding_scheme
obj.validate()
return obj
[docs]class Text(JsonObject):
schema = recursively_merge_json_schemas(JsonObject.schema, ({
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["Text"]
},
"script_renderings": {
"type": "array",
"minItems": 1,
"items": ScriptRendering.schema
},
"language_code": {
"type": "string",
},
"search_strings": {
"type": "array",
"items": {
"type": "string"
},
"description": "Search strings which should match this text. "
"It could be derived from script_renderings - "
"by a simple copy (intended for use with a text index) "
"or some intelligent tokenization thereof."
},
}
}))
[docs] @classmethod
def from_details(cls, script_renderings, language_code=None):
obj = Text()
obj.script_renderings = script_renderings
if language_code is not None:
obj.language_code = language_code
return obj
[docs] @classmethod
def from_text_string(cls, text_string, language_code=None, encoding_scheme=None):
obj = Text()
obj.script_renderings = [ScriptRendering.from_details(text=text_string, encoding_scheme=encoding_scheme)]
if language_code is not None:
obj.language_code = language_code
return obj
[docs]class NamedEntity(JsonObject):
"""The same name written in different languages have different spellings - oft due to differing case endings and conventions: kAlidAsaH vs Kalidasa. Hence this container."""
schema = recursively_merge_json_schemas(JsonObject.schema, ({
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["NamedEntity"]
},
"names": {
"type": "array",
"items": Text.schema,
"minItems": 1
}
}
}))
[docs] @classmethod
def from_details(cls, names):
obj = NamedEntity()
obj.names = names
return obj
[docs] @classmethod
def from_name_string(cls, name, language_code=None, encoding_scheme=None):
obj = NamedEntity()
obj.names = [Text.from_text_string(text_string=name, language_code=language_code, encoding_scheme=encoding_scheme)]
return obj
[docs]def get_schemas(module_in):
import inspect
schemas = {}
for name, obj in inspect.getmembers(module_in):
if inspect.isclass(obj) and hasattr(obj, "schema"):
schemas[name] = obj.schema
return schemas
# Essential for depickling to work.
update_json_class_index(sys.modules[__name__])
logging.debug(json_class_index)