目标 复杂聚合查询
一个MongoDB(4.0+)聚合查询Shell例子
包含分组、连接、转化、判断
$match $group $project $sort $lookup $unwind $cond $convert pipeline
示例
// Per-process read-volume report built from P_DaaSProcessNodeInstance.
// For every process it keeps the (transactionId, pNodeType) group with the
// latest start time, joins the process config, the node configs flagged as
// incremental, the "process" instance row of the same transaction and the
// scheduler rule, then projects display fields.
// Requires MongoDB 4.0+ ($convert; $lookup with let/pipeline needs 3.6+).
db.getCollection("P_DaaSProcessNodeInstance").aggregate(
    [
        // 1. Keep only the three node types this report covers.
        {
            "$match": {
                "pNodeType": {
                    "$in": ["sqlNode", "sqlToTempNode", "webApiNode"]
                }
            }
        },
        // 2. One row per (processId, transactionId, pNodeType):
        //    total rows read and earliest start time in that group.
        {
            "$group": {
                "_id": {
                    "id": "$processId",
                    "transactionId": "$transactionId",
                    "pNodeType": "$pNodeType"
                },
                "totalReadCount": { "$sum": "$totalReadCount" },
                "startTime": { "$min": "$startTime" }
            }
        },
        // 3. Lift the compound group key into top-level fields and drop _id.
        {
            "$project": {
                "id": "$_id.id",
                "pNodeType": "$_id.pNodeType",
                "transactionId": "$_id.transactionId",
                "totalReadCount": "$totalReadCount",
                "startTime": "$startTime",
                "_id": NumberInt(0)
            }
        },
        // 4. Order by process id, newest start time first, so that $first in
        //    the next stage selects the most recent row of each process.
        {
            "$sort": {
                "id": NumberInt(1),
                "startTime": NumberInt(-1)
            }
        },
        // 5. Collapse to one document per process id (first row in sort order).
        {
            "$group": {
                "_id": { "processId": "$id" },
                "firstDoc": { "$first": "$$ROOT" }
            }
        },
        // 6. Join the process config by id. The $unwind (no preserve) drops
        //    processes that have no matching config document.
        {
            "$lookup": {
                "from": "P_DaaSProcessModelConfig",
                "localField": "firstDoc.id",
                "foreignField": "id",
                "as": "lc"
            }
        },
        {
            "$unwind": {
                "path": "$lc",
                "preserveNullAndEmptyArrays": false
            }
        },
        // 7. Collect the node configs of this process whose incrementFlag is
        //    the string "1". Kept as an array (not unwound); only its size is
        //    used later for hasIncr.
        {
            "$lookup": {
                "from": "P_DaaSProcessNodeConfig",
                "let": { "processId": "$firstDoc.id" },
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    { "$eq": ["$processId", "$$processId"] },
                                    { "$eq": ["$incrementFlag", "1"] }
                                ]
                            }
                        }
                    }
                ],
                "as": "jd"
            }
        },
        // 8. Self-join: the instance row with pNodeType "process" that belongs
        //    to the same transaction. Rows without one are dropped by $unwind.
        {
            "$lookup": {
                "from": "P_DaaSProcessNodeInstance",
                "let": { "transactionId": "$firstDoc.transactionId" },
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    { "$eq": ["$transactionId", "$$transactionId"] },
                                    { "$eq": ["$pNodeType", "process"] }
                                ]
                            }
                        }
                    }
                ],
                "as": "lcsl"
            }
        },
        {
            "$unwind": {
                "path": "$lcsl",
                "preserveNullAndEmptyArrays": false
            }
        },
        // 9. Join the scheduler rule: lc.expression is matched against ruleId.
        //    Processes without a matching rule are dropped by $unwind.
        {
            "$lookup": {
                "from": "P_DaaSSchedulerRuleConfig",
                "localField": "lc.expression",
                "foreignField": "ruleId",
                "as": "dd"
            }
        },
        {
            "$unwind": {
                "path": "$dd",
                "preserveNullAndEmptyArrays": false
            }
        },
        // 10. Shape the final report row.
        {
            "$project": {
                "_id": NumberInt(0),
                // Advice text: sqlNode with >= 200000 rows read, or
                // sqlToTempNode with >= 1000000 rows read; otherwise empty.
                "advice": {
                    "$cond": {
                        "if": {
                            "$and": [
                                { "$eq": ["$firstDoc.pNodeType", "sqlNode"] },
                                { "$gte": ["$firstDoc.totalReadCount", NumberInt(200000)] }
                            ]
                        },
                        "then": "做时间戳增量读取或使用大数据节点降内存(内存读取超20w)",
                        "else": {
                            "$cond": {
                                "if": {
                                    "$and": [
                                        { "$eq": ["$firstDoc.pNodeType", "sqlToTempNode"] },
                                        { "$gte": ["$firstDoc.totalReadCount", NumberInt(1000000)] }
                                    ]
                                },
                                "then": "做时间戳增量读取提升效率(大数据读取超100w)",
                                "else": ""
                            }
                        }
                    }
                },
                "processName": "$lc.configName",
                "totalReadCount": "$firstDoc.totalReadCount",
                // Field name spelling kept as-is; consumers may depend on it.
                "faildTransCount": "$lcsl.faildTransCount",
                // Incremental if at least one flagged node config was found.
                // Testing for "> 0" rather than "== 1" so that a process with
                // several flagged nodes is not reported as non-incremental.
                "hasIncr": {
                    "$cond": {
                        "if": {
                            "$gt": [{ "$size": "$jd" }, NumberInt(0)]
                        },
                        "then": "已做增量",
                        "else": "未做增量"
                    }
                },
                // Display label for the node type; anything that is neither
                // sqlNode nor sqlToTempNode falls through to the API label.
                "processType": {
                    "$cond": {
                        "if": { "$eq": ["$firstDoc.pNodeType", "sqlNode"] },
                        "then": "普通节点",
                        "else": {
                            "$cond": {
                                "if": { "$eq": ["$firstDoc.pNodeType", "sqlToTempNode"] },
                                "then": "大数据节点",
                                "else": "API接口"
                            }
                        }
                    }
                },
                // Display label for the result code: 1, 0, or anything else.
                "resultCode": {
                    "$cond": {
                        "if": { "$eq": ["$lcsl.resultCode", NumberInt(1)] },
                        "then": "正常",
                        "else": {
                            "$cond": {
                                "if": { "$eq": ["$lcsl.resultCode", NumberInt(0)] },
                                "then": "失败",
                                "else": "异常"
                            }
                        }
                    }
                },
                "startTime": "$firstDoc.startTime",
                "endTime": "$lcsl.endTime",
                // Coerce totalRunTime to double; 0.0 when null/missing or
                // when the value cannot be converted.
                "runTime": {
                    "$convert": {
                        "input": "$lcsl.totalRunTime",
                        "to": "double",
                        "onError": 0.0,
                        "onNull": 0.0
                    }
                },
                "cronDes": "$dd.ruleName",
                "cronId": "$dd.ruleId",
                "runServerId": "$lc.runServerId",
                "processId": "$firstDoc.id",
                "transactionId": "$firstDoc.transactionId"
            }
        },
        // 11. Largest read volume first; ties broken by newest start time.
        {
            "$sort": {
                "totalReadCount": NumberInt(-1),
                "startTime": NumberInt(-1)
            }
        }
    ],
    {
        // Let memory-heavy stages ($group/$sort) spill to disk.
        "allowDiskUse": true
    }
);
评论区