neo4j/graph: Add Neo4j as a storage mechanism.
author: Jean-Christophe Beaupré <jcbrinfo@users.noreply.github.com>
Sat, 20 Dec 2014 23:00:35 +0000 (18:00 -0500)
committer: Jean-Christophe Beaupré <jcbrinfo@users.noreply.github.com>
Mon, 29 Dec 2014 20:51:24 +0000 (15:51 -0500)
Signed-off-by: Jean-Christophe Beaupré <jcbrinfo@users.noreply.github.com>

lib/neo4j/graph/graph.nit

index db43a1f..39063f2 100644 (file)
@@ -179,3 +179,100 @@ abstract class GraphStore
        fun save_part(nodes: Collection[NeoNode],
                        edges: Collection[NeoEdge]) is abstract
 end
+
+# Save or load a graph using an actual Neo4j database.
+class Neo4jGraphStore
+       super GraphStore
+
+       # The maximum number of entities saved in one request.
+       #
+       # Also defines the granularity of the reported progression.
+       #
+       # TODO Also honor this limit in `load` (currently it only controls
+       # how often `load` reports its progression).
+       var batch_max_size = 512 is writable
+
+       # The Neo4j client to use.
+       var client: Neo4jClient
+
+       # The label to use to retrieve the nodes.
+       var node_label: String
+
+       # Number of work units already completed by the current operation.
+       private var done_part = 0
+
+       # Total number of work units of the current operation.
+       private var total = 0
+
+       # Does the database already contain at least one node with the label `name`?
+       fun has_node_label(name: String): Bool do
+               var query = new CypherQuery.from_string(
+                               "match n where \{name\} in labels(n) return count(n)")
+               query.params["name"] = name
+               var data = client.cypher(query).as(JsonObject)["data"]
+               # `data` is read as a list of rows; the count is the first
+               # column of the first row.
+               var result = data.as(JsonArray).first.as(JsonArray).first.as(Int)
+               return result > 0
+       end
+
+       # Report an isolated save only when the database does not contain any
+       # node labelled `node_label` yet.
+       redef fun isolated_save do return not has_node_label(node_label)
+
+       # Load every node labelled `node_label`, with its outgoing edges, into `graph`.
+       #
+       # Fetching the nodes from the database counts as the first half of the
+       # progression. Adding them to the graph counts as the second half,
+       # reported every `batch_max_size` nodes.
+       redef fun load do
+               assert batch_max_size > 0
+               fire_started
+               var db_nodes = client.nodes_with_label(node_label)
+               var nodes = graph.nodes
+               var edges = graph.edges
+               var i = 0
+
+               # Size the progression on what was fetched, not on what `graph`
+               # already contained.
+               total = db_nodes.length * 2
+               done_part = db_nodes.length
+               fire_progressed(done_part, total)
+               for node in db_nodes do
+                       nodes.add(node)
+                       edges.add_all(node.out_edges)
+                       i += 1
+                       if i >= batch_max_size then
+                               done_part += i
+                               fire_progressed(done_part, total)
+                               # Start counting the next batch from zero.
+                               i = 0
+                       end
+               end
+               # Account for the last, partial batch.
+               if i > 0 then
+                       done_part += i
+                       fire_progressed(done_part, total)
+               end
+               fire_done
+       end
+
+       # Save `nodes`, then `edges`, in requests of at most `batch_max_size` entities.
+       #
+       # Nodes are saved first (presumably because edges refer to their
+       # endpoints — TODO confirm).
+       redef fun save_part(nodes, edges) do
+               assert batch_max_size > 0
+               fire_started
+               total = nodes.length + edges.length
+               done_part = 0
+
+               save_entities(nodes)
+               save_entities(edges)
+               fire_done
+       end
+
+       # Save the specified entities, at most `batch_max_size` per request.
+       #
+       # Advances `done_part` and reports the progression after each request.
+       private fun save_entities(neo_entities: Collection[NeoEntity]) do
+               var batch = new NeoBatch(client)
+               var batch_length = 0
+
+               for nentity in neo_entities do
+                       batch.save_entity(nentity)
+                       batch_length += 1
+                       if batch_length >= batch_max_size then
+                               do_batch(batch)
+                               done_part += batch_length
+                               fire_progressed(done_part, total)
+                               batch = new NeoBatch(client)
+                               batch_length = 0
+                       end
+               end
+               # Flush the last, partial batch. Skip the request entirely when
+               # nothing is left to send.
+               if batch_length > 0 then
+                       do_batch(batch)
+                       done_part += batch_length
+                       fire_progressed(done_part, total)
+               end
+       end
+
+       # Execute `batch` and check for errors.
+       #
+       # Abort if `batch.execute` returns errors, after writing each of them
+       # to the standard error.
+       private fun do_batch(batch: NeoBatch) do
+               var errors = batch.execute
+               assert errors.is_empty else
+                       for e in errors do sys.stderr.write("{e}\n")
+               end
+       end
+end