neo4j :: NeoBatch :: defaultinit
# Batches are used to perform multiple operations on the REST API in one cURL request.
# This can significantly improve performance for large insert and update operations.
#
# see: http://docs.neo4j.org/chunked/milestone/rest-api-batch-ops.html
#
# This service is transactional.
# If any of the operations performed fails (returns a non-2xx HTTP status code),
# the transaction will be rolled back and all changes will be undone.
#
# Example:
#
# ~~~nitish
# var client = new Neo4jClient("http://neo4j:7474")
#
# var node1 = new NeoNode
# var node2 = new NeoNode
# var edge = new NeoEdge(node1, "TO", node2)
#
# var batch = new NeoBatch(client)
# batch.save_node(node1)
# batch.save_node(node2)
# batch.save_edge(edge)
# batch.execute
#
# assert node1.is_linked
# assert node2.is_linked
# assert edge.is_linked
# ~~~
class NeoBatch

	# Neo4j client connector
	var client: Neo4jClient

	# Jobs to perform in this batch, indexed by job id
	#
	# The batch service expects an array of job descriptions as input,
	# each job description describing an action to be performed via the normal server API.
	var jobs = new HashMap[Int, NeoJob]

	# Create a new job bound to `nentity`, register it in `jobs` and return it
	#
	# The job id is the number of jobs already registered in this batch.
	# The caller is expected to fill in the job action, method, target and body.
	#
	# see `NeoJob`
	fun new_job(nentity: NeoEntity): NeoJob do
		var id = jobs.length
		var job = new NeoJob(id, nentity)
		jobs[id] = job
		return job
	end

	# REST target designating `node` within this batch, followed by `suffix`
	#
	# Nodes that already have an id are addressed with `/node/<id>`.
	# Other nodes are addressed with the `{<batch_id>}` placeholder
	# that refers to the job that creates them in the same batch.
	private fun node_target(node: NeoNode, suffix: String): String do
		var id = node.id
		if id != null then return "/node/{id.to_s}{suffix}"
		return "\{{node.batch_id.to_s}\}{suffix}"
	end

	# Load the data and the labels of `node` in batch mode
	#
	# Two `GET` jobs are queued: one for the node data and one for its labels.
	# Edges are not loaded here, see `load_node_edges`.
	fun load_node(node: NeoNode) do
		var job = new_job(node)
		job.action = load_node_data_action
		job.method = "GET"
		job.to = node_target(node, "")
		job = new_job(node)
		job.action = load_node_labels_action
		job.method = "GET"
		job.to = node_target(node, "/labels")
	end

	# Load in and out edges into node
	#
	# Two `GET` jobs are queued: one for the incoming relationships
	# and one for the outgoing relationships.
	fun load_node_edges(node: NeoNode) do
		var job = new_job(node)
		job.action = load_node_in_edges_action
		job.method = "GET"
		job.to = node_target(node, "/relationships/in")
		job = new_job(node)
		job.action = load_node_out_edges_action
		job.method = "GET"
		job.to = node_target(node, "/relationships/out")
	end

	# Create a `NeoNode` or a `NeoEdge` in batch mode.
	#
	# Aborts if `nentity` is neither a `NeoNode` nor a `NeoEdge`.
	fun save_entity(nentity: NeoEntity) do
		if nentity isa NeoNode then
			save_node(nentity)
		else if nentity isa NeoEdge then
			save_edge(nentity)
		else abort
	end

	# Create a node in batch mode, also create its labels
	#
	# Does nothing if `node` already has an id or a batch id.
	#
	# Edges of the node are not saved by this method, use `save_edge` or `save_edges`.
	fun save_node(node: NeoNode) do
		if node.id != null or node.batch_id != null then return
		# create node
		var job = new_job(node)
		node.batch_id = job.id
		job.action = create_node_action
		job.method = "POST"
		job.to = "/node"
		job.body = node.properties
		# add labels
		# This job has no action: its result is ignored by `finalize_batch`.
		job = new_job(node)
		job.method = "POST"
		job.to = "\{{node.batch_id.to_s}\}/labels"
		job.body = new JsonArray.from(node.labels)
		# add edges
		#save_edges(node.out_edges)
	end

	# Create multiple nodes
	#
	# see `save_node`
	fun save_nodes(nodes: Collection[NeoNode]) do for node in nodes do save_node(node)

	# Create an edge
	#
	# Nodes `edge.from` and `edge.to` will be created if not in base.
	# Does nothing if `edge` already has an id or a batch id.
	fun save_edge(edge: NeoEdge) do
		if edge.id != null or edge.batch_id != null then return
		# create nodes
		save_node(edge.from)
		save_node(edge.to)
		# create edge
		var job = new_job(edge)
		edge.batch_id = job.id
		job.action = create_edge_action
		job.method = "POST"
		job.to = node_target(edge.from, "/relationships")
		job.body = edge.to_rest
	end

	# Create multiple edges
	#
	# see `save_edge`
	fun save_edges(edges: Collection[NeoEdge]) do for edge in edges do save_edge(edge)

	# Execute the batch and update local nodes
	#
	# All the registered jobs are sent in a single POST request to `client.batch_url`.
	# Returns the errors found while processing the response (empty if none).
	#
	# Note: `jobs` is not cleared by this method.
	fun execute: List[NeoError] do
		var request = new JsonPOST(client.batch_url)
		# request.headers["X-Stream"] = "true"
		var json_jobs = new JsonArray
		for job in jobs.values do json_jobs.add job.to_rest
		request.json_data = json_jobs
		var response = request.execute
		var res = client.parse_response(response)
		return finalize_batch(res)
	end

	# Associate data from response in original nodes and edges
	#
	# `response` is expected to be a `JsonArray` with one `JsonObject` per job,
	# each carrying the `id` of the job it answers.
	#
	# Malformed responses or entries are reported in the returned list
	# and do not stop the processing of the remaining entries.
	private fun finalize_batch(response: Serializable): List[NeoError] do
		var errors = new List[NeoError]
		# Forward an error already built upstream instead of hiding it behind a generic one.
		# NOTE(review): assumes `client.parse_response` may return a `NeoError`; confirm.
		if response isa NeoError then
			errors.add(response)
			return errors
		end
		if not response isa JsonArray then
			errors.add(new NeoError("Unexpected batch response format.", "Neo4jError"))
			return errors
		end
		for res in response do
			if not res isa JsonObject then
				errors.add(new NeoError("Unexpected job format in batch response.", "Neo4jError"))
				continue
			end
			# Report entries that cannot be matched to a job rather than aborting.
			var id = res.get_or_null("id")
			if not id isa Int then
				errors.add(new NeoError("Missing or invalid job id in batch response.", "Neo4jError"))
				continue
			end
			var job = jobs.get_or_null(id)
			if job == null then
				errors.add(new NeoError("Unknown job id {id} in batch response.", "Neo4jError"))
				continue
			end
			if job.action == create_node_action then
				# The node is now stored: swap its batch id for its url
				var node = job.entity.as(NeoNode)
				node.batch_id = null
				node.url = res["location"].to_s
			else if job.action == create_edge_action then
				# The edge is now stored: swap its batch id for its url
				var edge = job.entity.as(NeoEdge)
				edge.batch_id = null
				edge.url = res["location"].to_s
			else if job.action == load_node_data_action then
				var node = job.entity.as(NeoNode)
				node.internal_properties = res["body"].as(JsonObject)["data"].as(JsonObject)
			else if job.action == load_node_labels_action then
				var node = job.entity.as(NeoNode)
				var labels = new Array[String]
				for l in res["body"].as(JsonArray) do labels.add l.to_s
				node.internal_labels = labels
			else if job.action == load_node_in_edges_action then
				var node = job.entity.as(NeoNode)
				var edges = res["body"].as(JsonArray)
				node.internal_in_edges = new List[NeoEdge]
				for edge in edges do
					node.internal_in_edges.add client.load_edge(edge.as(JsonObject)["self"].to_s)
				end
			else if job.action == load_node_out_edges_action then
				var node = job.entity.as(NeoNode)
				var edges = res["body"].as(JsonArray)
				node.internal_out_edges = new List[NeoEdge]
				for edge in edges do
					node.internal_out_edges.add client.load_edge(edge.as(JsonObject)["self"].to_s)
				end
			end
		end
		return errors
	end

	# JobActions
	# TODO replace with enum

	# Action of a job that creates a node
	private fun create_node_action: Int do return 1

	# Action of a job that creates an edge
	private fun create_edge_action: Int do return 2

	# Action of a job that loads the data of a node
	private fun load_node_data_action: Int do return 3

	# Action of a job that loads the labels of a node
	private fun load_node_labels_action: Int do return 4

	# Action of a job that loads the incoming edges of a node
	private fun load_node_in_edges_action: Int do return 5

	# Action of a job that loads the outgoing edges of a node
	private fun load_node_out_edges_action: Int do return 6
end
lib/neo4j/neo4j.nit:760,1--972,3