// aggregate.js (3.6 KB)
  1. import { Db } from './index';
  2. import { EJSON } from 'bson';
  3. import { QuerySerializer } from './serializer/query';
  4. import { stringifyByEJSON } from './utils/utils';
  5. import { getType } from './utils/type';
  6. import { Validate } from './validate';
  7. import { Point } from './geo/point';
  8. export default class Aggregation {
  9. constructor(db, collectionName, rawPipeline) {
  10. this._stages = [];
  11. if (db && collectionName) {
  12. this._db = db;
  13. this._request = new Db.reqClass(this._db.config);
  14. this._collectionName = collectionName;
  15. if (rawPipeline && rawPipeline.length > 0) {
  16. rawPipeline.forEach((stage) => {
  17. Validate.isValidAggregation(stage);
  18. const stageName = Object.keys(stage)[0];
  19. this._pipe(stageName, stage[stageName], true);
  20. });
  21. }
  22. }
  23. }
  24. async end() {
  25. if (!this._collectionName || !this._db) {
  26. throw new Error('Aggregation pipeline cannot send request');
  27. }
  28. const result = await this._request.send('database.aggregateDocuments', {
  29. collectionName: this._collectionName,
  30. stages: this._stages
  31. });
  32. if (result && result.data && result.data.list) {
  33. return {
  34. requestId: result.requestId,
  35. data: result.data.list.map(EJSON.parse)
  36. };
  37. }
  38. return result;
  39. }
  40. unwrap() {
  41. return this._stages;
  42. }
  43. done() {
  44. return this._stages.map(({ stageKey, stageValue }) => {
  45. return {
  46. [stageKey]: JSON.parse(stageValue)
  47. };
  48. });
  49. }
  50. _pipe(stage, param, raw = false) {
  51. let transformParam = '';
  52. if (getType(param) === 'object') {
  53. transformParam = stringifyByEJSON(param);
  54. }
  55. else {
  56. transformParam = JSON.stringify(param);
  57. }
  58. this._stages.push({
  59. stageKey: raw ? stage : `$${stage}`,
  60. stageValue: transformParam
  61. });
  62. return this;
  63. }
  64. addFields(param) {
  65. return this._pipe('addFields', param);
  66. }
  67. bucket(param) {
  68. return this._pipe('bucket', param);
  69. }
  70. bucketAuto(param) {
  71. return this._pipe('bucketAuto', param);
  72. }
  73. count(param) {
  74. return this._pipe('count', param);
  75. }
  76. geoNear(param) {
  77. if (param.query) {
  78. param.query = QuerySerializer.encode(param.query);
  79. }
  80. if (param.distanceMultiplier && typeof (param.distanceMultiplier) === 'number') {
  81. param.distanceMultiplier = param.distanceMultiplier;
  82. }
  83. if (param.near) {
  84. param.near = new Point(param.near.longitude, param.near.latitude).toJSON();
  85. }
  86. return this._pipe('geoNear', param);
  87. }
  88. group(param) {
  89. return this._pipe('group', param);
  90. }
  91. limit(param) {
  92. return this._pipe('limit', param);
  93. }
  94. match(param) {
  95. return this._pipe('match', QuerySerializer.encode(param));
  96. }
  97. project(param) {
  98. return this._pipe('project', param);
  99. }
  100. lookup(param) {
  101. return this._pipe('lookup', param);
  102. }
  103. replaceRoot(param) {
  104. return this._pipe('replaceRoot', param);
  105. }
  106. sample(param) {
  107. return this._pipe('sample', param);
  108. }
  109. skip(param) {
  110. return this._pipe('skip', param);
  111. }
  112. sort(param) {
  113. return this._pipe('sort', param);
  114. }
  115. sortByCount(param) {
  116. return this._pipe('sortByCount', param);
  117. }
  118. unwind(param) {
  119. return this._pipe('unwind', param);
  120. }
  121. }