From 37f8ab8f7c512116a5004a638bcaee92f9941bfc Mon Sep 17 00:00:00 2001
From: =?utf8?q?Jean-Christophe=20Beaupr=C3=A9?=
Date: Sat, 20 Dec 2014 18:00:35 -0500
Subject: [PATCH] neo4j/graph: Add Neo4j as a storage mechanism.
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jean-Christophe Beaupré
---
 lib/neo4j/graph/graph.nit |  117 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)

diff --git a/lib/neo4j/graph/graph.nit b/lib/neo4j/graph/graph.nit
index db43a1f..39063f2 100644
--- a/lib/neo4j/graph/graph.nit
+++ b/lib/neo4j/graph/graph.nit
@@ -179,3 +179,120 @@ abstract class GraphStore
 	fun save_part(nodes: Collection[NeoNode],
 			edges: Collection[NeoEdge]) is abstract
 end
+
+# Save or load a graph using an actual Neo4j database.
+class Neo4jGraphStore
+	super GraphStore
+
+	# The maximum number of entities saved in one request.
+	#
+	# Also defines the granularity of the reported progression.
+	#
+	# TODO Also honor this limit in `load`.
+	var batch_max_size = 512 is writable
+
+	# The Neo4j client to use.
+	var client: Neo4jClient
+
+	# The label to use to retrieve the nodes.
+	var node_label: String
+
+	# Progression counter passed to `fire_progressed`.
+	private var done_part = 0
+
+	# Progression target passed to `fire_progressed`.
+	private var total = 0
+
+	# Does the database already contain at least one node labeled `name`?
+	fun has_node_label(name: String): Bool do
+		var query = new CypherQuery.from_string(
+				"match n where \{name\} in labels(n) return count(n)")
+		query.params["name"] = name
+		var data = client.cypher(query).as(JsonObject)["data"]
+		var result = data.as(JsonArray).first.as(JsonArray).first.as(Int)
+		return result > 0
+	end
+
+	# Is the database still free of any node labeled `node_label`?
+	redef fun isolated_save do return not has_node_label(node_label)
+
+	# Load every node labeled `node_label`, along with its outgoing edges.
+	#
+	# Fetching the nodes counts as the first half of the reported progression;
+	# the second half advances by `batch_max_size` nodes at a time.
+	redef fun load do
+		assert batch_max_size > 0
+		fire_started
+		var db_nodes = client.nodes_with_label(node_label)
+		var nodes = graph.nodes
+		var edges = graph.edges
+		var i = 0
+
+		# Size the progression on what was fetched, not on the graph's prior content.
+		total = db_nodes.length * 2
+		done_part = db_nodes.length
+		fire_progressed(done_part, total)
+		for node in db_nodes do
+			nodes.add(node)
+			edges.add_all(node.out_edges)
+			i += 1
+			if i >= batch_max_size then
+				done_part += batch_max_size
+				fire_progressed(done_part, total)
+				# Start counting the next batch.
+				i = 0
+			end
+		end
+		# Account for the last, partial batch.
+		done_part += i
+		fire_done
+	end
+
+	# Save `nodes`, then `edges`, in requests of at most `batch_max_size` entities.
+	redef fun save_part(nodes, edges) do
+		assert batch_max_size > 0
+		fire_started
+		total = nodes.length + edges.length
+		done_part = 0
+
+		save_entities(nodes)
+		save_entities(edges)
+		fire_done
+	end
+
+	# Save the specified entities.
+	#
+	# Send one request per `batch_max_size` entities, and report the
+	# progression after each full batch.
+	private fun save_entities(neo_entities: Collection[NeoEntity]) do
+		var batch = new NeoBatch(client)
+		var batch_length = 0
+
+		for nentity in neo_entities do
+			batch.save_entity(nentity)
+			batch_length += 1
+			if batch_length >= batch_max_size then
+				do_batch(batch)
+				done_part += batch_max_size
+				fire_progressed(done_part, total)
+				batch = new NeoBatch(client)
+				batch_length = 0
+			end
+		end
+		# Do not send an empty request when nothing remains.
+		if batch_length > 0 then
+			do_batch(batch)
+			done_part += batch_length
+		end
+	end
+
+	# Execute `batch` and check for errors.
+	#
+	# Abort if `batch.execute` returns errors.
+	private fun do_batch(batch: NeoBatch) do
+		var errors = batch.execute
+		assert errors.is_empty else
+			for e in errors do sys.stderr.write("{e}\n")
+		end
+	end
+end
-- 
1.7.9.5