+ while cursor.is_ok do
+ res.add cursor.item
+ cursor.next
+ end
+ return res
+ end
+
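+ # Applies an aggregation `pipeline` over the collection.
+ #
+ # Returns the documents produced by the pipeline, in cursor order.
+ # An empty array is returned when the native call yields no cursor.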
+ #
+ # ~~~
+ # var client = new MongoClient("mongodb://localhost:27017/")
+ # var col = client.database("test").collection("test_aggregate")
+ #
+ # col.drop
+ #
+ # col.insert("""{ "cust_id": "A123", "amount": 500, "status": "A"}""".parse_json.as(JsonObject))
+ # col.insert("""{ "cust_id": "A123", "amount": 250, "status": "A"}""".parse_json.as(JsonObject))
+ # col.insert("""{ "cust_id": "B212", "amount": 200, "status": "A"}""".parse_json.as(JsonObject))
+ # col.insert("""{ "cust_id": "A123", "amount": 300, "status": "D"}""".parse_json.as(JsonObject))
+ #
+ # var res = col.aggregate("""[
+ # { "$match": { "status": "A" } },
+ # { "$group": { "_id": "$cust_id", "total": { "$sum": "$amount" } } },
+ # { "$sort": { "_id": 1 } }
+ # ]""".parse_json.as(JsonArray))
+ #
+ # assert res[0].to_json == """{"_id":"A123","total":750}"""
+ # assert res[1].to_json == """{"_id":"B212","total":200}"""
+ # ~~~
+ fun aggregate(pipeline: JsonArray): Array[JsonObject] do
+ var q = new JsonObject
+ q["pipeline"] = pipeline
+ var res = new Array[JsonObject]
+ var c = native.aggregate(q.to_bson.native)
+ if c == null then return res
+ var cursor = new MongoCursor(c)
+ while cursor.is_ok do
+ res.add cursor.item
+ cursor.next
+ end